Creating a Topic for a Kafka Instance
Function
This API is used to create a topic for a Kafka instance.
Calling Method
For details, see Calling APIs.
URI
POST /v2/{project_id}/instances/{instance_id}/topics
Parameter | Mandatory | Type | Description |
---|---|---|---|
project_id | Yes | String | Project ID. For details about how to obtain it, see Obtaining a Project ID. |
instance_id | Yes | String | Instance ID. |
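The URI template above can be filled in as follows. This is a minimal sketch: the helper name `build_topics_url` and the host `kafka.example.com` are hypothetical placeholders, not part of the API or any SDK.

```python
from urllib.parse import quote


def build_topics_url(endpoint: str, project_id: str, instance_id: str) -> str:
    """Fill in POST /v2/{project_id}/instances/{instance_id}/topics.

    `endpoint` is assumed to be the regional service host without a scheme.
    This helper is illustrative only.
    """
    # Percent-encode the path parameters so unexpected characters
    # (for example a slash) cannot change the shape of the path.
    project = quote(project_id, safe="")
    instance = quote(instance_id, safe="")
    return f"https://{endpoint}/v2/{project}/instances/{instance}/topics"


if __name__ == "__main__":
    # Placeholder values for illustration only.
    print(build_topics_url("kafka.example.com", "my-project-id", "my-instance-id"))
```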
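Request Parameters
Parameter | Mandatory | Type | Description |
---|---|---|---|
id | Yes | String | Topic name. It must be 3 to 200 characters long, start with a letter, and contain only uppercase and lowercase letters, hyphens (-), underscores (_), periods (.), and digits. |
replication | No | Integer | Number of replicas, which determines data reliability. Value range: 1-3. |
sync_message_flush | No | Boolean | Whether to enable synchronous flushing to disk. Default: false. Synchronous flushing reduces performance. |
partition | No | Integer | Number of topic partitions, which determines the consumption concurrency. Value range: 1-200. |
sync_replication | No | Boolean | Whether to enable synchronous replication. It takes effect only if the producer client also sets acks=-1. Disabled by default. |
retention_time | No | Integer | Message retention (aging) time, in hours. Default: 72. Value range: 1-720. |
topic_other_configs | No | Array of topic_other_configs objects | Topic configuration. |
topic_desc | No | String | Topic description. |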
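The constraints in the table above can be checked on the client before the request is sent. The following is a minimal sketch of such a check. The function name `validate_topic_request` and the wording of its messages are hypothetical, and the service performs its own validation regardless.

```python
import re
from typing import List

# Starts with a letter, then letters, digits, hyphens, underscores, or periods.
# Total length 3-200, per the `id` description in the table.
_TOPIC_NAME = re.compile(r"[A-Za-z][A-Za-z0-9._-]{2,199}")

# Inclusive integer ranges taken from the parameter table.
_RANGES = {
    "replication": (1, 3),
    "partition": (1, 200),
    "retention_time": (1, 720),
}


def validate_topic_request(body: dict) -> List[str]:
    """Return a list of problems found in a request body (empty if none)."""
    problems = []
    topic_id = body.get("id")
    if not isinstance(topic_id, str) or not _TOPIC_NAME.fullmatch(topic_id):
        problems.append("id: must be 3-200 allowed characters and start with a letter")
    for field, (low, high) in _RANGES.items():
        if field not in body:
            continue  # optional parameter, nothing to check
        value = body[field]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, int) or not low <= value <= high:
            problems.append(f"{field}: must be an integer from {low} to {high}")
    for field in ("sync_message_flush", "sync_replication"):
        if field in body and not isinstance(body[field], bool):
            problems.append(f"{field}: must be a boolean")
    return problems
```

For example, `validate_topic_request({"id": "test01", "replication": 4})` reports the out-of-range replica count, and a body that passes `"false"` as a string for `sync_replication` is flagged as not being a boolean.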
Response Parameters
Status code: 200
Parameter | Type | Description |
---|---|---|
name | String | Topic name. |
Example Request
Create a topic named test01.

    POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/topics

    {
      "id" : "test01",
      "partition" : 3,
      "replication" : 3,
      "retention_time" : 72,
      "sync_message_flush" : false,
      "sync_replication" : false,
      "topic_other_configs" : [ {
        "name" : "message.timestamp.type",
        "value" : "LogAppendTime"
      }, {
        "name" : "max.message.bytes",
        "value" : 10485760
      } ],
      "topic_desc" : ""
    }
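The example request could be assembled with the Python standard library roughly as follows. This is a sketch under assumptions: the helper name `build_create_topic_request` and the example host are hypothetical, the `Content-Type` header is assumed to be `application/json`, authentication headers are omitted (see the calling method section), and nothing is actually sent.

```python
import json
import urllib.request


def build_create_topic_request(url: str, topic_id: str, **options) -> urllib.request.Request:
    """Assemble (but do not send) the POST request that creates a topic.

    Authentication is deliberately left out; add the required headers
    before sending. This helper is illustrative only.
    """
    body = {"id": topic_id, **options}
    # json.dumps writes Python True/False as JSON true/false, so Boolean
    # parameters such as sync_replication are not sent as strings.
    data = json.dumps(body).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    # Placeholder URL for illustration only.
    req = build_create_topic_request(
        "https://kafka.example.com/v2/my-project-id/instances/my-instance-id/topics",
        "test01",
        partition=3,
        replication=3,
        retention_time=72,
        sync_message_flush=False,
        sync_replication=False,
    )
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
```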
Example Response
Status code: 200
The topic is created and its name is returned.

    {
      "name" : "test01"
    }
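A caller that works with the raw HTTP response might extract the topic name along these lines. This is a minimal sketch: `parse_create_topic_response` is a hypothetical helper, and treating every status other than 200 as a failure is an assumption based on the single status code documented here.

```python
import json


def parse_create_topic_response(status_code: int, body: str) -> str:
    """Return the created topic name from a 200 response, otherwise raise."""
    if status_code != 200:
        raise RuntimeError(f"topic creation failed with HTTP status {status_code}")
    payload = json.loads(body)
    # The documented success body carries a single field, `name`.
    return payload["name"]


if __name__ == "__main__":
    print(parse_create_topic_response(200, '{ "name" : "test01" }'))
```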
SDK Sample Code
The SDK sample code is as follows.
Java: create a topic named test01.

    package com.huaweicloud.sdk.test;

    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
    import com.huaweicloud.sdk.kafka.v2.*;
    import com.huaweicloud.sdk.kafka.v2.model.*;

    import java.util.List;
    import java.util.ArrayList;

    public class CreateInstanceTopicSolution {

        public static void main(String[] args) {
            // Hard-coding the AK and SK or storing them in plaintext is a serious security risk.
            // Store them encrypted in configuration files or environment variables and decrypt them at use.
            // This example reads them from the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK,
            // which must be set in the local environment before the example is run.
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";

            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);

            KafkaClient client = KafkaClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                    .build();

            CreateInstanceTopicRequest request = new CreateInstanceTopicRequest();
            request.withInstanceId("{instance_id}");

            // Additional topic-level configuration entries (topic_other_configs).
            List<CreateInstanceTopicReqTopicOtherConfigs> listbodyTopicOtherConfigs = new ArrayList<>();
            listbodyTopicOtherConfigs.add(
                new CreateInstanceTopicReqTopicOtherConfigs()
                    .withName("message.timestamp.type")
                    .withValue("LogAppendTime")
            );
            listbodyTopicOtherConfigs.add(
                new CreateInstanceTopicReqTopicOtherConfigs()
                    .withName("max.message.bytes")
                    .withValue("10485760")
            );

            // Request body: topic test01 with 3 partitions, 3 replicas, and 72-hour retention.
            CreateInstanceTopicReq body = new CreateInstanceTopicReq();
            body.withTopicDesc("");
            body.withTopicOtherConfigs(listbodyTopicOtherConfigs);
            body.withRetentionTime(72);
            body.withSyncReplication(false);
            body.withPartition(3);
            body.withSyncMessageFlush(false);
            body.withReplication(3);
            body.withId("test01");
            request.withBody(body);

            try {
                CreateInstanceTopicResponse response = client.createInstanceTopic(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                // The service returned an error response; print its details.
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
Python: create a topic named test01.

    # coding: utf-8

    import os

    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkkafka.v2 import *

    if __name__ == "__main__":
        # Hard-coding the AK and SK or storing them in plaintext is a serious security risk.
        # Store them encrypted in configuration files or environment variables and decrypt them at use.
        # This example reads them from the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK,
        # which must be set in the local environment before the example is run.
        ak = os.environ["CLOUD_SDK_AK"]
        sk = os.environ["CLOUD_SDK_SK"]
        projectId = "{project_id}"

        credentials = BasicCredentials(ak, sk, projectId)

        client = KafkaClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
            .build()

        try:
            request = CreateInstanceTopicRequest()
            request.instance_id = "{instance_id}"

            # Additional topic-level configuration entries (topic_other_configs).
            listTopicOtherConfigsbody = [
                CreateInstanceTopicReqTopicOtherConfigs(
                    name="message.timestamp.type",
                    value="LogAppendTime"
                ),
                CreateInstanceTopicReqTopicOtherConfigs(
                    name="max.message.bytes",
                    value="10485760"
                )
            ]

            # Request body: topic test01 with 3 partitions, 3 replicas, and 72-hour retention.
            request.body = CreateInstanceTopicReq(
                topic_desc="",
                topic_other_configs=listTopicOtherConfigsbody,
                retention_time=72,
                sync_replication=False,
                partition=3,
                sync_message_flush=False,
                replication=3,
                id="test01"
            )
            response = client.create_instance_topic(request)
            print(response)
        except exceptions.ClientRequestException as e:
            # The service returned an error response; print its details.
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
Go: create a topic named test01.

    package main

    import (
        "fmt"
        "os"

        "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
        kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
        "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
        region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
    )

    func main() {
        // Hard-coding the AK and SK or storing them in plaintext is a serious security risk.
        // Store them encrypted in configuration files or environment variables and decrypt them at use.
        // This example reads them from the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK,
        // which must be set in the local environment before the example is run.
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"

        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()

        client := kafka.NewKafkaClient(
            kafka.KafkaClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())

        request := &model.CreateInstanceTopicRequest{}
        request.InstanceId = "{instance_id}"

        // Additional topic-level configuration entries (topic_other_configs).
        nameTopicOtherConfigs := "message.timestamp.type"
        valueTopicOtherConfigs := "LogAppendTime"
        nameTopicOtherConfigs1 := "max.message.bytes"
        valueTopicOtherConfigs1 := "10485760"
        var listTopicOtherConfigsbody = []model.CreateInstanceTopicReqTopicOtherConfigs{
            {
                Name:  &nameTopicOtherConfigs,
                Value: &valueTopicOtherConfigs,
            },
            {
                Name:  &nameTopicOtherConfigs1,
                Value: &valueTopicOtherConfigs1,
            },
        }

        // Optional fields are pointers, so each value needs an addressable variable.
        topicDescCreateInstanceTopicReq := ""
        retentionTimeCreateInstanceTopicReq := int32(72)
        syncReplicationCreateInstanceTopicReq := false
        partitionCreateInstanceTopicReq := int32(3)
        syncMessageFlushCreateInstanceTopicReq := false
        replicationCreateInstanceTopicReq := int32(3)
        request.Body = &model.CreateInstanceTopicReq{
            TopicDesc:         &topicDescCreateInstanceTopicReq,
            TopicOtherConfigs: &listTopicOtherConfigsbody,
            RetentionTime:     &retentionTimeCreateInstanceTopicReq,
            SyncReplication:   &syncReplicationCreateInstanceTopicReq,
            Partition:         &partitionCreateInstanceTopicReq,
            SyncMessageFlush:  &syncMessageFlushCreateInstanceTopicReq,
            Replication:       &replicationCreateInstanceTopicReq,
            Id:                "test01",
        }

        response, err := client.CreateInstanceTopic(request)
        if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
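For SDK sample code in more programming languages, see the Sample Code tab in API Explorer, which automatically generates the corresponding SDK sample code.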
Status Codes
Status Code | Description |
---|---|
200 | The topic is created and its name is returned. |
Error Codes
See Error Codes.