# AMQP客户端接入说明
如果需要将设备消息转发至AMQP客户端进行消费,您需在业务服务器中使用AMQP SDK开启客户端连接物联网平台。AMQP客户端接入成功并在业务服务器运行,才能接收物联网平台的设备消息。本文介绍AMQP客户端接入物联网平台的方法。
# 前提条件
- 已创建消费组:使用AMQP SDK 开启 AMQP 客户端以消费消费组内的订阅消息时,必须指定消费组 ID。
- 已创建订阅并选择推送消息类型。
# AMQP SDK 下载
- java版本:amqp-sdk-3.1.5-jar-with-dependencies.jar (opens new window)
- php版本:php-amqp-sdk-3.1.3.zip (opens new window)
- python版本:python-amqp-sdk-3.1.3.zip (opens new window)
- node版本:node-amqp-sdk-3.1.3.zip (opens new window)
- .net版本:dotnet-amqp-sdk-3.1.3.zip (opens new window)
- go版本:go-amqp-sdk-3.1.3.zip (opens new window)
# 获取SDK配置参数
- AccessKeyId:AccessKeyId,登录平台后,在右上角用户信息处获取。
- AccessKeySecret:AccessKeySecret,登录平台后,在右上角用户信息处获取。
- instanceId:当前物联网平台实例的ID。您可在物联网平台控制台的实例概览页签,查看当前实例的ID。
- groupId:当前物联网平台对应实例中的消费组ID。登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表查看您的消费组ID。
- clientId:表示客户端ID,需您自定义,长度最大 30 位。
- region:区域,根据服务所在区域,填写支持的区域文档中对应的Region ID。
- needQueue:指定要监听的消息类型采用 {productKey}.{type} 格式组装,示例:qYJ3iFdY***.up_raw
- 如需按类型筛选监听,请使用 AmqpClient.consumerNeedQueue 方法
- 如需监听所有已创建订阅的消息,请使用 AmqpClient.consumer 方法
- 其中 {productKey} 的值为需要订阅产品的productKey
- 其中 {type} 的可选值包括:
- 透传上报: up_raw
- 设备属性上报: property_post
- 设备事件上报: event_post
- 自定义 Topic: custom_topic
- OTA模块版本号变更通知: ota_version_post
- OTA升级进度通知: ota_progress_post
- 属性设置、服务调用响应: downlink_reply
- 设备状态变化通知: status_change
# 注意事项
- 当一个消费组下启动单个SDK客户端时,修改平台侧的订阅消息类型后,客户端会自主动态更新订阅消息类型,客户端无需重新启动。
- 当一个消费组下启动多个个SDK客户端时,修改平台侧的订阅消息类型后,需要将各个客户端重新启动,才能保证客户端都能订阅接收到新增的消息类型。
# JAVA-SDK示例代码
package com.fenydata.amqpdemo;
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.DeliverCallback;
import org.iot.AmqpClient;
import org.iot.dto.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
public class SdkDemo {
private final static Logger logger = LoggerFactory.getLogger(SdkDemo.class);
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50000));
@PostConstruct
public void init() {
callback("2025-client-a");
}
public void callback(String clientId) {
logger.info("服务启动开始监听消息------------------------------------------------");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
// 1.建议异步处理收到的消息,确保没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会可能会有oom发生。
executorService.submit(new Runnable() {
@Override
public void run() {
processMessage(message);
}
});
} catch (Exception e) {
logger.error("sdk收到消息,处理消息异常:{}", e.getMessage());
}
};
// 监听所有队列的消息
AmqpClient.consumer("AccessKeyId", "AccessKeySecret", "实例id", "消费组id", clientId, deliverCallback, "地区");
// 监听指定队列的消息
// List<String> needQueue = new ArrayList<>();
// needQueue.add("qYJ3iFdY***.up_raw"); // 透传上报
// AmqpClient.consumerNeedQueue("AccessKeyId", "AccessKeySecret", "实例id", "消费组id", clientId, deliverCallback, "地区", needQueue);
}
/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(String message) {
try {
logger.info("收到消息,message:{}", message);
Message msg = JSONUtil.toBean(message, Message.class);
// TODO: 在这里处理您收到消息后的具体业务逻辑。
System.out.println("generateTime:" + msg.getGenerateTime());
System.out.println("traceId:" + msg.getTraceId());
System.out.println("messageId:" + msg.getMessageId());
System.out.println("instanceId:" + msg.getInstanceId());
System.out.println("productKey:" + msg.getProductKey());
System.out.println("deviceName:" + msg.getDeviceName());
System.out.println("topic:" + msg.getTopic());
System.out.println("content:" + new String(msg.getContent())); // 默认使用此方式
// System.out.println("content:" + HexUtil.encodeHexStr(msg.getContent())); // 透传/自定义Topic且数据为二进制,使用本方式
System.out.println("------------------------------------------------");
// 设备上下线消息content结构:org.iot.dto.ContentOfOnlineOffLine
// 设备上报物模型属性消息content结构:org.iot.dto.ContentOfProperty
// 设备上报物模型事件消息content结构:org.iot.dto.ContentOfEvent
} catch (Exception e) {
logger.error("error ", e);
}
}
}