Java SDK接入示例
本文介绍使用AMQP协议的JMS客户端接入华为云物联网平台,接收服务端订阅消息的示例。
开发环境要求
本示例使用的开发环境为JDK 1.8及以上版本。
获取SDK
AMQP SDK为开源SDK。如果您使用Java开发语言,推荐使用Apache Qpid JMS客户端。请访问Qpid JMS下载客户端和查看使用说明。
添加Maven依赖
<!-- amqp 1.0 qpid client -->
<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-jms-client</artifactId>
    <version>0.61.0</version>
</dependency>
代码示例
您可以单击这里获取Java SDK接入示例。Demo中涉及的参数说明,请参考AMQP客户端接入说明。
1、创建AmqpClient。
// Replace the values below with the settings of your own IoTDA instance.
AmqpClientOptions options = AmqpClientOptions.builder()
        // Endpoint of the AMQP access point.
        .host(AmqpConstants.HOST)
        .port(AmqpConstants.PORT)
        // The SDK allocates an in-memory queue of this size to receive messages;
        // lower the value when the client has little memory available.
        .queuePrefetch(1000)
        // Access credentials.
        .accessKey(AmqpConstants.ACCESS_KEY)
        .accessCode(AmqpConstants.ACCESS_CODE)
        .build();

// Create the client and open the connection.
AmqpClient amqpClient = new AmqpClient(options);
amqpClient.initialize();
2、通过设置listener消费amqp消息。
try {
    MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
    consumer.setMessageListener(message -> {
        try {
            String body = message.getBody(String.class);
            // Handle the message here. If handling is slow, hand the work off to another
            // thread; otherwise the heartbeat may time out and the connection may drop.
            processMessage(body);
            // When options.isAutoAcknowledge == false, call message.acknowledge() here.
        } catch (Exception ex) {
            log.warn("message.getBody error,exception is ", ex);
        }
    });
} catch (Exception ex) {
    log.warn("Consumer initialize error,", ex);
}
3、主动拉取amqp消息。
// Create a single-thread pool that is used to pull messages.
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
try {
    MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
    executorService.execute(() -> {
        while (!isClose.get()) {
            try {
                Message message = consumer.receive();
                if (message == null) {
                    // receive() returns null when the consumer has been closed concurrently.
                    // No further message can arrive, so stop pulling instead of hitting a
                    // NullPointerException on the next line.
                    log.warn("receive message error,consumer is closed.");
                    break;
                }
                // Handle the message here. If handling is slow, hand the work off to another
                // thread; otherwise the heartbeat may time out and the connection may drop.
                processMessage(message.getBody(String.class));
                // When options.isAutoAcknowledge == false, call message.acknowledge() here.
            } catch (JMSException e) {
                log.warn("receive message error,", e);
            }
        }
    });
} catch (Exception e) {
    log.warn("Consumer initialize error,", e);
}
4、更多消费AMQP消息的demo,请参考Java SDK接入示例工程。
资源
AmqpClient.java
package com.iot.amqp;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionExtensions;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.TransportSupport;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
 * Wrapper around one Qpid JMS connection and one session, configured from
 * {@link AmqpClientOptions}.
 *
 * <p>Call {@link #initialize()} once before creating consumers, and {@link #close()} when done.
 * Every consumer created through {@link #newConsumer(String)} is tracked and closed by
 * {@link #close()}.
 */
@Slf4j
public class AmqpClient {
    private final AmqpClientOptions options;
    private Connection connection;
    private Session session;
    /** Consumers created by this client; closed and forgotten in {@link #close()}. */
    private final Set<MessageConsumer> consumerSet = Collections.synchronizedSet(new HashSet<>());

    /**
     * @param options connection settings used by {@link #initialize()}
     */
    public AmqpClient(AmqpClientOptions options) {
        this.options = options;
    }

    /**
     * @return the client id configured in the options
     */
    public String getId() {
        return options.getClientId();
    }

    /**
     * Builds the connection factory, opens the connection, creates the session and starts
     * message delivery.
     *
     * <p>If creating the session or starting the connection fails, the connection that was
     * already opened is closed before the exception is rethrown, so it is not leaked.
     *
     * @throws Exception if the connection or the session cannot be created or started
     */
    public void initialize() throws Exception {
        String connectionUrl = options.generateConnectUrl();
        log.info("connectionUrl={}", connectionUrl);
        JmsConnectionFactory cf = new JmsConnectionFactory(connectionUrl);
        // Trust the server side.
        // NOTE(review): trustAll presumably skips server certificate verification — confirm
        // and consider configuring a trust store for production use.
        TransportOptions to = new TransportOptions();
        to.setTrustAll(true);
        cf.setSslContext(TransportSupport.createJdkSslContext(to));
        String userName = "accessKey=" + options.getAccessKey();
        // The user name must carry a fresh timestamp on every (re)connect, so it is computed
        // by an override hook instead of being fixed once.
        cf.setExtension(JmsConnectionExtensions.USERNAME_OVERRIDE.toString(), (conn, uri) -> {
            String baseUserName = userName;
            if (conn instanceof JmsConnection) {
                baseUserName = ((JmsConnection) conn).getUsername();
            }
            // IoTDA user name format: "accessKey=${accessKey}|timestamp=${timestamp}"
            String stamped = baseUserName + "|timestamp=" + System.currentTimeMillis();
            if (StringUtils.isEmpty(options.getInstanceId())) {
                return stamped;
            }
            // With several standard instances in one region the format is
            // "accessKey=${accessKey}|timestamp=${timestamp}|instanceId=${instanceId}"
            return stamped + "|instanceId=" + options.getInstanceId();
        });
        // Create the connection.
        connection = cf.createConnection(userName, options.getAccessCode());
        try {
            // Session.CLIENT_ACKNOWLEDGE: the application must call message.acknowledge().
            // Session.AUTO_ACKNOWLEDGE: the SDK acknowledges automatically (recommended).
            session = connection.createSession(false,
                    options.isAutoAcknowledge() ? Session.AUTO_ACKNOWLEDGE : Session.CLIENT_ACKNOWLEDGE);
            connection.start();
        } catch (JMSException | RuntimeException e) {
            // Release the half-initialized connection before reporting the failure.
            close();
            throw e;
        }
    }

    /**
     * Creates a consumer on the given queue and registers it for cleanup in {@link #close()}.
     *
     * @param queueName name of the queue to consume from
     * @return the new consumer
     * @throws Exception if the client has no open connection, or the consumer cannot be created
     */
    public MessageConsumer newConsumer(String queueName) throws Exception {
        // instanceof is false for null, so this also covers a client that was never initialized.
        if (!(connection instanceof JmsConnection) || ((JmsConnection) connection).isClosed()) {
            throw new Exception("create consumer failed,the connection is disconnected.");
        }
        MessageConsumer consumer = session.createConsumer(new JmsQueue(queueName));
        if (consumer != null) {
            consumerSet.add(consumer);
        }
        return consumer;
    }

    /**
     * Closes all tracked consumers, then the session, then the connection. Failures are logged
     * and do not prevent the remaining resources from being closed.
     */
    public void close() {
        // Close and forget the consumers atomically so that closed consumers are not retained
        // or closed again by a later call.
        synchronized (consumerSet) {
            consumerSet.forEach(consumer -> {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    log.warn("consumer close error,exception is ", e);
                }
            });
            consumerSet.clear();
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                log.warn("session close error,exception is ", e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                log.warn("connection close error,exception is", e);
            }
        }
    }
}
AmqpClientOptions.java
package com.iot.amqp;
import lombok.Builder;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/**
 * Connection settings for {@link AmqpClient}, plus the logic that turns them into a Qpid JMS
 * failover connection URL.
 */
@Data
@Builder
public class AmqpClientOptions {
    private String host;

    @Builder.Default
    private int port = 5671;

    private String accessKey;

    private String accessCode;

    /**
     * Client id. When left empty, {@link #generateConnectUrl()} generates a random UUID and
     * stores it here.
     */
    private String clientId;

    /**
     * Instance id; must be set when several standard instances are purchased in the same region.
     */
    private String instanceId;

    /**
     * Only true is supported.
     */
    @Builder.Default
    private boolean useSsl = true;

    /**
     * IoTDA only supports "default".
     */
    @Builder.Default
    private String vhost = "default";

    /**
     * IoTDA only supports PLAIN.
     */
    @Builder.Default
    private String saslMechanisms = "PLAIN";

    /**
     * true: the SDK acknowledges automatically (default).
     * false: the application must call message.acknowledge() after receiving a message.
     */
    @Builder.Default
    private boolean isAutoAcknowledge = true;

    /**
     * Reconnect delay (ms).
     */
    @Builder.Default
    private long reconnectDelay = 3000L;

    /**
     * Maximum reconnect delay (ms); the delay grows as the number of reconnect attempts grows.
     */
    @Builder.Default
    private long maxReconnectDelay = 30 * 1000L;

    /**
     * Maximum number of reconnect attempts; the default -1 means no limit.
     */
    @Builder.Default
    private long maxReconnectAttempts = -1;

    /**
     * Idle timeout: the connection is dropped when the peer sends no AMQP frame within this
     * period. Default 30000, in milliseconds.
     */
    @Builder.Default
    private long idleTimeout = 30 * 1000L;

    /**
     * The values below control how many messages the remote peer can send to the client and be
     * held in a pre-fetch buffer for each consumer instance.
     */
    @Builder.Default
    private int queuePrefetch = 1000;

    /**
     * Extended options. Keys starting with "amqp." or "transport." are added to the broker URI;
     * all other keys are added to the outer failover URI.
     */
    private Map<String, String> extendedOptions;

    /**
     * Builds the connection URL in the form
     * {@code failover:(scheme://host:port?<uri options>)?<jms and failover options>}.
     *
     * <p>If no client id is configured, a random UUID is generated and stored in
     * {@code clientId}, so that {@code getClientId()} reports the id used in the URL and
     * repeated calls produce the same id.
     *
     * @return the connection URL
     */
    public String generateConnectUrl() {
        // String.valueOf(port) keeps MessageFormat from applying number formatting to the port.
        String uri = MessageFormat.format("{0}://{1}:{2}", (useSsl ? "amqps" : "amqp"), host, String.valueOf(port));

        Map<String, String> uriOptions = new HashMap<>();
        uriOptions.put("amqp.vhost", vhost);
        uriOptions.put("amqp.idleTimeout", String.valueOf(idleTimeout));
        uriOptions.put("amqp.saslMechanisms", saslMechanisms);

        Map<String, String> jmsOptions = new HashMap<>();
        jmsOptions.put("jms.prefetchPolicy.queuePrefetch", String.valueOf(queuePrefetch));
        if (StringUtils.isEmpty(clientId)) {
            // Remember the generated id instead of discarding it after building the URL.
            clientId = UUID.randomUUID().toString();
        }
        jmsOptions.put("jms.clientID", clientId);
        jmsOptions.put("failover.reconnectDelay", String.valueOf(reconnectDelay));
        jmsOptions.put("failover.maxReconnectDelay", String.valueOf(maxReconnectDelay));
        // Non-positive values are not written to the URL.
        if (maxReconnectAttempts > 0) {
            jmsOptions.put("failover.maxReconnectAttempts", String.valueOf(maxReconnectAttempts));
        }

        if (extendedOptions != null) {
            for (Map.Entry<String, String> option : extendedOptions.entrySet()) {
                String key = option.getKey();
                boolean perUri = key.startsWith("amqp.") || key.startsWith("transport.");
                (perUri ? uriOptions : jmsOptions).put(key, option.getValue());
            }
        }

        return "failover:(" + uri + "?" + toQuery(uriOptions) + ")" + "?" + toQuery(jmsOptions);
    }

    /** Joins the entries as {@code key=value} pairs separated by {@code &}. */
    private static String toQuery(Map<String, String> queryOptions) {
        return queryOptions.entrySet().stream()
                .map(option -> option.getKey() + "=" + option.getValue())
                .collect(Collectors.joining("&"));
    }
}
AmqpConstants.java
package com.iot.amqp;
public interface AmqpConstants {

    /**
     * AMQP access domain name,
     * e.g. "****.iot-amqps.cn-north-4.myhuaweicloud.com".
     */
    String HOST = "127.0.0.1";

    /** AMQP access port. */
    int PORT = 5671;

    /**
     * Access credential key. Do not append the timestamp to it.
     */
    String ACCESS_KEY = "accessKey";

    /** Access credential secret. */
    String ACCESS_CODE = "accessCode";

    /** Default queue. */
    String DEFAULT_QUEUE = "DefaultQueue";
}
