数据流转是规则引擎中处理和转换数据的常见方式。用户可以通过RabbitMQ、Kafka、MQTT、Rest Api这四种方式,将想要的数据转发出去。同时用户可以在 规则链 里面将脏数据行过滤、清洗等操作,以提高数据的质量、提高数据分析的准确性、增强数据的价值、减少冗余和重复数据。
流转的数据格式
{
"CO2": 118,
"CO": 128,
"humidity": 58.58,
"deviceId": "07190dc0-1a12-xxxx-a3a776be23dc",
"deviceProfileId": "ba900170-1xxxxx-a3a776be23dc",
"deviceName": "直连设备001",
"deviceType": "默认MQTT直连设备"
}
参数说明
- deviceId:TK的设备ID
- deviceName:设备名称
- deviceProfileId:产品的ID
- deviceType:产品名称
- 其他:上报的数据
RabbitMQ
前置条件
RabbitMQ已安装,如未安装请自行安装。
配置
在平台创建RabbitMQ进行数据转发时,建议用户先在RabbitMQ的管理页面,创建自定义的 虚拟主机、队列 和 交换机 并让它们之间形成绑定关系,然后在平台填写使用的RabbitMQ信息。
💡注意:
交换机模式:amq.direct、amp.fanout、amq.headers、amq.match、amq.rabbitmq.trace、amq.topic。队列:在创建时,使用的以上哪种模式创建的,在thingskit数据流转配置时也要输入。
RabbitMQ配置
RabbitMQ配置的顺序是:创建虚拟主机 -> 创建队列 -> 选择交换机 -> 队列与交换机进行绑定
平台配置
注意:
RabbitMQ设置的虚拟主机在平台配置时不用加“/”
推送数据并查看
数据推送,可通过以下链接进行参考:
消费
用户可以通过java编写消费者进行消费,具体示例如下:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
import com.rabbitmq.client.*;
import java.io.IOException;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
test1();
}
public static void test1() throws Exception {
String userName = "xx";
String password = "xx";
String virtualHost = "xxx";
String hostName = "xxxx";
int port = 5672;
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(port);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
//RabbitMQ自定义的队列名称
String queueName = "xxx";
Consumer consumer =
new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
// 消息到达 触发这个方法
String msg = new String(body, "utf-8");
System.out.println("===>" + msg);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
Kafka
前置条件
Kafka已安装,如未安装请自行安装。
💡提示:
如用户不想自己创建主题,可以通过设置kafka的参数KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true来自动创建不存在的主题。
配置
KafKa配置
如果Topic已存在或者KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true则不用考虑后面的手动创建Topic
手动创建Topic(可选)
以下示例为通过docker部署的kafka:
docker exec -it kafka1 bash #kafka1 请更改为自己的容器名称或ID
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic ThingsKit-KafKa --partitions 3 --replication-factor 1
kafka-topics.sh --bootstrap-server localhost:9092 --list
平台配置
推送数据并消费
💡注意:
消费时的Topic要与创建的Topic一致
数据推送,可通过以下链接进行参考:
通过Kafka创建消费者,并定义消费者的topic。以下为通过windows安装Kafka并执行的命令:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ThingsKit-KafKa --from-beginning
如果消费者收到以下信息代表数据流转成功,如下图:
属性说明
功能模块 | 属性名称 | 属性值说明 |
SSL | ssl.keystore.certificate.chain | |
ssl.keystore.key | ||
ssl.truststore.certificates | ||
Kafka鉴权 | ssl.endpoint.identification.algorithm | Kafka Broker是否验证主机名,例如:http |
security.protocol | Kafka Broker与客户端的通信安全协议,例如:SASL_PLAINTEXT | |
sasl.mechanism | Kafka Broker的认证机制,例如:SCRAM-SHA-512 | |
sasl.jaas.config | Kafka Broker的JAAS 认证信息,例如:org.apache.kafka.common.security.scram.ScramLoginModule required username="账号" password="密码"; |
MQTT
前置条件
MQTT Broker已安装,如未安装请自行安装或使用第三方Broker。
配置
以下示例通过 MQTTX(模拟温湿度设备)、MQTTX(模拟消费者)、EMQ(模拟MQTT Broker) 进行的模拟。
平台配置
中间件查看连接(EMQ)
消费者配置
💡注意:
消费者配置的信息说明
- 服务器地址:EMQ的访问地址
- 端口: 默认1883,示例里面进行了修改
- 用户名:EMQ的登录用户名
- 密码:EMQ的登录密码
推送数据并消费
Rest Api
💡注意:
第三方接口凭证说明如下:
- Anonymons(匿名)
- 不用访问凭证即可访问的api接口
- Basic(Basic认证)
- 使用Basic认证访问的api接口,客户端通过明文(BASE64编码格式)将用户名和密码传输到服务端进行认证。Basic加密的方式,用户名:密码。建议:在HTTPS的环境下使用
- PEM(证书认证)
- 使用CA证书的方式进行认证(推荐)
以下示例为匿名访问:
第三方平台需要定义 无需登录 可以直接访问的api接口,且接口请求方式为 POST ,参数接收用@RequestBody 具体示例如下:
@PostMapping("receive")
public void receive(@RequestBody Map<String,Object> params){
//编写数据处理代码
}