更新时间:2024-07-29 GMT+08:00

Java SDK接入示例

本文介绍使用AMQP协议的JMS客户端接入华为云物联网平台,接收服务端订阅消息的示例。

开发环境要求

本示例使用的开发环境为JDK 1.8及以上版本。

获取SDK

AMQP SDK为开源SDK。如果您使用Java开发语言,推荐使用Apache Qpid JMS客户端。请访问Qpid JMS下载客户端和查看使用说明。

添加Maven依赖

<!-- AMQP 1.0 client: Apache Qpid JMS (org.apache.qpid:qpid-jms-client) -->
 <dependency>
   <groupId>org.apache.qpid</groupId>
   <artifactId>qpid-jms-client</artifactId>
   <version>0.61.0</version>
 </dependency>

代码示例

您可以单击这里获取Java SDK接入示例。Demo中涉及的参数说明,请参考AMQP客户端接入说明。

所有示例代码已经包含与服务端断线重连的逻辑。

示例代码中用到的AmqpClient.java、AmqpClientOptions.java、AmqpConstants.java可以从这里获取。

1、创建AmqpClient。

// Replace the values below with your own connection parameters.
AmqpClientOptions options = AmqpClientOptions.builder()
        .host(AmqpConstants.HOST)
        .port(AmqpConstants.PORT)
        .accessKey(AmqpConstants.ACCESS_KEY)
        .accessCode(AmqpConstants.ACCESS_CODE)
        // The SDK allocates an in-memory queue of this size to receive messages;
        // lower the value when the client has little memory available.
        .queuePrefetch(1000)
        .build();

// Build the client from the options and open the connection.
AmqpClient amqpClient = new AmqpClient(options);
amqpClient.initialize();

2、通过设置listener消费amqp消息。

 try {
     MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
     // Register a push-style listener; it is invoked for every delivered message.
     consumer.setMessageListener(msg -> {
         try {
             // Handle the message here. If handling is slow, hand it off to another
             // thread; otherwise the heartbeat may time out and the link may drop.
             String body = msg.getBody(String.class);
             processMessage(body);
             // When options.isAutoAcknowledge == false, call msg.acknowledge() here.
         } catch (Exception ex) {
             log.warn("message.getBody error,exception is ", ex);
         }
     });
 } catch (Exception ex) {
     log.warn("Consumer initialize error,", ex);
 }

3、主动拉取amqp消息。

  // Single-threaded pool dedicated to pulling messages.
  ExecutorService executorService = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));

  try {
      MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
      executorService.execute(() -> {
          // Keep pulling until the application marks itself as closed.
          while (!isClose.get()) {
              try {
                  Message message = consumer.receive();
                  if (message == null) {
                      // Per the JMS API, receive() returns null when the consumer is
                      // closed concurrently; no further message can arrive, so stop.
                      log.warn("consumer is closed, stop receiving messages.");
                      break;
                  }
                  // Handle the message here. If handling is slow, hand it off to another
                  // thread; otherwise the heartbeat may time out and the link may drop.
                  processMessage(message.getBody(String.class));
                  // When options.isAutoAcknowledge == false, call message.acknowledge() here.
              } catch (JMSException e) {
                  log.warn("receive message error,", e);
              } catch (RuntimeException e) {
                  // A failure while handling one message must not terminate the pull loop.
                  log.warn("process message error,", e);
              }
          }
      });
  } catch (Exception e) {
      log.warn("Consumer initialize error,", e);
  }

4、更多消费amqp消息的demo,请参考Java SDK接入示例工程。

资源

AmqpClient.java

package com.iot.amqp;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionExtensions;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.TransportSupport;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Wrapper around one JMS {@link Connection} and one {@link Session} created from
 * {@link AmqpClientOptions}. Consumers created through {@link #newConsumer(String)}
 * are tracked so that {@link #close()} can close them.
 */
@Slf4j
public class AmqpClient {
    private final AmqpClientOptions options;
    private Connection connection;
    private Session session;
    private final Set<MessageConsumer> consumerSet = Collections.synchronizedSet(new HashSet<>());

    /**
     * @param options connection settings used by {@link #initialize()}
     */
    public AmqpClient(AmqpClientOptions options) {
        this.options = options;
    }

    /**
     * @return the client id configured in the options (may be {@code null} if none was set)
     */
    public String getId() {
        return options.getClientId();
    }

    /**
     * Creates the connection factory, opens the connection, creates the session and
     * starts message delivery.
     *
     * <p>If creating the session or starting the connection fails, everything opened so
     * far is closed before the exception is rethrown, so a failed call does not leak an
     * open connection.
     *
     * @throws Exception if the connection or session cannot be established
     */
    public void initialize() throws Exception {
        String connectionUrl = options.generateConnectUrl();
        log.info("connectionUrl={}", connectionUrl);
        JmsConnectionFactory cf = new JmsConnectionFactory(connectionUrl);
        // Trust the server side.
        // NOTE(review): trustAll presumably disables server certificate validation;
        // confirm and configure a proper trust store for production use.
        TransportOptions to = new TransportOptions();
        to.setTrustAll(true);
        cf.setSslContext(TransportSupport.createJdkSslContext(to));
        String userName = "accessKey=" + options.getAccessKey();
        // The user name carries a fresh timestamp each time this override is invoked.
        cf.setExtension(JmsConnectionExtensions.USERNAME_OVERRIDE.toString(), (conn, uri) -> {
            String newUserName = userName;
            if (conn instanceof JmsConnection) {
                newUserName = ((JmsConnection) conn).getUsername();
            }
            String stamped = newUserName + "|timestamp=" + System.currentTimeMillis();
            if (StringUtils.isEmpty(options.getInstanceId())) {
                // IoTDA user name format: "accessKey=${accessKey}|timestamp=${timestamp}"
                return stamped;
            }
            // With several standard-edition instances in one region the format is
            // "accessKey=${accessKey}|timestamp=${timestamp}|instanceId=${instanceId}"
            return stamped + "|instanceId=" + options.getInstanceId();
        });
        // Open the connection.
        connection = cf.createConnection(userName, options.getAccessCode());
        try {
            // Session.CLIENT_ACKNOWLEDGE: the application must call message.acknowledge()
            // after receiving a message. Session.AUTO_ACKNOWLEDGE: the SDK acknowledges
            // automatically (recommended).
            session = connection.createSession(false,
                options.isAutoAcknowledge() ? Session.AUTO_ACKNOWLEDGE : Session.CLIENT_ACKNOWLEDGE);
            connection.start();
        } catch (JMSException | RuntimeException e) {
            // Do not leave a half-initialized, still-open connection behind.
            close();
            throw e;
        }
    }

    /**
     * Creates a consumer for the given queue on the current session and remembers it
     * for {@link #close()}.
     *
     * @param queueName name of the queue to consume from
     * @return the newly created consumer
     * @throws Exception if the client has not been initialized, the connection is
     *                   closed, or the consumer cannot be created
     */
    public MessageConsumer newConsumer(String queueName) throws Exception {
        // instanceof is false for null, so this also covers an uninitialized client.
        boolean connected = connection instanceof JmsConnection
            && !((JmsConnection) connection).isClosed()
            && session != null;
        if (!connected) {
            throw new Exception("create consumer failed,the connection is disconnected.");
        }

        MessageConsumer consumer = session.createConsumer(new JmsQueue(queueName));
        if (consumer != null) {
            consumerSet.add(consumer);
        }
        return consumer;
    }

    /**
     * Closes all tracked consumers, then the session, then the connection. Failures are
     * logged and do not prevent the remaining resources from being closed.
     */
    public void close() {
        // Hold the set's lock while iterating and clearing so that a consumer added
        // concurrently is not dropped from the set without being closed here.
        synchronized (consumerSet) {
            for (MessageConsumer consumer : consumerSet) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    log.warn("consumer close error,exception is ", e);
                }
            }
            // Drop references to closed consumers.
            consumerSet.clear();
        }

        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                log.warn("session close error,exception is ", e);
            }
        }

        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                log.warn("connection close error,exception is", e);
            }
        }
    }
}

AmqpClientOptions.java

package com.iot.amqp;

import lombok.Builder;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

/**
 * Connection settings for the AMQP client, plus the logic that turns them into a
 * failover connection URL.
 */
@Data
@Builder
public class AmqpClientOptions {
    private String host;
    @Builder.Default
    private int port = 5671;
    private String accessKey;
    private String accessCode;
    private String clientId;

    /**
     * Instance id; must be set when several standard-edition instances were purchased
     * in the same region.
     */
    private String instanceId;

    /**
     * Only {@code true} is supported.
     */
    @Builder.Default
    private boolean useSsl = true;

    /**
     * IoTDA only supports "default".
     */
    @Builder.Default
    private String vhost = "default";

    /**
     * IoTDA only supports PLAIN.
     */
    @Builder.Default
    private String saslMechanisms = "PLAIN";

    /**
     * true: the SDK acknowledges automatically (default).
     * false: the application must call message.acknowledge() after receiving a message.
     */
    @Builder.Default
    private boolean isAutoAcknowledge = true;

    /**
     * Reconnect delay in milliseconds.
     */
    @Builder.Default
    private long reconnectDelay = 3000L;

    /**
     * Maximum reconnect delay in milliseconds; the delay grows as the number of
     * reconnect attempts increases.
     */
    @Builder.Default
    private long maxReconnectDelay = 30 * 1000L;

    /**
     * Maximum number of reconnect attempts; the default -1 means unlimited.
     */
    @Builder.Default
    private long maxReconnectAttempts = -1;

    /**
     * Idle timeout in milliseconds; the connection is dropped when the peer sends no
     * AMQP frame within this period. Defaults to 30000.
     */
    @Builder.Default
    private long idleTimeout = 30 * 1000L;

    /**
     * Controls how many messages the remote peer can send to the client and have held
     * in a pre-fetch buffer for each consumer instance.
     */
    @Builder.Default
    private int queuePrefetch = 1000;

    /**
     * Extra options appended to the connection URL.
     */
    private Map<String, String> extendedOptions;

    /**
     * Builds the connection URL in the form
     * {@code failover:(scheme://host:port?<uri options>)?<jms options>}.
     *
     * <p>Extended options whose key starts with {@code amqp.} or {@code transport.}
     * go inside the parentheses; all other extended options go after them. When no
     * client id is configured a random UUID is used as {@code jms.clientID}.
     *
     * @return the connection URL
     */
    public String generateConnectUrl() {
        String scheme = useSsl ? "amqps" : "amqp";
        String endpoint = scheme + "://" + host + ":" + port;

        // Options that belong to the nested broker URI.
        Map<String, String> brokerParams = new HashMap<>();
        brokerParams.put("amqp.vhost", vhost);
        brokerParams.put("amqp.idleTimeout", String.valueOf(idleTimeout));
        brokerParams.put("amqp.saslMechanisms", saslMechanisms);

        // Options that belong to the outer failover URI.
        Map<String, String> outerParams = new HashMap<>();
        outerParams.put("jms.prefetchPolicy.queuePrefetch", String.valueOf(queuePrefetch));
        String effectiveClientId = StringUtils.isNotEmpty(clientId) ? clientId : UUID.randomUUID().toString();
        outerParams.put("jms.clientID", effectiveClientId);
        outerParams.put("failover.reconnectDelay", String.valueOf(reconnectDelay));
        outerParams.put("failover.maxReconnectDelay", String.valueOf(maxReconnectDelay));
        if (maxReconnectAttempts > 0) {
            outerParams.put("failover.maxReconnectAttempts", String.valueOf(maxReconnectAttempts));
        }

        if (extendedOptions != null) {
            for (Map.Entry<String, String> extra : extendedOptions.entrySet()) {
                String key = extra.getKey();
                boolean forBroker = key.startsWith("amqp.") || key.startsWith("transport.");
                Map<String, String> target = forBroker ? brokerParams : outerParams;
                target.put(key, extra.getValue());
            }
        }

        return "failover:(" + endpoint + "?" + joinOptions(brokerParams) + ")"
            + "?" + joinOptions(outerParams);
    }

    /**
     * Renders the map as {@code key=value} pairs separated by {@code &}.
     */
    private static String joinOptions(Map<String, String> params) {
        return params.entrySet().stream()
            .map(entry -> entry.getKey() + "=" + entry.getValue())
            .collect(Collectors.joining("&"));
    }
}

AmqpConstants.java

package com.iot.amqp;


/**
 * Placeholder connection parameters used by the AMQP samples; replace the values
 * with your own before running.
 */
public interface AmqpConstants {
    /**
     * AMQP access domain name,
     * e.g. "****.iot-amqps.cn-north-4.myhuaweicloud.com".
     */
    String HOST = "127.0.0.1";

    /** AMQP access port. */
    int PORT = 5671;

    /**
     * Access credential key. Do not append the timestamp here.
     */
    String ACCESS_KEY = "accessKey";

    /** Access credential secret. */
    String ACCESS_CODE = "accessCode";

    /** Default queue name. */
    String DEFAULT_QUEUE = "DefaultQueue";
}