# AMQP客户端接入说明

如果需要将设备消息转发至AMQP客户端进行消费,您需在业务服务器中使用AMQP SDK开启客户端连接物联网平台。AMQP客户端接入成功并在业务服务器运行,才能接收物联网平台的设备消息。本文介绍AMQP客户端接入物联网平台的方法。

# 前提条件

  • 已创建消费组:使用AMQP SDK 开启 AMQP 客户端以消费消费组内的订阅消息时,必须指定消费组 ID。
  • 已创建订阅并选择推送消息类型。

# AMQP SDK 下载

# 获取SDK配置参数

  • AccessKeyId:AccessKeyId,登录平台后,在右上角用户信息处获取。
  • AccessKeySecret:AccessKeySecret,登录平台后,在右上角用户信息处获取。
  • instanceId:当前物联网平台实例的ID。您可在物联网平台控制台的实例概览页签,查看当前实例的ID。
  • groupId:当前物联网平台对应实例中的消费组ID。登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表查看您的消费组ID。
  • clientId:表示客户端ID,需您自定义,长度最大 30 位。
  • region:区域,根据服务所在区域,填写支持的区域文档中对应的Region ID。
  • needQueue:指定要监听的消息类型采用 {productKey}.{type} 格式组装,示例:qYJ3iFdY***.up_raw
    • 如需按类型筛选监听,请使用 AmqpClient.consumerNeedQueue 方法
    • 如需监听所有已创建订阅的消息,请使用 AmqpClient.consumer 方法
    • 其中 {productKey} 的值为需要订阅产品的productKey
    • 其中 {type} 的可选值包括:
      • 透传上报: up_raw
      • 设备属性上报: property_post
      • 设备事件上报: event_post
      • 自定义 Topic: custom_topic
      • OTA模块版本号变更通知: ota_version_post
      • OTA升级进度通知: ota_progress_post
      • 属性设置、服务调用响应: downlink_reply
      • 设备状态变化通知: status_change

# 注意事项

  • 当一个消费组下启动单个SDK客户端时,修改平台侧的订阅消息类型后,客户端会自主动态更新订阅消息类型,客户端无需重新启动。
  • 当一个消费组下启动多个个SDK客户端时,修改平台侧的订阅消息类型后,需要将各个客户端重新启动,才能保证客户端都能订阅接收到新增的消息类型。

# JAVA-SDK示例代码

示例demo (opens new window)

package com.fenydata.amqpdemo;

import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.DeliverCallback;
import org.iot.AmqpClient;
import org.iot.dto.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class SdkDemo {

  private final static Logger logger = LoggerFactory.getLogger(SdkDemo.class);

  //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
  private final static ExecutorService executorService = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),
    Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(50000));

  @PostConstruct
  public void init() {
    callback("2025-client-a");
  }

  public void callback(String clientId) {
    logger.info("服务启动开始监听消息------------------------------------------------");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
      try {

        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);

        // 1.建议异步处理收到的消息,确保没有耗时逻辑。
        // 如果业务处理耗时过程过长阻塞住线程,可能会可能会有oom发生。
        executorService.submit(new Runnable() {
          @Override
          public void run() {
            processMessage(message);
          }
        });


      } catch (Exception e) {
        logger.error("sdk收到消息,处理消息异常:{}", e.getMessage());
      }
    };

    // 监听所有队列的消息
    AmqpClient.consumer("AccessKeyId", "AccessKeySecret", "实例id", "消费组id", clientId, deliverCallback, "地区");

    // 监听指定队列的消息
    // List<String> needQueue = new ArrayList<>();
    // needQueue.add("qYJ3iFdY***.up_raw"); // 透传上报
    // AmqpClient.consumerNeedQueue("AccessKeyId", "AccessKeySecret", "实例id", "消费组id", clientId, deliverCallback, "地区", needQueue);

  }

  /**
   * 在这里处理您收到消息后的具体业务逻辑。
   */
  private static void processMessage(String message) {
    try {
      logger.info("收到消息,message:{}", message);
      Message msg = JSONUtil.toBean(message, Message.class);
      // TODO: 在这里处理您收到消息后的具体业务逻辑。
      System.out.println("generateTime:" + msg.getGenerateTime());
      System.out.println("traceId:" + msg.getTraceId());
      System.out.println("messageId:" + msg.getMessageId());
      System.out.println("instanceId:" + msg.getInstanceId());
      System.out.println("productKey:" + msg.getProductKey());
      System.out.println("deviceName:" + msg.getDeviceName());
      System.out.println("topic:" + msg.getTopic());
      System.out.println("content:" + new String(msg.getContent())); // 默认使用此方式
      // System.out.println("content:" + HexUtil.encodeHexStr(msg.getContent())); // 透传/自定义Topic且数据为二进制,使用本方式

      System.out.println("------------------------------------------------");

      // 设备上下线消息content结构:org.iot.dto.ContentOfOnlineOffLine
      // 设备上报物模型属性消息content结构:org.iot.dto.ContentOfProperty
      // 设备上报物模型事件消息content结构:org.iot.dto.ContentOfEvent
    } catch (Exception e) {
      logger.error("error ", e);
    }
  }
}