Java SDK Access Example

An AMQP-compliant JMS client connects to the IoT platform and receives subscribed messages from the platform.

Requirements for the Development Environment

JDK 1.8 or later must be installed.

Obtaining the Java SDK

The AMQP SDK is an open-source SDK. If you use Java, you are advised to use the Apache Qpid JMS client. Visit Qpid JMS 0.50.0 to download the client and view the instructions for use.

Adding a Maven Dependency

<!-- amqp 1.0 qpid client -->
 <dependency>
   <groupId>org.apache.qpid</groupId>
   <artifactId>qpid-jms-client</artifactId>
   <version>0.50.0</version>
 </dependency>

Code Samples

You can click here to obtain the Java SDK access example. For details on the parameters involved in the demo, see AMQP Client Access.

package com.huawei.iot.amqp.jms;

import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.TransportSupport;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demo consumer that connects to the IoT platform over AMQP 1.0 with the Apache Qpid JMS
 * client and prints every message received from the subscribed queue.
 *
 * <p>Replace the {@code ${...}} placeholders in {@link #main(String[])} with real values
 * before running. Messages can be consumed either by pulling (the default, see
 * {@code receiveMessage}) or by registering {@code messageListener} on the consumer.
 */
public class HwIotAmqpJavaClientDemo {
    // Asynchronous thread pool used by the listener mode. You can adjust the parameters based on
    // service features or use other asynchronous processing modes.
    private static final ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5000));

    /**
     * Push-mode listener: hands each message to the thread pool so that the SDK callback
     * thread is not blocked by service processing.
     */
    private static final MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                // It is recommended that received messages be processed asynchronously. Ensure that
                // the onMessage function does not contain time-consuming logic. If the service
                // processing takes a long time and blocks the thread, the normal callback after the
                // SDK receives the message may be affected.
                executorService.submit(() -> processMessage(message));
            } catch (Exception e) {
                // submit() can fail, e.g. when the bounded work queue is full.
                System.out.println("submit task occurs exception: " + e.getMessage());
                e.printStackTrace();
            }
        }
    };

    /**
     * Connection listener that only logs connection, session, consumer and producer events.
     */
    private static final JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * Connection established.
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            System.out.println("onConnectionEstablished, remoteUri:" + remoteURI);
        }

        /**
         * The connection fails after the maximum number of retries is reached.
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            System.out.println("onConnectionFailure, " + error.getMessage());
        }

        /**
         * Connection interrupted.
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            System.out.println("onConnectionInterrupted, remoteUri:" + remoteURI);
        }

        /**
         * Automatic reconnection.
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            System.out.println("onConnectionRestored, remoteUri:" + remoteURI);
        }

        /**
         * A message was dispatched to this client.
         */
        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {
            System.out.println("onInboundMessage, " + envelope);
        }

        /**
         * A session was closed.
         */
        @Override
        public void onSessionClosed(Session session, Throwable cause) {
            System.out.println("onSessionClosed, session=" + session + ", cause =" + cause);
        }

        /**
         * A consumer was closed.
         */
        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
            System.out.println("MessageConsumer, consumer=" + consumer + ", cause =" + cause);
        }

        /**
         * A producer was closed.
         */
        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {
            System.out.println("MessageProducer, producer=" + producer + ", cause =" + cause);
        }
    };

    /**
     * Builds the JNDI context, opens the AMQP connection and starts consuming messages.
     *
     * @param args unused
     * @throws Exception if the JNDI lookup, SSL context creation or JMS setup fails
     */
    public static void main(String[] args) throws Exception {
        // accessKey for the access credential.
        String accessKey = "${yourAccessKey}";
        long timeStamp = System.currentTimeMillis();
        // Method to assemble userName. For details, see AMQP Client Access.
        String userName = "accessKey=" + accessKey + "|timestamp=" + timeStamp;
        // accessCode for the access credential.
        String password = "${yourAccessCode}";
        // Assemble the connection URL according to the qpid-jms specifications.
        String connectionUrl = "amqps://${UUCID}.iot-amqps.cn-north-4.myhuaweicloud.com:5671?amqp.vhost=default&amqp.idleTimeout=8000&amqp.saslMechanisms=PLAIN";
        // Queue name. You can use DefaultQueue.
        String queueName = "${yourQueue}";

        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.HwConnectionURL", connectionUrl);
        hashtable.put("queue.HwQueueName", queueName);
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);

        // The lookup names are the JNDI keys above without their "connectionfactory." / "queue." prefix.
        JmsConnectionFactory cf = (JmsConnectionFactory) context.lookup("HwConnectionURL");
        // Multiple queues can be created for one connection: register one "queue.<name>" entry
        // per queue and look each of them up by "<name>".
        Destination queue = (Destination) context.lookup("HwQueueName");

        // Trust the server.
        // NOTE: trustAll makes the client accept any server certificate. This keeps the demo simple;
        // configure a proper trust store instead before using this code in production.
        TransportOptions to = new TransportOptions();
        to.setTrustAll(true);
        cf.setSslContext(TransportSupport.createJdkSslContext(to));

        // Create a connection.
        Connection connection = cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
        // Create a session.
        // Session.CLIENT_ACKNOWLEDGE: After receiving a message, manually call message.acknowledge().
        // Session.AUTO_ACKNOWLEDGE: The SDK automatically responds with an ACK message. (recommended processing)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // Create a receiver link.
        MessageConsumer consumer = session.createConsumer(queue);
        // Messages can be processed in either of the following ways:
        // 1. Proactively pull data (recommended processing). For details, see receiveMessage(consumer).
        // 2. Add a listener. For details, see consumer.setMessageListener(messageListener). The server
        //    proactively pushes data to the client at an acceptable data rate.
        receiveMessage(consumer);
        // consumer.setMessageListener(messageListener);
    }

    /**
     * Pull-mode loop: blocks on {@code consumer.receive()} and processes each message in turn.
     *
     * <p>The loop ends when {@code receive()} returns {@code null}, which JMS uses to signal
     * that the consumer has been closed; continuing would only spin on a dead consumer.
     *
     * @param consumer consumer to pull messages from
     * @throws JMSException declared for interface compatibility; failures inside the loop are
     *                      logged and the loop continues
     */
    private static void receiveMessage(MessageConsumer consumer) throws JMSException {
        while (true) {
            try {
                // It is recommended that received messages be processed asynchronously. Ensure that
                // the receiveMessage function does not contain time-consuming logic.
                Message message = consumer.receive();
                if (message == null) {
                    // No message will ever arrive on a closed consumer: stop instead of busy-looping.
                    System.out.println("receiveMessage stopped: the consumer has been closed");
                    return;
                }
                processMessage(message);
            } catch (Exception e) {
                System.out.println("receiveMessage hand an exception: " + e.getMessage());
                e.printStackTrace();
            }
        }
    }

    /**
     * Service logic for processing the received messages: prints the message body.
     *
     * @param message message whose body is read as a {@code String}
     */
    private static void processMessage(Message message) {
        try {
            // getBody already returns a String; no defensive copy is needed.
            String content = message.getBody(String.class);
            System.out.println("receive an message, the content is " + content);
        } catch (Exception e) {
            System.out.println("processMessage occurs error: " + e.getMessage());
            e.printStackTrace();
        }
    }
}