更新时间:2024-07-05 GMT+08:00
批量创建Topic
RocketMQ未提供批量创建Topic接口,用户想批量创建Topic时只能手动多次调用接口一个个创建,效率低下。现通过代码实现接口循环调用,可实现批量创建Topic的效果。
前提条件
- 已获取IAM和RocketMQ服务所在区域的Endpoint地址。
- 已获取RocketMQ实例的实例ID,以及实例所在区域的项目ID。
使用华为云SDK 5.x版本批量创建不同类型的Topic
- 新建“topics.txt”文件,用于存放待创建的Topic信息。
文件内容格式为“Topic名称 消息类型”,消息类型包含普通(NORMAL)、顺序(FIFO)、定时(DELAY)、事务(TRANSACTION)四种。文件内容填写示例如下:
topic-test01 NORMAL
topic-test02 FIFO
topic-test02 DELAY
topic-test02 TRANSACTION
一个Topic一行数据,您可以根据实际情况设置多个Topic名称和消息类型。
- 通过代码实现接口循环调用,批量创建Topic。
其中加粗部分内容请根据实际情况修改。
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.rocketmq.v2.region.RocketMQRegion; import com.huaweicloud.sdk.rocketmq.v2.*; import com.huaweicloud.sdk.rocketmq.v2.model.*; import java.io.*; import java.util.List; import java.util.ArrayList; public class CreateBatchTopicSolution { public static void main(String[] args) throws IOException { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = "{CLOUD_SDK_AK}"; //访问密钥ID。查看AK的方法请参见获取AK/SK。 String sk = "{CLOUD_SDK_SK}"; //与访问密钥ID结合使用的密钥。查看SK的方法请参见获取AK/SK。 String projectId = "{project_id}"; //项目ID。查看项目ID的方法请参见获取项目ID。 ICredential auth = new BasicCredentials() .withProjectId(projectId) .withAk(ak) .withSk(sk); RocketMQClient client = RocketMQClient.newBuilder() .withCredential(auth) .withRegion(RocketMQRegion.valueOf("<YOUR REGION>")) //项目所在的区域。在获取项目ID中可以查看到项目ID对应的区域信息。 .build(); File file = new File("d:/topics.txt"); //1中topic文件所在路径。文件名称和文件路径可自定义。 FileReader fileReader = new FileReader(file); BufferedReader bufferedReader = new BufferedReader(fileReader); String str = null; List<producer.TopicEntity> list = new ArrayList<>(); while ;(str = bufferedReader.readLine()) != null) { String[] topic = str.split(" "); list.add(new producer.TopicEntity(topic[0],topic[1])); } bufferedReader.close(); CreateTopicOrBatchDeleteTopicRequest request = new CreateTopicOrBatchDeleteTopicRequest(); request.withInstanceId("{instance_id}"); //RocketMQ实例的ID CreateTopicOrBatchDeleteTopicReq body = new CreateTopicOrBatchDeleteTopicReq(); for (int i = 0; i < list.size(); i++) { body.withMessageType(CreateTopicOrBatchDeleteTopicReq.MessageTypeEnum.fromValue(list.get(i).getMessageType())); body.withName(list.get(i).getName()); request.withBody(body); try { CreateTopicOrBatchDeleteTopicResponse response = client.createTopicOrBatchDeleteTopic(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } } } //TopicEntity.java public class TopicEntity { private String name; //Topic名称 private String messageType; //消息类型 public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageType() { return messageType; } public void setMessageType(String messageType) { this.messageType = messageType; } public TopicEntity(String name, String messageType) { this.name = name; this.messageType = messageType; } }