数据流转的使用_物联网平台_边缘计算网关

数据流转是规则引擎中处理和转换数据的常见方式。用户可以通过RabbitMQKafkaMQTTRest Api这四种方式,将想要的数据转发出去。同时用户可以在 规则链 里面将脏数据行过滤、清洗等操作,以提高数据的质量、提高数据分析的准确性、增强数据的价值、减少冗余和重复数据。

流转的数据格式

{
	"CO2": 118,
	"CO": 128,
	"humidity": 58.58,
	"deviceId": "07190dc0-1a12-xxxx-a3a776be23dc",
	"deviceProfileId": "ba900170-1xxxxx-a3a776be23dc",
	"deviceName": "直连设备001",
	"deviceType": "默认MQTT直连设备"
}

参数说明

  • deviceId:TK的设备ID
  • deviceName:设备名称
  • deviceProfileId:产品的ID
  • deviceType:产品名称
  • 其他:上报的数据

RabbitMQ

前置条件

RabbitMQ已安装,如未安装请自行安装。

配置

在平台创建RabbitMQ进行数据转发时,建议用户先在RabbitMQ的管理页面,创建自定义的 虚拟主机队列 交换机 并让它们之间形成绑定关系然后在平台填写使用的RabbitMQ信息。

💡注意:

交换机模式:amq.direct、amp.fanout、amq.headers、amq.match、amq.rabbitmq.trace、amq.topic。队列:在创建时,使用的以上哪种模式创建的,在thingskit数据流转配置时也要输入。

RabbitMQ配置

RabbitMQ配置的顺序是:创建虚拟主机 -> 创建队列 -> 选择交换机 -> 队列与交换机进行绑定

平台配置

注意:

RabbitMQ设置的虚拟主机在平台配置时不用加“/”

推送数据并查看

数据推送,可通过以下链接进行参考:

此处为语雀内容卡片,点击链接查看:https://yunteng.yuque.com/avshoi/v2xdocs/bnxw31sunnh1ewt2#zkvz1

消费

用户可以通过java编写消费者进行消费,具体示例如下:

<dependency>
    	<groupId>com.rabbitmq</groupId>
    	<artifactId>amqp-client</artifactId>
    	<version>5.4.3</version>
</dependency>
import com.rabbitmq.client.*;

import java.io.IOException;

public class SimpleConsumer {
  public static void main(String[] args) throws Exception {
    test1();
  }

  public static void test1() throws Exception {
    String userName = "xx";
    String password = "xx";
    String virtualHost = "xxx";
    String hostName = "xxxx";
    int port = 5672;
    ConnectionFactory factory = new ConnectionFactory();
    // "guest"/"guest" by default, limited to localhost connections
    factory.setUsername(userName);
    factory.setPassword(password);
    factory.setVirtualHost(virtualHost);
    factory.setHost(hostName);
    factory.setPort(port);

    Connection conn = factory.newConnection();
    Channel channel = conn.createChannel();
    //RabbitMQ自定义的队列名称
    String queueName = "xxx";
    Consumer consumer =
        new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(
              String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
              throws IOException {
            // 消息到达 触发这个方法
            String msg = new String(body, "utf-8");
            System.out.println("===>" + msg);
          }
        };
    channel.basicConsume(queueName, true, consumer);
  }
}

Kafka

前置条件

Kafka已安装,如未安装请自行安装。

💡提示:

如用户不想自己创建主题,可以通过设置kafka的参数KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true来自动创建不存在的主题。

配置

KafKa配置

如果Topic已存在或者KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true则不用考虑后面的手动创建Topic

手动创建Topic(可选)

以下示例为通过docker部署的kafka:

docker exec -it kafka1 bash #kafka1 请更改为自己的容器名称或ID
kafka-topics.sh --create  --bootstrap-server localhost:9092 --topic ThingsKit-KafKa --partitions 3 --replication-factor 1
kafka-topics.sh --bootstrap-server localhost:9092 --list

平台配置

推送数据并消费

💡注意:

消费时的Topic要与创建的Topic一致

数据推送,可通过以下链接进行参考:

此处为语雀内容卡片,点击链接查看:https://yunteng.yuque.com/avshoi/v2xdocs/bnxw31sunnh1ewt2#zkvz1

通过Kafka创建消费者,并定义消费者的topic。以下为通过windows安装Kafka并执行的命令:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ThingsKit-KafKa --from-beginning

如果消费者收到以下信息代表数据流转成功,如下图:

属性说明

功能模块

属性名称

属性值说明

SSL

ssl.keystore.certificate.chain

ssl.keystore.key

ssl.truststore.certificates

Kafka鉴权

ssl.endpoint.identification.algorithm

Kafka Broker是否验证主机名,例如:http

security.protocol

Kafka Broker与客户端的通信安全协议,例如:SASL_PLAINTEXT

sasl.mechanism

Kafka Broker的认证机制,例如:SCRAM-SHA-512

sasl.jaas.config

Kafka Broker的JAAS 认证信息,例如:org.apache.kafka.common.security.scram.ScramLoginModule required username="账号" password="密码";

MQTT

前置条件

MQTT Broker已安装,如未安装请自行安装或使用第三方Broker。

配置

以下示例通过 MQTTX(模拟温湿度设备)MQTTX(模拟消费者)EMQ(模拟MQTT Broker) 进行的模拟。

平台配置

中间件查看连接(EMQ)

消费者配置

💡注意:

消费者配置的信息说明

    • 服务器地址:EMQ的访问地址
    • 端口: 默认1883,示例里面进行了修改
    • 用户名:EMQ的登录用户名
    • 密码:EMQ的登录密码

推送数据并消费

Rest Api

💡注意:

第三方接口凭证说明如下:

    • Anonymons(匿名)
      • 不用访问凭证即可访问的api接口
    • Basic(Basic认证)
      • 使用Basic认证访问的api接口,客户端通过明文(BASE64编码格式)将用户名和密码传输到服务端进行认证。Basic加密的方式,用户名:密码。建议:在HTTPS的环境下使用
    • PEM(证书认证)
      • 使用CA证书的方式进行认证(推荐)

以下示例为匿名访问:

第三方平台需要定义 无需登录 可以直接访问的api接口,且接口请求方式为 POST ,参数接收用@RequestBody 具体示例如下:

@PostMapping("receive")
public void receive(@RequestBody Map<String,Object> params){
    //编写数据处理代码
}
本文通过 YUQUE WORDPRESS 同步自语雀
云腾五洲-AI助理