Java SDK Access Example
An AMQP-compliant JMS client connects to the IoT platform and receives subscribed messages from the platform.
Requirements for the Development Environment
JDK 1.8 or later has been installed.
Obtaining the Java SDK
The AMQP SDK is an open-source SDK. If you use Java, you are advised to use the Apache Qpid JMS client. Visit Qpid JMS 0.50.0 to download the client and view the instructions for use.
Adding a Maven Dependency
<!-- amqp 1.0 qpid client --> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.50.0</version> </dependency>
Code Samples
You can click here to obtain the Java SDK access example. For details on the parameters involved in the demo, see AMQP Client Access.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
package com.huawei.iot.amqp.jms;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.TransportSupport;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class HwIotAmqpJavaClientDemo{
// Asynchronous thread pool. You can adjust the parameters based on service features or use other asynchronous processing modes.
private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5000));
public static void main(String[] args) throws Exception{
// accessKey for the access credential.
String accessKey = "${yourAccessKey}";
long timeStamp = System.currentTimeMillis();
// Method to assemble userName. For details, see AMQP Client Access.
String userName = "accessKey=" + accessKey + "|timestamp=" + timeStamp;
// accessCode for the access credential.
String password = "${yourAccessCode}";
// Assemble the connection URL according to the qpid-jms specifications.
String connectionUrl = "amqps://${UUCID}.iot-amqps.cn-north-4.myhuaweicloud.com:5671?amqp.vhost=default&amqp.idleTimeout=8000&amqp.saslMechanisms=PLAIN";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.HwConnectionURL", connectionUrl);
// Queue name. You can use DefaultQueue.
String queueName = "${yourQueue}";
hashtable.put("queue.HwQueueName", queueName);
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
JmsConnectionFactory cf = (JmsConnectionFactory) context.lookup("HwConnectionURL");
// Multiple queues can be created for one connection. Match queue.HwQueueName with queue.HwQueueName.
Destination queue = (Destination) context.lookup("HwQueueName");
// Trust the server.
TransportOptions to = new TransportOptions(); to.setTrustAll(true);
cf.setSslContext(TransportSupport.createJdkSslContext(to));
// Create a connection.
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// Create a session.
// Session.CLIENT_ACKNOWLEDGE: After receiving a message, manually call message.acknowledge().
// Session.AUTO_ACKNOWLEDGE: The SDK automatically responds with an ACK message. (recommended processing)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create a receiver link.
MessageConsumer consumer = session.createConsumer(queue);
// Messages can be processed in either of the following ways:
// 1. Proactively pull data (recommended processing). For details, see receiveMessage(consumer).
// 2. Add a listener. For details, see consumer.setMessageListener(messageListener). The server proactively pushes data to the client at an acceptable data rate.
receiveMessage(consumer);
// consumer.setMessageListener(messageListener);
}
private static void receiveMessage(MessageConsumer consumer) throws JMSException{
while (true){
try{
// It is recommended that received messages be processed asynchronously. Ensure that the receiveMessage function does not contain time-consuming logic.
Message message = consumer.receive(); processMessage(message);
} catch (Exception e) {
System.out.println("receiveMessage hand an exception: " + e.getMessage());
e.printStackTrace();
}
}
}
private static MessageListener messageListener = new MessageListener(){
@Override
public void onMessage(Message message){
try {
// It is recommended that received messages be processed asynchronously. Ensure that the onMessage function does not contain time-consuming logic.
// If the service processing takes a long time and blocks the thread, the normal callback after the SDK receives the message may be affected.
executorService.submit(() -> processMessage(message));
} catch (Exception e){
System.out.println("submit task occurs exception: " + e.getMessage());
e.printStackTrace();
}
}
};
/**
* Service logic for processing the received messages
*/
private static void processMessage(Message message) {
try {
String body = message.getBody(String.class); String content = new String(body);
System.out.println("receive an message, the content is " + content);
} catch (Exception e){
System.out.println("processMessage occurs error: " + e.getMessage());
e.printStackTrace();
}
}
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener(){
/**
* Connection established.
*/
@Override
public void onConnectionEstablished(URI remoteURI){
System.out.println("onConnectionEstablished, remoteUri:" + remoteURI);
}
/**
* The connection fails after the maximum number of retries is reached.
*/
@Override
public void onConnectionFailure(Throwable error){
System.out.println("onConnectionFailure, " + error.getMessage());
}
/**
* Connection interrupted.
*/
@Override
public void onConnectionInterrupted(URI remoteURI){
System.out.println("onConnectionInterrupted, remoteUri:" + remoteURI);
}
/**
* Automatic reconnection.
*/
@Override
public void onConnectionRestored(URI remoteURI){
System.out.println("onConnectionRestored, remoteUri:" + remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope){
System.out.println("onInboundMessage, " + envelope);
}
@Override
public void onSessionClosed(Session session, Throwable cause){
System.out.println("onSessionClosed, session=" + session + ", cause =" + cause);
}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause){
System.out.println("MessageConsumer, consumer=" + consumer + ", cause =" + cause);
}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause){
System.out.println("MessageProducer, producer=" + producer + ", cause =" + cause);
}
};
}
|
Last Article: AMQP Client Access
Next Article: Node.js SDK Access Example
Did this article solve your problem?
Thank you for your score!Your feedback would help us improve the website.