Java SDK接入示例
本文介绍使用AMQP协议的JMS客户端接入华为云物联网平台,接收服务端订阅消息的示例。
开发环境要求
本示例使用的开发环境为JDK 1.8及以上版本。
获取SDK
AMQP SDK为开源SDK。如果您使用Java开发语言,推荐使用Apache Qpid JMS客户端。请访问Qpid JMS下载客户端和查看使用说明。
添加Maven依赖
<!-- amqp 1.0 qpid client --> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.61.0</version> </dependency>
代码示例
您可以单击这里获取Java SDK接入示例,Demo中涉及的参数说明,请参考AMQP客户端接入说明。
所有示例代码已经包含与服务端断线重连的逻辑。
示例代码中用到的AmqpClient.java、AmqpClientOptions.java、AmqpConstants.java可以从这里获取。
1、创建AmqpClient。
// 以下参数请修改为自己的参数值 AmqpClientOptions options = AmqpClientOptions.builder() .host(AmqpConstants.HOST) .port(AmqpConstants.PORT) .accessKey(AmqpConstants.ACCESS_KEY) .accessCode(AmqpConstants.ACCESS_CODE) .queuePrefetch(1000) // sdk会在内存中分配该参数大小的队列,用来接收消息,客户端内存较小的情况可以调小该参数。 .build(); AmqpClient amqpClient = new AmqpClient(options); amqpClient.initialize();
2、通过设置listener消费amqp消息。
try { MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE); consumer.setMessageListener(message -> { try { // 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。 processMessage(message.getBody(String.class)); // 如果options.isAutoAcknowledge==false,此处应该调用message.acknowledge(); } catch (Exception e) { log.warn("message.getBody error,exception is ", e); } }); } catch (Exception e) { log.warn("Consumer initialize error,", e); }
3、主动拉取amqp消息。
// 创建一个线程池用来拉取消息 ExecutorService executorService = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1)); try { MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE); executorService.execute(() -> { while (!isClose.get()) { try { Message message = consumer.receive(); // 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。 processMessage(message.getBody(String.class)); // 如果options.isAutoAcknowledge==false,此处应该调用message.acknowledge(); } catch (JMSException e) { log.warn("receive message error,", e); } } }); } catch (Exception e) { log.warn("Consumer initialize error,", e); }
4、更多消费amqp消息的demo,请参考Java SDK接入示例工程。
资源
AmqpClient.java
package com.iot.amqp; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionExtensions; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsQueue; import org.apache.qpid.jms.transports.TransportOptions; import org.apache.qpid.jms.transports.TransportSupport; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import java.util.Collections; import java.util.HashSet; import java.util.Set; @Slf4j public class AmqpClient { private final AmqpClientOptions options; private Connection connection; private Session session; private final Set<MessageConsumer> consumerSet = Collections.synchronizedSet(new HashSet<>()); public AmqpClient(AmqpClientOptions options) { this.options = options; } public String getId() { return options.getClientId(); } public void initialize() throws Exception { String connectionUrl = options.generateConnectUrl(); log.info("connectionUrl={}", connectionUrl); JmsConnectionFactory cf = new JmsConnectionFactory(connectionUrl); // 信任服务端 TransportOptions to = new TransportOptions(); to.setTrustAll(true); cf.setSslContext(TransportSupport.createJdkSslContext(to)); String userName = "accessKey=" + options.getAccessKey(); cf.setExtension(JmsConnectionExtensions.USERNAME_OVERRIDE.toString(), (connection, uri) -> { String newUserName = userName; if (connection instanceof JmsConnection) { newUserName = ((JmsConnection) connection).getUsername(); } if (StringUtils.isEmpty(options.getInstanceId())) { // IoTDA的userName组成格式如下:“accessKey=${accessKey}|timestamp=${timestamp}” return newUserName + "|timestamp=" + System.currentTimeMillis(); } else { // 同一region购买多个标准版时userName组成格式为“accessKey=${accessKey}|timestamp=${timestamp}|instanceId=${instanceId}” return newUserName + "|timestamp=" + System.currentTimeMillis() + "|instanceId=" + options.getInstanceId(); } }); // 创建连接 connection = cf.createConnection(userName, options.getAccessCode()); // 创建 Session, Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。 session = connection.createSession(false, options.isAutoAcknowledge() ? Session.AUTO_ACKNOWLEDGE : Session.CLIENT_ACKNOWLEDGE); connection.start(); } public MessageConsumer newConsumer(String queueName) throws Exception { if (connection == null || !(connection instanceof JmsConnection) || ((JmsConnection) connection).isClosed()) { throw new Exception("create consumer failed,the connection is disconnected."); } MessageConsumer consumer; consumer = session.createConsumer(new JmsQueue(queueName)); if (consumer != null) { consumerSet.add(consumer); } return consumer; } public void close() { consumerSet.forEach(consumer -> { try { consumer.close(); } catch (JMSException e) { log.warn("consumer close error,exception is ", e); } }); if (session != null) { try { session.close(); } catch (JMSException e) { log.warn("session close error,exception is ", e); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { log.warn("connection close error,exception is", e); } } } }
AmqpClientOptions.java
package com.iot.amqp; import lombok.Builder; import lombok.Data; import org.apache.commons.lang3.StringUtils; import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; @Data @Builder public class AmqpClientOptions { private String host; @Builder.Default private int port = 5671; private String accessKey; private String accessCode; private String clientId; /** * 实例Id信息,同一个Region购买多个标准版实例时需设置 */ private String instanceId; /** * 仅支持true */ @Builder.Default private boolean useSsl = true; /** * IoTDA仅支持default */ @Builder.Default private String vhost = "default"; /** * IoTDA仅支持PLAIN */ @Builder.Default private String saslMechanisms = "PLAIN"; /** * true: SDK自动ACK(默认) * false:收到消息后,需要手动调用message.acknowledge() */ @Builder.Default private boolean isAutoAcknowledge = true; /** * 重连时延(ms) */ @Builder.Default private long reconnectDelay = 3000L; /** * 最大重连时延(ms),随着重连次数增加重连时延逐渐增加 */ @Builder.Default private long maxReconnectDelay = 30 * 1000L; /** * 最大重连次数,默认值-1,代表没有限制 */ @Builder.Default private long maxReconnectAttempts = -1; /** * 空闲超时,对端在这个时间段内没有发送AMQP帧则会导致连接断开。默认值为30000。单位:毫秒。 */ @Builder.Default private long idleTimeout = 30 * 1000L; /** * The values below control how many messages the remote peer can send to the client and be held in a pre-fetch buffer for each consumer instance. */ @Builder.Default private int queuePrefetch = 1000; /** * 扩展参数 */ private Map<String, String> extendedOptions; public String generateConnectUrl() { String uri = MessageFormat.format("{0}://{1}:{2}", (useSsl ? "amqps" : "amqp"), host, String.valueOf(port)); Map<String, String> uriOptions = new HashMap<>(); uriOptions.put("amqp.vhost", vhost); uriOptions.put("amqp.idleTimeout", String.valueOf(idleTimeout)); uriOptions.put("amqp.saslMechanisms", saslMechanisms); Map<String, String> jmsOptions = new HashMap<>(); jmsOptions.put("jms.prefetchPolicy.queuePrefetch", String.valueOf(queuePrefetch)); if (StringUtils.isNotEmpty(clientId)) { jmsOptions.put("jms.clientID", clientId); } else { jmsOptions.put("jms.clientID", UUID.randomUUID().toString()); } jmsOptions.put("failover.reconnectDelay", String.valueOf(reconnectDelay)); jmsOptions.put("failover.maxReconnectDelay", String.valueOf(maxReconnectDelay)); if (maxReconnectAttempts > 0) { jmsOptions.put("failover.maxReconnectAttempts", String.valueOf(maxReconnectAttempts)); } if (extendedOptions != null) { for (Map.Entry<String, String> option : extendedOptions.entrySet()) { if (option.getKey().startsWith("amqp.") || option.getKey().startsWith("transport.")) { uriOptions.put(option.getKey(), option.getValue()); } else { jmsOptions.put(option.getKey(), option.getValue()); } } } StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(uriOptions.entrySet().stream() .map(option -> MessageFormat.format("{0}={1}", option.getKey(), option.getValue())) .collect(Collectors.joining("&", "failover:(" + uri + "?", ")"))); stringBuilder.append(jmsOptions.entrySet().stream() .map(option -> MessageFormat.format("{0}={1}", option.getKey(), option.getValue())) .collect(Collectors.joining("&", "?", ""))); return stringBuilder.toString(); } }
AmqpConstants.java
package com.iot.amqp; public interface AmqpConstants { /** * AMQP接入域名 * eg: "****.iot-amqps.cn-north-4.myhuaweicloud.com"; */ String HOST = "127.0.0.1"; /** * AMQP接入端口 */ int PORT = 5671; /** * 接入凭证键值 * 不需要拼接时间戳timestamp */ String ACCESS_KEY = "accessKey"; /** * 接入凭证密钥 */ String ACCESS_CODE = "accessCode"; /** * 默认队列 */ String DEFAULT_QUEUE = "DefaultQueue"; }