Updated on 2024-07-05 GMT+08:00

Creating Topics in Batches

RocketMQ does not provide an API for creating topics in batches, so its users have to create topics one at a time, calling an API once for each topic. By contrast, DMS for RocketMQ provides an API that lets you create topics in batches from code.

Prerequisites

  • The endpoint of the region where IAM and RocketMQ are deployed has been obtained.
  • The RocketMQ instance ID and the project (instance region) ID have been obtained.

Creating Various Topics in Batches Using a 5.x Huawei Cloud SDK

  1. Create a topics.txt file that stores the information about the topics to be created.

    Each line of the file must be in the format "topic name message type", with the two fields separated by a space. The supported message types are NORMAL, FIFO, DELAY, and TRANSACTION. For example:

    topic-test01 NORMAL

    topic-test02 FIFO

    topic-test03 DELAY

    topic-test04 TRANSACTION

    Each line defines one topic. Add as many lines as required.

  2. Create topics in a batch by calling an API in code.

    Sample code

    Modify the parts in bold to actual values.

    package com.huaweicloud.sdk.test;
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.rocketmq.v2.region.RocketMQRegion;
    import com.huaweicloud.sdk.rocketmq.v2.*;
    import com.huaweicloud.sdk.rocketmq.v2.model.*;
    
    import java.io.*;
    import java.util.List;
    import java.util.ArrayList;
    
    public class CreateBatchTopicSolution {
         public static void main(String[] args) throws IOException {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = "{CLOUD_SDK_AK}";   //Access key ID. To check the AK, see Obtaining an AK/SK.
            String sk = "{CLOUD_SDK_SK}";   //Secret key. To check the SK, see Obtaining an AK/SK.
            String projectId = "{project_id}";  //Project ID. To check the project ID, see Obtaining a Project ID.
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
             RocketMQClient client = RocketMQClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(RocketMQRegion.valueOf("<YOUR REGION>"))  //Region where the project is. See the region by referring to Obtaining a Project ID.
                    .build();
    
             File file = new File("d:/topics.txt");   //The path of the topic file in 1. The file name and patch can be custom.
            FileReader fileReader = new FileReader(file);
            BufferedReader bufferedReader = new BufferedReader(fileReader);
            String str = null;
            List<producer.TopicEntity> list = new ArrayList<>();
            while ;(str = bufferedReader.readLine()) != null) {
                String[] topic = str.split(" ");
                list.add(new producer.TopicEntity(topic[0],topic[1]));
            }
            bufferedReader.close();
            CreateTopicOrBatchDeleteTopicRequest request = new
    CreateTopicOrBatchDeleteTopicRequest();
            request.withInstanceId("{instance_id}");   //RocketMQ instance ID
            CreateTopicOrBatchDeleteTopicReq body = new CreateTopicOrBatchDeleteTopicReq();
            for (int i = 0; i < list.size(); i++) {
                body.withMessageType(CreateTopicOrBatchDeleteTopicReq.MessageTypeEnum.fromValue(list.get(i).getMessageType()));
                body.withName(list.get(i).getName());
                request.withBody(body);
                try {
                    CreateTopicOrBatchDeleteTopicResponse response = client.createTopicOrBatchDeleteTopic(request);
                    System.out.println(response.toString());
                } catch (ConnectionException e) {
                    e.printStackTrace();
                } catch (RequestTimeoutException e) {
                    e.printStackTrace();
                } catch (ServiceResponseException e) {
                    e.printStackTrace();
                    System.out.println(e.getHttpStatusCode());
                    System.out.println(e.getRequestId());
                    System.out.println(e.getErrorCode());
                    System.out.println(e.getErrorMsg());
                }
            }
        }
    }
    //TopicEntity.java
    /**
     * Holds one entry of the topic file: a topic name and its message type.
     */
    public class TopicEntity {
        /** Topic name. */
        private String name;

        /** Message type, kept as the raw string read from the topic file. */
        private String messageType;

        /**
         * Creates an entry with the given topic name and message type.
         *
         * @param name the topic name
         * @param messageType the message type
         */
        public TopicEntity(String name, String messageType) {
            this.name = name;
            this.messageType = messageType;
        }

        /** Returns the topic name. */
        public String getName() {
            return this.name;
        }

        /** Replaces the topic name. */
        public void setName(String name) {
            this.name = name;
        }

        /** Returns the message type. */
        public String getMessageType() {
            return this.messageType;
        }

        /** Replaces the message type. */
        public void setMessageType(String messageType) {
            this.messageType = messageType;
        }
    }