创建Smart Connect任务
功能介绍
创建Smart Connect任务。
调用方法
请参见如何调用API。
URI
POST /v2/{project_id}/instances/{instance_id}/connector/tasks
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
project_id |
是 |
String |
项目ID,获取方式请参见获取项目ID。 |
instance_id |
是 |
String |
实例ID。 |
请求参数
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
task_name |
否 |
String |
SmartConnect任务名称。 |
start_later |
否 |
Boolean |
是否稍后再启动任务。如需要创建任务后立即启动,请填false;如希望稍后在任务列表中手动开启任务,请填true。 |
topics |
否 |
String |
SmartConnect任务配置的Topic。 |
topics_regex |
否 |
String |
SmartConnect任务配置的Topic正则表达式。 |
source_type |
否 |
String |
SmartConnect任务的源端类型。 |
source_task |
否 |
SmartConnect任务的源端配置。 |
|
sink_type |
否 |
String |
SmartConnect任务的目标端类型。 |
sink_task |
否 |
SmartConnect任务的目标端配置。 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
redis_address |
否 |
String |
Redis实例地址。(仅源端类型为Redis时需要填写) |
redis_type |
否 |
String |
Redis实例类型。(仅源端类型为Redis时需要填写) |
dcs_instance_id |
否 |
String |
DCS实例ID。(仅源端类型为Redis时需要填写) |
redis_password |
否 |
String |
Redis密码。(仅源端类型为Redis时需要填写) |
sync_mode |
否 |
String |
同步类型,“RDB_ONLY”为全量同步,“CUSTOM_OFFSET”为全量同步+增量同步。(仅源端类型为Redis时需要填写) |
full_sync_wait_ms |
否 |
Integer |
全量同步重试间隔时间,单位:毫秒。(仅源端类型为Redis时需要填写) |
full_sync_max_retry |
否 |
Integer |
全量同步最大重试次数。(仅源端类型为Redis时需要填写) |
ratelimit |
否 |
Integer |
限速,单位为KB/s。-1表示不限速。(仅源端类型为Redis时需要填写) |
current_cluster_name |
否 |
String |
当前Kafka实例别名。(仅源端类型为Kafka时需要填写) |
cluster_name |
否 |
String |
对端Kafka实例别名。(仅源端类型为Kafka时需要填写) |
user_name |
否 |
String |
对端Kafka开启SASL_SSL时设置的用户名,或者创建SASL_SSL用户时设置的用户名。(仅源端类型为Kafka且对端Kafka认证方式为“SASL_SSL”时需要填写) |
password |
否 |
String |
对端Kafka开启SASL_SSL时设置的密码,或者创建SASL_SSL用户时设置的密码。(仅源端类型为Kafka且对端Kafka认证方式为“SASL_SSL”时需要填写) |
sasl_mechanism |
否 |
String |
对端Kafka认证机制。(仅源端类型为Kafka且“认证方式”为“SASL_SSL”时需要填写) |
instance_id |
否 |
String |
对端Kafka实例ID。(仅源端类型为Kafka时需要填写,instance_id和bootstrap_servers仅需要填写其中一个) |
bootstrap_servers |
否 |
String |
对端Kafka实例地址。(仅源端类型为Kafka时需要填写,instance_id和bootstrap_servers仅需要填写其中一个) |
security_protocol |
否 |
String |
对端Kafka认证方式。(仅源端类型为Kafka需要填写) 支持以下两种认证方式:
|
direction |
否 |
String |
同步方向;pull为把对端Kafka实例数据复制到当前Kafka实例中,push为把当前Kafka实例数据复制到对端Kafka实例中,two-way为对两端Kafka实例数据进行双向复制。(仅源端类型为Kafka时需要填写) |
sync_consumer_offsets_enabled |
否 |
Boolean |
是否同步消费进度。(仅源端类型为Kafka时需要填写) |
replication_factor |
否 |
Integer |
在对端实例中自动创建Topic时,指定Topic的副本数,此参数值不能超过对端实例的代理数。如果对端实例中设置了“default.replication.factor”,此参数的优先级高于“default.replication.factor”。(仅源端类型为Kafka时需要填写) |
task_num |
否 |
Integer |
数据复制的任务数。默认值为2,建议保持默认值。如果“同步方式”为“双向”,实际任务数=设置的任务数*2。(仅源端类型为Kafka时需要填写) |
rename_topic_enabled |
否 |
Boolean |
是否重命名Topic,在目标Topic名称前添加源端Kafka实例的别名,形成目标Topic新的名称。(仅源端类型为Kafka时需要填写) |
provenance_header_enabled |
否 |
Boolean |
目标Topic接收复制的消息,此消息header中包含消息来源。两端实例数据双向复制时,请开启“添加来源header”,防止循环复制。(仅源端类型为Kafka时需要填写) |
consumer_strategy |
否 |
String |
启动偏移量,latest为获取最新的数据,earliest为获取最早的数据。(仅源端类型为Kafka时需要填写) |
compression_type |
否 |
String |
复制消息所使用的压缩算法。(仅源端类型为Kafka时需要填写)
|
topics_mapping |
否 |
String |
topic映射,用于自定义目标端Topic名称。不能同时设置“重命名Topic”和“topic映射”。topic映射请按照“源端topic:目的端topic”的格式填写,如涉及多个topic映射,请用“,”分隔开,例如:topic-sc-1:topic-sc-2,topic-sc-3:topic-sc-4。(仅源端类型为Kafka时需要填写) |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
redis_address |
否 |
String |
Redis实例地址。(仅目标端类型为Redis时需要填写) |
redis_type |
否 |
String |
Redis实例类型。(仅目标端类型为Redis时需要填写) |
dcs_instance_id |
否 |
String |
DCS实例ID。(仅目标端类型为Redis时需要填写) |
redis_password |
否 |
String |
Redis密码。(仅目标端类型为Redis时需要填写) |
consumer_strategy |
否 |
String |
转储启动偏移量,latest为获取最新的数据,earliest为获取最早的数据。(仅目标端类型为OBS时需要填写) |
destination_file_type |
否 |
String |
转储文件格式。当前只支持TEXT。(仅目标端类型为OBS时需要填写) |
deliver_time_interval |
否 |
Integer |
数据转储周期(秒),默认配置为300秒。(仅目标端类型为OBS时需要填写) |
access_key |
否 |
String |
AK,访问密钥ID。(仅目标端类型为OBS时需要填写) |
secret_key |
否 |
String |
SK,与访问密钥ID结合使用的密钥。(仅目标端类型为OBS时需要填写) |
obs_bucket_name |
否 |
String |
转储地址,即存储Topic数据的OBS桶的名称。(仅目标端类型为OBS时需要填写) |
obs_path |
否 |
String |
转储目录,即OBS中存储Topic的目录,多级目录可以用“/”进行分隔。(仅目标端类型为OBS时需要填写) |
partition_format |
否 |
String |
时间目录格式。(仅目标端类型为OBS时需要填写)
|
record_delimiter |
否 |
String |
记录分行符,用于分隔写入转储文件的用户数据。(仅目标端类型为OBS时需要填写) 取值范围:
|
store_keys |
否 |
Boolean |
是否转储Key,开启表示转储Key,关闭表示不转储Key。(仅目标端类型为OBS时需要填写) |
响应参数
状态码: 200
参数 |
参数类型 |
描述 |
---|---|---|
task_name |
String |
SmartConnect任务名称。 |
topics |
String |
SmartConnect任务配置的Topic。 |
topics_regex |
String |
SmartConnect任务配置的Topic正则表达式。 |
source_type |
String |
SmartConnect任务的源端类型。 |
source_task |
SmartConnect任务的源端配置。 |
|
sink_type |
String |
SmartConnect任务的目标端类型。 |
sink_task |
SmartConnect任务的目标端配置。 |
|
id |
String |
SmartConnect任务的id。 |
status |
String |
SmartConnect任务的状态。 |
create_time |
Long |
SmartConnect任务的创建时间。 |
参数 |
参数类型 |
描述 |
---|---|---|
redis_address |
String |
Redis实例地址。(仅源端类型为Redis时会显示) |
redis_type |
String |
Redis实例类型。(仅源端类型为Redis时会显示) |
dcs_instance_id |
String |
DCS实例ID。(仅源端类型为Redis时会显示) |
sync_mode |
String |
同步类型,“RDB_ONLY”为全量同步,“CUSTOM_OFFSET”为全量同步+增量同步。(仅源端类型为Redis时会显示) |
full_sync_wait_ms |
Integer |
全量同步重试间隔时间,单位:毫秒。(仅源端类型为Redis时会显示) |
full_sync_max_retry |
Integer |
全量同步最大重试次数。(仅源端类型为Redis时会显示) |
ratelimit |
Integer |
限速,单位为KB/s。-1表示不限速(仅源端类型为Redis时会显示) |
current_cluster_name |
String |
当前Kafka实例别名。(仅源端类型为Kafka时会显示) |
cluster_name |
String |
对端Kafka实例别名。(仅源端类型为Kafka时会显示) |
user_name |
String |
对端Kafka用户名。(仅源端类型为Kafka时会显示) |
sasl_mechanism |
String |
对端Kafka认证机制。(仅源端类型为Kafka时会显示) |
instance_id |
String |
对端Kafka实例ID。(仅源端类型为Kafka时会显示) |
bootstrap_servers |
String |
对端Kafka实例地址。(仅源端类型为Kafka时会显示) |
security_protocol |
String |
对端Kafka认证方式。(仅源端类型为Kafka时会显示) |
direction |
String |
同步方向。(仅源端类型为Kafka时会显示) |
sync_consumer_offsets_enabled |
Boolean |
是否同步消费进度。(仅源端类型为Kafka时会显示) |
replication_factor |
Integer |
副本数。(仅源端类型为Kafka时会显示) |
task_num |
Integer |
任务数。(仅源端类型为Kafka时会显示) |
rename_topic_enabled |
Boolean |
是否重命名Topic。(仅源端类型为Kafka时会显示) |
provenance_header_enabled |
Boolean |
是否添加来源header。(仅源端类型为Kafka时会显示) |
consumer_strategy |
String |
启动偏移量,latest为获取最新的数据,earliest为获取最早的数据。(仅源端类型为Kafka时会显示) |
compression_type |
String |
压缩算法。(仅源端类型为Kafka时会显示) |
topics_mapping |
String |
topic映射。(仅源端类型为Kafka时会显示) |
参数 |
参数类型 |
描述 |
---|---|---|
redis_address |
String |
Redis实例地址。(仅目标端类型为Redis时会显示) |
redis_type |
String |
Redis实例类型。(仅目标端类型为Redis时会显示) |
dcs_instance_id |
String |
DCS实例ID。(仅目标端类型为Redis时会显示) |
target_db |
Integer |
目标数据库,默认为-1。(仅目标端类型为Redis时会显示) |
consumer_strategy |
String |
转储启动偏移量,latest为获取最新的数据,earliest为获取最早的数据。(仅目标端类型为OBS时会显示) |
destination_file_type |
String |
转储文件格式。当前只支持TEXT。(仅目标端类型为OBS时会显示) |
deliver_time_interval |
Integer |
记数据转储周期(秒)。(仅目标端类型为OBS时会显示) |
obs_bucket_name |
String |
转储地址。(仅目标端类型为OBS时会显示) |
obs_path |
String |
转储目录。(仅目标端类型为OBS时会显示) |
partition_format |
String |
时间目录格式。(仅目标端类型为OBS时会显示) |
record_delimiter |
String |
记录分行符。(仅目标端类型为OBS时会显示) |
store_keys |
Boolean |
存储Key。(仅目标端类型为OBS时会显示) |
obs_part_size |
Integer |
每个传输文件多大后就开始上传,单位为byte;默认值5242880。(仅目标端类型为OBS时会显示) |
flush_size |
Integer |
flush_size。(仅目标端类型为OBS时会显示) |
timezone |
String |
时区。(仅目标端类型为OBS时会显示) |
schema_generator_class |
String |
schema_generator类,默认为"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator"。(仅目标端类型为OBS时会显示) |
partitioner_class |
String |
partitioner类,默认"io.confluent.connect.storage.partitioner.TimeBasedPartitioner"。(仅目标端类型为OBS时会显示) |
value_converter |
String |
value_converter,默认为"org.apache.kafka.connect.converters.ByteArrayConverter"。(仅目标端类型为OBS时会显示) |
key_converter |
String |
key_converter,默认为"org.apache.kafka.connect.converters.ByteArrayConverter"。(仅目标端类型为OBS时会显示) |
kv_delimiter |
String |
kv_delimiter,默认为":"。(仅目标端类型为OBS时会显示) |
请求示例
-
创建一个立即启动的转储任务。
POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/connector/tasks { "task_name" : "smart-connect-1", "start_later" : false, "source_type" : "NONE", "topics_regex" : "topic-obs*", "sink_type" : "OBS_SINK", "sink_task" : { "consumer_strategy" : "earliest", "destination_file_type" : "TEXT", "deliver_time_interval" : 300, "access_key" : "********", "secret_key" : "********", "obs_bucket_name" : "obs_bucket", "obs_path" : "obsTransfer-1810125534", "partition_format" : "yyyy/MM/dd/HH/mm", "record_delimiter" : "\\n", "store_keys" : false } }
-
创建一个稍后启动的Kafka数据复制任务。
POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/connector/tasks { "task_name" : "smart-connect-2", "start_later" : true, "source_type" : "KAFKA_REPLICATOR_SOURCE", "source_task" : { "current_cluster_name" : "A", "cluster_name" : "B", "user_name" : "user1", "password" : "********", "sasl_mechanism" : "SCRAM-SHA-512", "instance_id" : "b54c9dd8-********-********", "direction" : "two-way", "sync_consumer_offsets_enabled" : false, "replication_factor" : 3, "task_num" : 2, "rename_topic_enabled" : false, "provenance_header_enabled" : true, "consumer_strategy" : "latest", "compression_type" : "snappy", "topics_mapping" : "topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4" } }
-
创建一个立即启动的Redis数据复制任务,同步方式为全量同步,配置全量同步最大重试次数为10,重试间隔时间为10000毫秒,带宽限制为10KB/s。
POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/connector/tasks { "task_name" : "smart-connect-3", "start_later" : false, "source_type" : "REDIS_REPLICATOR_SOURCE", "source_task" : { "redis_address" : "192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379", "redis_type" : "cluster", "redis_password" : "********", "sync_mode" : "RDB_ONLY", "full_sync_max_retry" : 10, "full_sync_wait_ms" : 10000, "ratelimit" : 10 }, "topics" : "topic-sc-3", "sink_type" : "REDIS_REPLICATOR_SINK", "sink_task" : { "redis_address" : "192.168.119.51:6379", "redis_type" : "standalone", "redis_password" : "********" } }
响应示例
状态码: 200
创建Smart Connect任务成功。
{ "task_name" : "smart-connect-121248117", "topics" : "topic-sc", "source_task" : { "redis_address" : "192.168.91.179:6379", "redis_type" : "standalone", "dcs_instance_id" : "949190a2-598a-4afd-99a8-dad3cae1e7cd", "sync_mode" : "RDB_ONLY,", "full_sync_wait_ms" : 13000, "full_sync_max_retry" : 4, "ratelimit" : -1 }, "source_type" : "REDIS_REPLICATOR_SOURCE", "sink_task" : { "redis_address" : "192.168.119.51:6379", "redis_type" : "standalone", "dcs_instance_id" : "9b981368-a8e3-416a-87d9-1581a968b41b", "target_db" : -1 }, "sink_type" : "REDIS_REPLICATOR_SINK", "id" : "8a205bbd-7181-4b5e-9bd6-37274ce84577", "status" : "RUNNING", "create_time" : 1708427753133 }
SDK代码示例
SDK代码示例如下。
-
创建一个立即启动的转储任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion; import com.huaweicloud.sdk.kafka.v2.*; import com.huaweicloud.sdk.kafka.v2.model.*; public class CreateConnectorTaskSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); String projectId = "{project_id}"; ICredential auth = new BasicCredentials() .withProjectId(projectId) .withAk(ak) .withSk(sk); KafkaClient client = KafkaClient.newBuilder() .withCredential(auth) .withRegion(KafkaRegion.valueOf("<YOUR REGION>")) .build(); CreateConnectorTaskRequest request = new CreateConnectorTaskRequest(); request.withInstanceId("{instance_id}"); CreateSmartConnectTaskReq body = new CreateSmartConnectTaskReq(); SmartConnectTaskReqSinkConfig sinkTaskbody = new SmartConnectTaskReqSinkConfig(); sinkTaskbody.withConsumerStrategy("earliest") .withDestinationFileType("TEXT") .withDeliverTimeInterval(300) .withAccessKey("********") .withSecretKey("********") .withObsBucketName("obs_bucket") .withObsPath("obsTransfer-1810125534") .withPartitionFormat("yyyy/MM/dd/HH/mm") .withRecordDelimiter("\n") .withStoreKeys(false); body.withSinkTask(sinkTaskbody); body.withSinkType(CreateSmartConnectTaskReq.SinkTypeEnum.fromValue("OBS_SINK")); body.withSourceType(CreateSmartConnectTaskReq.SourceTypeEnum.fromValue("NONE")); body.withTopicsRegex("topic-obs*"); body.withStartLater(false); body.withTaskName("smart-connect-1"); request.withBody(body); try { CreateConnectorTaskResponse response = client.createConnectorTask(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } }
-
创建一个稍后启动的Kafka数据复制任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion; import com.huaweicloud.sdk.kafka.v2.*; import com.huaweicloud.sdk.kafka.v2.model.*; public class CreateConnectorTaskSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); String projectId = "{project_id}"; ICredential auth = new BasicCredentials() .withProjectId(projectId) .withAk(ak) .withSk(sk); KafkaClient client = KafkaClient.newBuilder() .withCredential(auth) .withRegion(KafkaRegion.valueOf("<YOUR REGION>")) .build(); CreateConnectorTaskRequest request = new CreateConnectorTaskRequest(); request.withInstanceId("{instance_id}"); CreateSmartConnectTaskReq body = new CreateSmartConnectTaskReq(); SmartConnectTaskReqSourceConfig sourceTaskbody = new SmartConnectTaskReqSourceConfig(); sourceTaskbody.withCurrentClusterName("A") .withClusterName("B") .withUserName("user1") .withPassword("********") .withSaslMechanism("SCRAM-SHA-512") .withInstanceId("b54c9dd8-********-********") .withDirection("two-way") .withSyncConsumerOffsetsEnabled(false) .withReplicationFactor(3) .withTaskNum(2) .withRenameTopicEnabled(false) .withProvenanceHeaderEnabled(true) .withConsumerStrategy("latest") .withCompressionType("snappy") .withTopicsMapping("topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4"); body.withSourceTask(sourceTaskbody); body.withSourceType(CreateSmartConnectTaskReq.SourceTypeEnum.fromValue("KAFKA_REPLICATOR_SOURCE")); body.withStartLater(true); body.withTaskName("smart-connect-2"); request.withBody(body); try { CreateConnectorTaskResponse response = client.createConnectorTask(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } }
-
创建一个立即启动的Redis数据复制任务,同步方式为全量同步,配置全量同步最大重试次数为10,重试间隔时间为10000毫秒,带宽限制为10KB/s。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion; import com.huaweicloud.sdk.kafka.v2.*; import com.huaweicloud.sdk.kafka.v2.model.*; public class CreateConnectorTaskSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); String projectId = "{project_id}"; ICredential auth = new BasicCredentials() .withProjectId(projectId) .withAk(ak) .withSk(sk); KafkaClient client = KafkaClient.newBuilder() .withCredential(auth) .withRegion(KafkaRegion.valueOf("<YOUR REGION>")) .build(); CreateConnectorTaskRequest request = new CreateConnectorTaskRequest(); request.withInstanceId("{instance_id}"); CreateSmartConnectTaskReq body = new CreateSmartConnectTaskReq(); SmartConnectTaskReqSinkConfig sinkTaskbody = new SmartConnectTaskReqSinkConfig(); sinkTaskbody.withRedisAddress("192.168.119.51:6379") .withRedisType("standalone") .withRedisPassword("********"); SmartConnectTaskReqSourceConfig sourceTaskbody = new SmartConnectTaskReqSourceConfig(); sourceTaskbody.withRedisAddress("192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379") .withRedisType("cluster") .withRedisPassword("********") .withSyncMode("RDB_ONLY") .withFullSyncWaitMs(10000) .withFullSyncMaxRetry(10) .withRatelimit(10); body.withSinkTask(sinkTaskbody); body.withSinkType(CreateSmartConnectTaskReq.SinkTypeEnum.fromValue("REDIS_REPLICATOR_SINK")); body.withSourceTask(sourceTaskbody); body.withSourceType(CreateSmartConnectTaskReq.SourceTypeEnum.fromValue("REDIS_REPLICATOR_SOURCE")); body.withTopics("topic-sc-3"); body.withStartLater(false); body.withTaskName("smart-connect-3"); request.withBody(body); try { CreateConnectorTaskResponse response = client.createConnectorTask(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } }
-
创建一个立即启动的转储任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
# coding: utf-8 import os from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkkafka.v2 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = os.environ["CLOUD_SDK_AK"] sk = os.environ["CLOUD_SDK_SK"] projectId = "{project_id}" credentials = BasicCredentials(ak, sk, projectId) client = KafkaClient.new_builder() \ .with_credentials(credentials) \ .with_region(KafkaRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateConnectorTaskRequest() request.instance_id = "{instance_id}" sinkTaskbody = SmartConnectTaskReqSinkConfig( consumer_strategy="earliest", destination_file_type="TEXT", deliver_time_interval=300, access_key="********", secret_key="********", obs_bucket_name="obs_bucket", obs_path="obsTransfer-1810125534", partition_format="yyyy/MM/dd/HH/mm", record_delimiter="\n", store_keys=False ) request.body = CreateSmartConnectTaskReq( sink_task=sinkTaskbody, sink_type="OBS_SINK", source_type="NONE", topics_regex="topic-obs*", start_later=False, task_name="smart-connect-1" ) response = client.create_connector_task(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg)
-
创建一个稍后启动的Kafka数据复制任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
# coding: utf-8 import os from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkkafka.v2 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = os.environ["CLOUD_SDK_AK"] sk = os.environ["CLOUD_SDK_SK"] projectId = "{project_id}" credentials = BasicCredentials(ak, sk, projectId) client = KafkaClient.new_builder() \ .with_credentials(credentials) \ .with_region(KafkaRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateConnectorTaskRequest() request.instance_id = "{instance_id}" sourceTaskbody = SmartConnectTaskReqSourceConfig( current_cluster_name="A", cluster_name="B", user_name="user1", password="********", sasl_mechanism="SCRAM-SHA-512", instance_id="b54c9dd8-********-********", direction="two-way", sync_consumer_offsets_enabled=False, replication_factor=3, task_num=2, rename_topic_enabled=False, provenance_header_enabled=True, consumer_strategy="latest", compression_type="snappy", topics_mapping="topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4" ) request.body = CreateSmartConnectTaskReq( source_task=sourceTaskbody, source_type="KAFKA_REPLICATOR_SOURCE", start_later=True, task_name="smart-connect-2" ) response = client.create_connector_task(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg)
-
创建一个立即启动的Redis数据复制任务,同步方式为全量同步,配置全量同步最大重试次数为10,重试间隔时间为10000毫秒,带宽限制为10KB/s。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
# coding: utf-8 import os from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkkafka.v2 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = os.environ["CLOUD_SDK_AK"] sk = os.environ["CLOUD_SDK_SK"] projectId = "{project_id}" credentials = BasicCredentials(ak, sk, projectId) client = KafkaClient.new_builder() \ .with_credentials(credentials) \ .with_region(KafkaRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateConnectorTaskRequest() request.instance_id = "{instance_id}" sinkTaskbody = SmartConnectTaskReqSinkConfig( redis_address="192.168.119.51:6379", redis_type="standalone", redis_password="********" ) sourceTaskbody = SmartConnectTaskReqSourceConfig( redis_address="192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379", redis_type="cluster", redis_password="********", sync_mode="RDB_ONLY", full_sync_wait_ms=10000, full_sync_max_retry=10, ratelimit=10 ) request.body = CreateSmartConnectTaskReq( sink_task=sinkTaskbody, sink_type="REDIS_REPLICATOR_SINK", source_task=sourceTaskbody, source_type="REDIS_REPLICATOR_SOURCE", topics="topic-sc-3", start_later=False, task_name="smart-connect-3" ) response = client.create_connector_task(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg)
-
创建一个立即启动的转储任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") projectId := "{project_id}" auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). WithProjectId(projectId). Build() client := kafka.NewKafkaClient( kafka.KafkaClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateConnectorTaskRequest{} request.InstanceId = "{instance_id}" consumerStrategySinkTask:= "earliest" destinationFileTypeSinkTask:= "TEXT" deliverTimeIntervalSinkTask:= int32(300) accessKeySinkTask:= "********" secretKeySinkTask:= "********" obsBucketNameSinkTask:= "obs_bucket" obsPathSinkTask:= "obsTransfer-1810125534" partitionFormatSinkTask:= "yyyy/MM/dd/HH/mm" recordDelimiterSinkTask:= "\n" storeKeysSinkTask:= false sinkTaskbody := &model.SmartConnectTaskReqSinkConfig{ ConsumerStrategy: &consumerStrategySinkTask, DestinationFileType: &destinationFileTypeSinkTask, DeliverTimeInterval: &deliverTimeIntervalSinkTask, AccessKey: &accessKeySinkTask, SecretKey: &secretKeySinkTask, ObsBucketName: &obsBucketNameSinkTask, ObsPath: &obsPathSinkTask, PartitionFormat: &partitionFormatSinkTask, RecordDelimiter: &recordDelimiterSinkTask, StoreKeys: &storeKeysSinkTask, } sinkTypeCreateSmartConnectTaskReq:= model.GetCreateSmartConnectTaskReqSinkTypeEnum().OBS_SINK sourceTypeCreateSmartConnectTaskReq:= model.GetCreateSmartConnectTaskReqSourceTypeEnum().NONE topicsRegexCreateSmartConnectTaskReq:= "topic-obs*" startLaterCreateSmartConnectTaskReq:= false taskNameCreateSmartConnectTaskReq:= "smart-connect-1" request.Body = &model.CreateSmartConnectTaskReq{ SinkTask: sinkTaskbody, SinkType: &sinkTypeCreateSmartConnectTaskReq, SourceType: &sourceTypeCreateSmartConnectTaskReq, TopicsRegex: &topicsRegexCreateSmartConnectTaskReq, StartLater: &startLaterCreateSmartConnectTaskReq, TaskName: &taskNameCreateSmartConnectTaskReq, } response, err := client.CreateConnectorTask(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } }
-
创建一个稍后启动的Kafka数据复制任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") projectId := "{project_id}" auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). WithProjectId(projectId). Build() client := kafka.NewKafkaClient( kafka.KafkaClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateConnectorTaskRequest{} request.InstanceId = "{instance_id}" currentClusterNameSourceTask:= "A" clusterNameSourceTask:= "B" userNameSourceTask:= "user1" passwordSourceTask:= "********" saslMechanismSourceTask:= "SCRAM-SHA-512" instanceIdSourceTask:= "b54c9dd8-********-********" directionSourceTask:= "two-way" syncConsumerOffsetsEnabledSourceTask:= false replicationFactorSourceTask:= int32(3) taskNumSourceTask:= int32(2) renameTopicEnabledSourceTask:= false provenanceHeaderEnabledSourceTask:= true consumerStrategySourceTask:= "latest" compressionTypeSourceTask:= "snappy" topicsMappingSourceTask:= "topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4" sourceTaskbody := &model.SmartConnectTaskReqSourceConfig{ CurrentClusterName: ¤tClusterNameSourceTask, ClusterName: &clusterNameSourceTask, UserName: &userNameSourceTask, Password: &passwordSourceTask, SaslMechanism: &saslMechanismSourceTask, InstanceId: &instanceIdSourceTask, Direction: &directionSourceTask, SyncConsumerOffsetsEnabled: &syncConsumerOffsetsEnabledSourceTask, ReplicationFactor: &replicationFactorSourceTask, TaskNum: &taskNumSourceTask, RenameTopicEnabled: &renameTopicEnabledSourceTask, ProvenanceHeaderEnabled: &provenanceHeaderEnabledSourceTask, ConsumerStrategy: &consumerStrategySourceTask, CompressionType: &compressionTypeSourceTask, TopicsMapping: &topicsMappingSourceTask, } sourceTypeCreateSmartConnectTaskReq:= model.GetCreateSmartConnectTaskReqSourceTypeEnum().KAFKA_REPLICATOR_SOURCE startLaterCreateSmartConnectTaskReq:= true taskNameCreateSmartConnectTaskReq:= "smart-connect-2" request.Body = &model.CreateSmartConnectTaskReq{ SourceTask: sourceTaskbody, SourceType: &sourceTypeCreateSmartConnectTaskReq, StartLater: &startLaterCreateSmartConnectTaskReq, TaskName: &taskNameCreateSmartConnectTaskReq, } response, err := client.CreateConnectorTask(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } }
-
创建一个立即启动的Redis数据复制任务,同步方式为全量同步,配置全量同步最大重试次数为10,重试间隔时间为10000毫秒,带宽限制为10KB/s。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") projectId := "{project_id}" auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). WithProjectId(projectId). Build() client := kafka.NewKafkaClient( kafka.KafkaClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateConnectorTaskRequest{} request.InstanceId = "{instance_id}" redisAddressSinkTask:= "192.168.119.51:6379" redisTypeSinkTask:= "standalone" redisPasswordSinkTask:= "********" sinkTaskbody := &model.SmartConnectTaskReqSinkConfig{ RedisAddress: &redisAddressSinkTask, RedisType: &redisTypeSinkTask, RedisPassword: &redisPasswordSinkTask, } redisAddressSourceTask:= "192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379" redisTypeSourceTask:= "cluster" redisPasswordSourceTask:= "********" syncModeSourceTask:= "RDB_ONLY" fullSyncWaitMsSourceTask:= int32(10000) fullSyncMaxRetrySourceTask:= int32(10) ratelimitSourceTask:= int32(10) sourceTaskbody := &model.SmartConnectTaskReqSourceConfig{ RedisAddress: &redisAddressSourceTask, RedisType: &redisTypeSourceTask, RedisPassword: &redisPasswordSourceTask, SyncMode: &syncModeSourceTask, FullSyncWaitMs: &fullSyncWaitMsSourceTask, FullSyncMaxRetry: &fullSyncMaxRetrySourceTask, Ratelimit: &ratelimitSourceTask, } sinkTypeCreateSmartConnectTaskReq:= model.GetCreateSmartConnectTaskReqSinkTypeEnum().REDIS_REPLICATOR_SINK sourceTypeCreateSmartConnectTaskReq:= model.GetCreateSmartConnectTaskReqSourceTypeEnum().REDIS_REPLICATOR_SOURCE topicsCreateSmartConnectTaskReq:= "topic-sc-3" startLaterCreateSmartConnectTaskReq:= false taskNameCreateSmartConnectTaskReq:= "smart-connect-3" request.Body = &model.CreateSmartConnectTaskReq{ SinkTask: sinkTaskbody, SinkType: &sinkTypeCreateSmartConnectTaskReq, SourceTask: sourceTaskbody, SourceType: &sourceTypeCreateSmartConnectTaskReq, Topics: &topicsCreateSmartConnectTaskReq, StartLater: &startLaterCreateSmartConnectTaskReq, TaskName: &taskNameCreateSmartConnectTaskReq, } response, err := client.CreateConnectorTask(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } }
更多编程语言的SDK代码示例,请参见API Explorer的代码示例页签,可生成自动对应的SDK代码示例。
状态码
状态码 |
描述 |
---|---|
200 |
创建Smart Connect任务成功。 |
错误码
请参见错误码。