Creating a Smart Connect Task
Function
This API is used to create a Smart Connect task.
Calling Method
For details, see Calling APIs.
URI
POST /v2/{project_id}/instances/{instance_id}/connector/tasks
Parameter | Mandatory | Type | Description |
---|---|---|---|
project_id | Yes | String | Project ID. For details, see Obtaining a Project ID. |
instance_id | Yes | String | Instance ID. |
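As a rough illustration of how the two path parameters fit into the URI, the sketch below fills in the template in Python. The helper name `build_task_url` and the sample endpoint are assumptions made for this example; only the URI template itself comes from this page.

```python
from urllib.parse import quote

# URI template taken from the URI section of this page.
URI_TEMPLATE = "/v2/{project_id}/instances/{instance_id}/connector/tasks"


def build_task_url(endpoint: str, project_id: str, instance_id: str) -> str:
    """Return the full URL for creating a Smart Connect task.

    build_task_url is a hypothetical helper, not part of any SDK.
    """
    if not project_id or not instance_id:
        raise ValueError("project_id and instance_id are both mandatory")
    path = URI_TEMPLATE.format(
        project_id=quote(project_id, safe=""),
        instance_id=quote(instance_id, safe=""),
    )
    return "https://" + endpoint.rstrip("/") + path


if __name__ == "__main__":
    # Placeholder endpoint and IDs; replace them with real values.
    print(build_task_url("kafka.example.com", "my-project", "my-instance"))
```

Percent-encoding the IDs is a defensive choice for this sketch; real project and instance IDs normally contain only URL-safe characters.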
Request Parameters
Parameter | Mandatory | Type | Description |
---|---|---|---|
task_name | No | String | Smart Connect task name. |
start_later | No | Boolean | Whether to start the task later. false: create the task and start it immediately; true: create the task and start it manually later from the task list. |
topics | No | String | Topic of a Smart Connect task. |
topics_regex | No | String | Regular expression of the topic of a Smart Connect task. |
source_type | No | String | Source type of a Smart Connect task. |
source_task | No | SmartConnectTaskReqSourceConfig object | Source configuration of a Smart Connect task. |
sink_type | No | String | Target type of a Smart Connect task. |
sink_task | No | SmartConnectTaskReqSinkConfig object | Target configuration of a Smart Connect task. |
Parameters of source_task

Parameter | Mandatory | Type | Description |
---|---|---|---|
redis_address | No | String | Redis instance address. (Mandatory only when the source type is Redis.) |
redis_type | No | String | Redis instance type. (Mandatory only when the source type is Redis.) |
dcs_instance_id | No | String | DCS instance ID. (Mandatory only when the source type is Redis.) |
redis_password | No | String | Redis password. (Mandatory only when the source type is Redis.) |
sync_mode | No | String | Synchronization type: RDB_ONLY indicates full synchronization; CUSTOM_OFFSET indicates full and incremental synchronization. (Mandatory only when the source type is Redis.) |
full_sync_wait_ms | No | Integer | Interval between full synchronization retries, in ms. (Mandatory only when the source type is Redis.) |
full_sync_max_retry | No | Integer | Maximum number of full synchronization retries. (Mandatory only when the source type is Redis.) |
ratelimit | No | Integer | Rate limit, in KB/s. -1: no rate limit. (Mandatory only when the source type is Redis.) |
current_cluster_name | No | String | Current Kafka instance name. (Mandatory only when the source type is Kafka.) |
cluster_name | No | String | Target Kafka instance name. (Mandatory only when the source type is Kafka.) |
user_name | No | String | Username set when SASL_SSL was enabled for the target Kafka instance, or when a SASL_SSL user was created. (Mandatory only when the source type is Kafka and the target Kafka authentication mode is SASL_SSL.) |
password | No | String | Password set when SASL_SSL was enabled for the target Kafka instance, or when a SASL_SSL user was created. (Mandatory only when the source type is Kafka and the target Kafka authentication mode is SASL_SSL.) |
sasl_mechanism | No | String | Target Kafka authentication mode. (Mandatory only when the source type is Kafka and the authentication mode is SASL_SSL.) |
instance_id | No | String | Target Kafka instance ID. (Mandatory only when the source type is Kafka. Specify either instance_id or bootstrap_servers.) |
bootstrap_servers | No | String | Target Kafka instance address. (Mandatory only when the source type is Kafka. Specify either instance_id or bootstrap_servers.) |
security_protocol | No | String | Target Kafka authentication. (Mandatory only when the source type is Kafka.) There are two authentication modes: |
direction | No | String | Sync direction: pull replicates data from the target Kafka instance to the current one; push replicates data from the source Kafka instance to the target one; two-way replicates data between the source and target Kafka instances in both directions. (Mandatory only when the source type is Kafka.) |
sync_consumer_offsets_enabled | No | Boolean | Whether to sync the consumption progress. (Mandatory only when the source type is Kafka.) |
replication_factor | No | Integer | Number of topic replicas when a topic is automatically created in the peer instance. The value cannot exceed the number of brokers in the peer instance. This parameter overrides the default.replication.factor parameter configured in the target instance. (Mandatory only when the source type is Kafka.) |
task_num | No | Integer | Number of data replication tasks. The default value is 2, and using the default is recommended. If the sync direction is two-way, the actual number of tasks is twice the number configured here. (Mandatory only when the source type is Kafka.) |
rename_topic_enabled | No | Boolean | Whether to rename topics. If true, the alias of the source Kafka instance is prefixed to the target topic name to form the new target topic name. (Mandatory only when the source type is Kafka.) |
provenance_header_enabled | No | Boolean | Whether to add the message source to the header of messages replicated to the target topic. If the sync direction is two-way, enable this parameter to prevent infinite replication. (Mandatory only when the source type is Kafka.) |
consumer_strategy | No | String | Start offset. latest: obtain the latest data; earliest: obtain the earliest data. (Mandatory only when the source type is Kafka.) |
compression_type | No | String | Compression algorithm to use for copying messages. (Mandatory only when the source type is Kafka.) |
topics_mapping | No | String | Topic mapping, which is used to customize the target topic name. Topic renaming (rename_topic_enabled) and topic mapping cannot be configured at the same time. Topic mapping format: source topic:target topic. Use commas (,) to separate multiple topic mappings, for example, topic-sc-1:topic-sc-2,topic-sc-3:topic-sc-4. (Mandatory only when the source type is Kafka.) |
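Several of the Kafka source rules above are easier to see in code. The following Python sketch builds a topics_mapping string, computes the effective task count for a two-way sync, and checks two of the documented constraints on the client side. The function names are hypothetical helpers invented for this example, and the checks cover only what the table states; the service may enforce more.

```python
def format_topics_mapping(mapping: dict) -> str:
    """Join {source: target} pairs into 'source:target,source:target'."""
    parts = []
    for source, target in mapping.items():
        if not source or not target:
            raise ValueError("topic names must not be empty")
        if any(ch in source + target for ch in ":,"):
            raise ValueError("topic names must not contain ':' or ','")
        parts.append(f"{source}:{target}")
    return ",".join(parts)


def effective_task_count(task_num: int = 2, direction: str = "push") -> int:
    """A two-way sync runs twice the configured number of tasks."""
    return task_num * 2 if direction == "two-way" else task_num


def check_kafka_source(source_task: dict) -> None:
    """Raise ValueError if a documented Kafka source rule is violated."""
    if source_task.get("rename_topic_enabled") and source_task.get("topics_mapping"):
        raise ValueError("rename_topic_enabled and topics_mapping cannot both be set")
    # The table asks for either instance_id or bootstrap_servers;
    # this sketch only verifies that at least one of them is present.
    if not (source_task.get("instance_id") or source_task.get("bootstrap_servers")):
        raise ValueError("specify either instance_id or bootstrap_servers")


if __name__ == "__main__":
    mapping = format_topics_mapping({"topic-sc-1": "topic-sc-2", "topic-sc-3": "topic-sc-4"})
    print(mapping)
    print(effective_task_count(2, "two-way"))
```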
Parameters of sink_task

Parameter | Mandatory | Type | Description |
---|---|---|---|
redis_address | No | String | Redis instance address. (Mandatory only when the target type is Redis.) |
redis_type | No | String | Redis instance type. (Mandatory only when the target type is Redis.) |
dcs_instance_id | No | String | DCS instance ID. (Mandatory only when the target type is Redis.) |
redis_password | No | String | Redis password. (Mandatory only when the target type is Redis.) |
consumer_strategy | No | String | Start offset. latest: obtain the latest data; earliest: obtain the earliest data. (Mandatory only when the target type is OBS.) |
destination_file_type | No | String | Dump file format. Only TEXT is supported. (Mandatory only when the target type is OBS.) |
deliver_time_interval | No | Integer | Data dumping interval, in seconds. The default value is 300. (Mandatory only when the target type is OBS.) |
access_key | No | String | AK: access key ID. (Mandatory only when the target type is OBS.) |
secret_key | No | String | SK: secret access key used together with the access key ID. (Mandatory only when the target type is OBS.) |
obs_bucket_name | No | String | Dumping address, which is the OBS bucket used to store the topic data. (Mandatory only when the target type is OBS.) |
obs_path | No | String | Dumping directory, which is the directory for storing topic files dumped to OBS. Use slashes (/) to separate directory levels. (Mandatory only when the target type is OBS.) |
partition_format | No | String | Time directory format. (Mandatory only when the target type is OBS.) |
record_delimiter | No | String | Line break, which is used to separate the user data that is written into the dump file. (Mandatory only when the target type is OBS.) Value range: |
store_keys | No | Boolean | Whether to dump keys. (Mandatory only when the target type is OBS.) |
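Because every OBS field is listed as optional but becomes mandatory when the target type is OBS, a client-side check before sending the request can save a round trip. The Python sketch below lists the OBS fields from the table above and reports which ones are missing. The helper names are hypothetical, and the value checks reflect only what the table states.

```python
# Fields the table marks as mandatory when the target type is OBS.
OBS_SINK_FIELDS = (
    "consumer_strategy",
    "destination_file_type",
    "deliver_time_interval",
    "access_key",
    "secret_key",
    "obs_bucket_name",
    "obs_path",
    "partition_format",
    "record_delimiter",
    "store_keys",
)


def missing_obs_fields(sink_task: dict) -> list:
    """Return the OBS fields absent from sink_task, in table order."""
    return [name for name in OBS_SINK_FIELDS if name not in sink_task]


def check_obs_sink(sink_task: dict) -> None:
    """Raise ValueError if the OBS sink configuration breaks a documented rule."""
    missing = missing_obs_fields(sink_task)
    if missing:
        raise ValueError("missing OBS sink fields: " + ", ".join(missing))
    if sink_task["destination_file_type"] != "TEXT":
        raise ValueError("only TEXT is supported for destination_file_type")
    if sink_task["consumer_strategy"] not in ("latest", "earliest"):
        raise ValueError("consumer_strategy must be latest or earliest")


if __name__ == "__main__":
    incomplete = {"consumer_strategy": "earliest", "destination_file_type": "TEXT"}
    print(missing_obs_fields(incomplete))
```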
Response Parameters
Status code: 200
Parameter | Type | Description |
---|---|---|
task_name | String | Smart Connect task name. |
topics | String | Topic of a Smart Connect task. |
topics_regex | String | Regular expression of the topic of a Smart Connect task. |
source_type | String | Source type of a Smart Connect task. |
source_task | Object | Source configuration of a Smart Connect task. |
sink_type | String | Target type of a Smart Connect task. |
sink_task | Object | Target configuration of a Smart Connect task. |
id | String | ID of a Smart Connect task. |
status | String | Smart Connect task status. |
create_time | Long | Time when the Smart Connect task was created. |
Parameters of source_task

Parameter | Type | Description |
---|---|---|
redis_address | String | Redis instance address. (Displayed only when the source type is Redis.) |
redis_type | String | Redis instance type. (Displayed only when the source type is Redis.) |
dcs_instance_id | String | DCS instance ID. (Displayed only when the source type is Redis.) |
sync_mode | String | Synchronization type: RDB_ONLY indicates full synchronization; CUSTOM_OFFSET indicates full and incremental synchronization. (Displayed only when the source type is Redis.) |
full_sync_wait_ms | Integer | Interval between full synchronization retries, in ms. (Displayed only when the source type is Redis.) |
full_sync_max_retry | Integer | Maximum number of full synchronization retries. (Displayed only when the source type is Redis.) |
ratelimit | Integer | Rate limit, in KB/s. -1: no rate limit. (Displayed only when the source type is Redis.) |
current_cluster_name | String | Current Kafka instance name. (Displayed only when the source type is Kafka.) |
cluster_name | String | Target Kafka instance name. (Displayed only when the source type is Kafka.) |
user_name | String | Username of the target Kafka instance. (Displayed only when the source type is Kafka.) |
sasl_mechanism | String | Target Kafka authentication mode. (Displayed only when the source type is Kafka.) |
instance_id | String | Target Kafka instance ID. (Displayed only when the source type is Kafka.) |
bootstrap_servers | String | Target Kafka instance address. (Displayed only when the source type is Kafka.) |
security_protocol | String | Target Kafka authentication. (Displayed only when the source type is Kafka.) |
direction | String | Sync direction. (Displayed only when the source type is Kafka.) |
sync_consumer_offsets_enabled | Boolean | Whether to sync the consumption progress. (Displayed only when the source type is Kafka.) |
replication_factor | Integer | Number of replicas. (Displayed only when the source type is Kafka.) |
task_num | Integer | Number of tasks. (Displayed only when the source type is Kafka.) |
rename_topic_enabled | Boolean | Whether to rename a topic. (Displayed only when the source type is Kafka.) |
provenance_header_enabled | Boolean | Whether to add the source header. (Displayed only when the source type is Kafka.) |
consumer_strategy | String | Start offset. latest: obtain the latest data; earliest: obtain the earliest data. (Displayed only when the source type is Kafka.) |
compression_type | String | Compression algorithm. (Displayed only when the source type is Kafka.) |
topics_mapping | String | Topic mapping. (Displayed only when the source type is Kafka.) |
Parameters of sink_task

Parameter | Type | Description |
---|---|---|
redis_address | String | Redis instance address. (Displayed only when the target type is Redis.) |
redis_type | String | Redis instance type. (Displayed only when the target type is Redis.) |
dcs_instance_id | String | DCS instance ID. (Displayed only when the target type is Redis.) |
target_db | Integer | Target database. The default value is -1. (Displayed only when the target type is Redis.) |
consumer_strategy | String | Start offset. latest: obtain the latest data; earliest: obtain the earliest data. (Displayed only when the target type is OBS.) |
destination_file_type | String | Dump file format. Only TEXT is supported. (Displayed only when the target type is OBS.) |
deliver_time_interval | Integer | Dumping interval, in seconds. (Displayed only when the target type is OBS.) |
obs_bucket_name | String | Dumping address. (Displayed only when the target type is OBS.) |
obs_path | String | Dump directory. (Displayed only when the target type is OBS.) |
partition_format | String | Time directory format. (Displayed only when the target type is OBS.) |
record_delimiter | String | Line break. (Displayed only when the target type is OBS.) |
store_keys | Boolean | Whether keys are dumped. (Displayed only when the target type is OBS.) |
obs_part_size | Integer | Size (in bytes) of each file to be uploaded. The default value is 5242880. (Displayed only when the target type is OBS.) |
flush_size | Integer | flush_size. (Displayed only when the target type is OBS.) |
timezone | String | Time zone. (Displayed only when the target type is OBS.) |
schema_generator_class | String | schema_generator class. The default value is io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator. (Displayed only when the target type is OBS.) |
partitioner_class | String | partitioner class. The default value is io.confluent.connect.storage.partitioner.TimeBasedPartitioner. (Displayed only when the target type is OBS.) |
value_converter | String | value_converter. The default value is org.apache.kafka.connect.converters.ByteArrayConverter. (Displayed only when the target type is OBS.) |
key_converter | String | key_converter. The default value is org.apache.kafka.connect.converters.ByteArrayConverter. (Displayed only when the target type is OBS.) |
kv_delimiter | String | kv_delimiter. The default value is a colon (:). (Displayed only when the target type is OBS.) |
Example Requests
- Creating a dumping task that starts immediately.

POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/connector/tasks

{
  "task_name" : "smart-connect-1",
  "start_later" : false,
  "source_type" : "NONE",
  "topics_regex" : "topic-obs*",
  "sink_type" : "OBS_SINK",
  "sink_task" : {
    "consumer_strategy" : "earliest",
    "destination_file_type" : "TEXT",
    "deliver_time_interval" : 300,
    "access_key" : "********",
    "secret_key" : "********",
    "obs_bucket_name" : "obs_bucket",
    "obs_path" : "obsTransfer-1810125534",
    "partition_format" : "yyyy/MM/dd/HH/mm",
    "record_delimiter" : "\\n",
    "store_keys" : false
  }
}
- Creating a Kafka data replication task that starts later.

POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/connector/tasks

{
  "task_name" : "smart-connect-2",
  "start_later" : true,
  "source_type" : "KAFKA_REPLICATOR_SOURCE",
  "source_task" : {
    "current_cluster_name" : "A",
    "cluster_name" : "B",
    "user_name" : "user1",
    "password" : "********",
    "sasl_mechanism" : "SCRAM-SHA-512",
    "instance_id" : "b54c9dd8-********-********",
    "direction" : "two-way",
    "sync_consumer_offsets_enabled" : false,
    "replication_factor" : 3,
    "task_num" : 2,
    "rename_topic_enabled" : false,
    "provenance_header_enabled" : true,
    "consumer_strategy" : "latest",
    "compression_type" : "snappy",
    "topics_mapping" : "topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4"
  }
}
- Creating a Redis data replication task that starts immediately: full synchronization, up to 10 retries, a 10,000 ms retry interval, and a 10 KB/s rate limit.

POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/connector/tasks

{
  "task_name" : "smart-connect-3",
  "start_later" : false,
  "source_type" : "REDIS_REPLICATOR_SOURCE",
  "source_task" : {
    "redis_address" : "192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379",
    "redis_type" : "cluster",
    "redis_password" : "********",
    "sync_mode" : "RDB_ONLY",
    "full_sync_max_retry" : 10,
    "full_sync_wait_ms" : 10000,
    "ratelimit" : 10
  },
  "topics" : "topic-sc-3",
  "sink_type" : "REDIS_REPLICATOR_SINK",
  "sink_task" : {
    "redis_address" : "192.168.119.51:6379",
    "redis_type" : "standalone",
    "redis_password" : "********"
  }
}
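For readers who call the API without an SDK, the requests above can be assembled with Python's standard library. The sketch below only builds the request object and does not send it. Passing a token in the X-Auth-Token header is an assumption about the authentication scheme (see Calling APIs for the supported methods), and the helper name, endpoint, and token value are placeholders invented for this example.

```python
import json
import urllib.request


def build_create_task_request(url: str, body: dict, token: str) -> urllib.request.Request:
    """Build (but do not send) the POST request that creates a task.

    Hypothetical helper. The X-Auth-Token header is an assumed
    authentication scheme; adapt it to the method you actually use.
    """
    data = json.dumps(body).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "X-Auth-Token": token,
    }
    return urllib.request.Request(url, data=data, headers=headers, method="POST")


if __name__ == "__main__":
    body = {
        "task_name": "smart-connect-3",
        "start_later": False,
        "source_type": "REDIS_REPLICATOR_SOURCE",
        "topics": "topic-sc-3",
        "sink_type": "REDIS_REPLICATOR_SINK",
    }
    # Placeholder endpoint, IDs, and token.
    url = "https://kafka.example.com/v2/my-project/instances/my-instance/connector/tasks"
    request = build_create_task_request(url, body, token="<token>")
    print(request.get_method(), request.full_url)
    # To send it: urllib.request.urlopen(request)
```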
Example Responses
Status code: 200
Successful.
{
  "task_name" : "smart-connect-121248117",
  "topics" : "topic-sc",
  "source_task" : {
    "redis_address" : "192.168.91.179:6379",
    "redis_type" : "standalone",
    "dcs_instance_id" : "949190a2-598a-4afd-99a8-dad3cae1e7cd",
    "sync_mode" : "RDB_ONLY",
    "full_sync_wait_ms" : 13000,
    "full_sync_max_retry" : 4,
    "ratelimit" : -1
  },
  "source_type" : "REDIS_REPLICATOR_SOURCE",
  "sink_task" : {
    "redis_address" : "192.168.119.51:6379",
    "redis_type" : "standalone",
    "dcs_instance_id" : "9b981368-a8e3-416a-87d9-1581a968b41b",
    "target_db" : -1
  },
  "sink_type" : "REDIS_REPLICATOR_SINK",
  "id" : "8a205bbd-7181-4b5e-9bd6-37274ce84577",
  "status" : "RUNNING",
  "create_time" : 1708427753133
}
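A client usually needs only a few fields from this response. The Python sketch below extracts the task ID and status and converts create_time to a UTC timestamp. Treating create_time as milliseconds since the Unix epoch is an assumption based on the 13-digit sample value, and `summarize_task` is a hypothetical helper name.

```python
import json
from datetime import datetime, timedelta, timezone

# Trimmed copy of the example response above.
SAMPLE_RESPONSE = """
{
  "task_name": "smart-connect-121248117",
  "id": "8a205bbd-7181-4b5e-9bd6-37274ce84577",
  "status": "RUNNING",
  "create_time": 1708427753133
}
"""

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def summarize_task(response_text: str) -> dict:
    """Return the task ID, status, and creation time as an ISO 8601 string.

    Assumes create_time is in milliseconds since the Unix epoch.
    """
    task = json.loads(response_text)
    created = EPOCH + timedelta(milliseconds=task["create_time"])
    return {
        "id": task["id"],
        "status": task["status"],
        "created": created.isoformat(),
    }


if __name__ == "__main__":
    print(summarize_task(SAMPLE_RESPONSE))
```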
SDK Sample Code
The SDK sample code is as follows.
Java

- Creating a dumping task that starts immediately.

package com.huaweicloud.sdk.test;

import com.huaweicloud.sdk.core.auth.ICredential;
import com.huaweicloud.sdk.core.auth.BasicCredentials;
import com.huaweicloud.sdk.core.exception.ConnectionException;
import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
import com.huaweicloud.sdk.core.exception.ServiceResponseException;
import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
import com.huaweicloud.sdk.kafka.v2.*;
import com.huaweicloud.sdk.kafka.v2.model.*;

public class CreateConnectorTaskSolution {

    public static void main(String[] args) {
        // Hard-coding the AK and SK or storing them in plaintext poses significant security risks.
        // Store them in ciphertext in configuration files or environment variables and decrypt them when needed.
        // In this example, the AK and SK are read from environment variables. Before running it,
        // set the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
        String ak = System.getenv("CLOUD_SDK_AK");
        String sk = System.getenv("CLOUD_SDK_SK");
        String projectId = "{project_id}";

        ICredential auth = new BasicCredentials()
                .withProjectId(projectId)
                .withAk(ak)
                .withSk(sk);

        KafkaClient client = KafkaClient.newBuilder()
                .withCredential(auth)
                .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                .build();

        CreateConnectorTaskRequest request = new CreateConnectorTaskRequest();
        request.withInstanceId("{instance_id}");

        // OBS sink configuration.
        CreateSmartConnectTaskReq body = new CreateSmartConnectTaskReq();
        SmartConnectTaskReqSinkConfig sinkTaskbody = new SmartConnectTaskReqSinkConfig();
        sinkTaskbody.withConsumerStrategy("earliest")
                .withDestinationFileType("TEXT")
                .withDeliverTimeInterval(300)
                .withAccessKey("********")
                .withSecretKey("********")
                .withObsBucketName("obs_bucket")
                .withObsPath("obsTransfer-1810125534")
                .withPartitionFormat("yyyy/MM/dd/HH/mm")
                .withRecordDelimiter("\n")
                .withStoreKeys(false);
        body.withSinkTask(sinkTaskbody);
        body.withSinkType(CreateSmartConnectTaskReq.SinkTypeEnum.fromValue("OBS_SINK"));
        body.withSourceType(CreateSmartConnectTaskReq.SourceTypeEnum.fromValue("NONE"));
        body.withTopicsRegex("topic-obs*");
        body.withStartLater(false);
        body.withTaskName("smart-connect-1");
        request.withBody(body);

        try {
            CreateConnectorTaskResponse response = client.createConnectorTask(request);
            System.out.println(response.toString());
        } catch (ConnectionException e) {
            e.printStackTrace();
        } catch (RequestTimeoutException e) {
            e.printStackTrace();
        } catch (ServiceResponseException e) {
            e.printStackTrace();
            System.out.println(e.getHttpStatusCode());
            System.out.println(e.getRequestId());
            System.out.println(e.getErrorCode());
            System.out.println(e.getErrorMsg());
        }
    }
}
- Creating a Kafka data replication task that starts later.

package com.huaweicloud.sdk.test;

import com.huaweicloud.sdk.core.auth.ICredential;
import com.huaweicloud.sdk.core.auth.BasicCredentials;
import com.huaweicloud.sdk.core.exception.ConnectionException;
import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
import com.huaweicloud.sdk.core.exception.ServiceResponseException;
import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
import com.huaweicloud.sdk.kafka.v2.*;
import com.huaweicloud.sdk.kafka.v2.model.*;

public class CreateConnectorTaskSolution {

    public static void main(String[] args) {
        // Hard-coding the AK and SK or storing them in plaintext poses significant security risks.
        // Store them in ciphertext in configuration files or environment variables and decrypt them when needed.
        // In this example, the AK and SK are read from environment variables. Before running it,
        // set the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
        String ak = System.getenv("CLOUD_SDK_AK");
        String sk = System.getenv("CLOUD_SDK_SK");
        String projectId = "{project_id}";

        ICredential auth = new BasicCredentials()
                .withProjectId(projectId)
                .withAk(ak)
                .withSk(sk);

        KafkaClient client = KafkaClient.newBuilder()
                .withCredential(auth)
                .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                .build();

        CreateConnectorTaskRequest request = new CreateConnectorTaskRequest();
        request.withInstanceId("{instance_id}");

        // Kafka source configuration.
        CreateSmartConnectTaskReq body = new CreateSmartConnectTaskReq();
        SmartConnectTaskReqSourceConfig sourceTaskbody = new SmartConnectTaskReqSourceConfig();
        sourceTaskbody.withCurrentClusterName("A")
                .withClusterName("B")
                .withUserName("user1")
                .withPassword("********")
                .withSaslMechanism("SCRAM-SHA-512")
                .withInstanceId("b54c9dd8-********-********")
                .withDirection("two-way")
                .withSyncConsumerOffsetsEnabled(false)
                .withReplicationFactor(3)
                .withTaskNum(2)
                .withRenameTopicEnabled(false)
                .withProvenanceHeaderEnabled(true)
                .withConsumerStrategy("latest")
                .withCompressionType("snappy")
                .withTopicsMapping("topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4");
        body.withSourceTask(sourceTaskbody);
        body.withSourceType(CreateSmartConnectTaskReq.SourceTypeEnum.fromValue("KAFKA_REPLICATOR_SOURCE"));
        body.withStartLater(true);
        body.withTaskName("smart-connect-2");
        request.withBody(body);

        try {
            CreateConnectorTaskResponse response = client.createConnectorTask(request);
            System.out.println(response.toString());
        } catch (ConnectionException e) {
            e.printStackTrace();
        } catch (RequestTimeoutException e) {
            e.printStackTrace();
        } catch (ServiceResponseException e) {
            e.printStackTrace();
            System.out.println(e.getHttpStatusCode());
            System.out.println(e.getRequestId());
            System.out.println(e.getErrorCode());
            System.out.println(e.getErrorMsg());
        }
    }
}
- Creating a Redis data replication task that starts immediately: full synchronization, up to 10 retries, a 10,000 ms retry interval, and a 10 KB/s rate limit.

package com.huaweicloud.sdk.test;

import com.huaweicloud.sdk.core.auth.ICredential;
import com.huaweicloud.sdk.core.auth.BasicCredentials;
import com.huaweicloud.sdk.core.exception.ConnectionException;
import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
import com.huaweicloud.sdk.core.exception.ServiceResponseException;
import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
import com.huaweicloud.sdk.kafka.v2.*;
import com.huaweicloud.sdk.kafka.v2.model.*;

public class CreateConnectorTaskSolution {

    public static void main(String[] args) {
        // Hard-coding the AK and SK or storing them in plaintext poses significant security risks.
        // Store them in ciphertext in configuration files or environment variables and decrypt them when needed.
        // In this example, the AK and SK are read from environment variables. Before running it,
        // set the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
        String ak = System.getenv("CLOUD_SDK_AK");
        String sk = System.getenv("CLOUD_SDK_SK");
        String projectId = "{project_id}";

        ICredential auth = new BasicCredentials()
                .withProjectId(projectId)
                .withAk(ak)
                .withSk(sk);

        KafkaClient client = KafkaClient.newBuilder()
                .withCredential(auth)
                .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                .build();

        CreateConnectorTaskRequest request = new CreateConnectorTaskRequest();
        request.withInstanceId("{instance_id}");

        CreateSmartConnectTaskReq body = new CreateSmartConnectTaskReq();

        // Redis sink configuration.
        SmartConnectTaskReqSinkConfig sinkTaskbody = new SmartConnectTaskReqSinkConfig();
        sinkTaskbody.withRedisAddress("192.168.119.51:6379")
                .withRedisType("standalone")
                .withRedisPassword("********");

        // Redis source configuration.
        SmartConnectTaskReqSourceConfig sourceTaskbody = new SmartConnectTaskReqSourceConfig();
        sourceTaskbody.withRedisAddress("192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379")
                .withRedisType("cluster")
                .withRedisPassword("********")
                .withSyncMode("RDB_ONLY")
                .withFullSyncWaitMs(10000)
                .withFullSyncMaxRetry(10)
                .withRatelimit(10);

        body.withSinkTask(sinkTaskbody);
        body.withSinkType(CreateSmartConnectTaskReq.SinkTypeEnum.fromValue("REDIS_REPLICATOR_SINK"));
        body.withSourceTask(sourceTaskbody);
        body.withSourceType(CreateSmartConnectTaskReq.SourceTypeEnum.fromValue("REDIS_REPLICATOR_SOURCE"));
        body.withTopics("topic-sc-3");
        body.withStartLater(false);
        body.withTaskName("smart-connect-3");
        request.withBody(body);

        try {
            CreateConnectorTaskResponse response = client.createConnectorTask(request);
            System.out.println(response.toString());
        } catch (ConnectionException e) {
            e.printStackTrace();
        } catch (RequestTimeoutException e) {
            e.printStackTrace();
        } catch (ServiceResponseException e) {
            e.printStackTrace();
            System.out.println(e.getHttpStatusCode());
            System.out.println(e.getRequestId());
            System.out.println(e.getErrorCode());
            System.out.println(e.getErrorMsg());
        }
    }
}
Python

- Creating a dumping task that starts immediately.

# coding: utf-8

import os
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkkafka.v2 import *

if __name__ == "__main__":
    # Hard-coding the AK and SK or storing them in plaintext poses significant security risks.
    # Store them in ciphertext in configuration files or environment variables and decrypt them when needed.
    # In this example, the AK and SK are read from environment variables. Before running it,
    # set the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
    ak = os.environ["CLOUD_SDK_AK"]
    sk = os.environ["CLOUD_SDK_SK"]
    projectId = "{project_id}"

    credentials = BasicCredentials(ak, sk, projectId)

    client = KafkaClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
        .build()

    try:
        request = CreateConnectorTaskRequest()
        request.instance_id = "{instance_id}"
        # OBS sink configuration.
        sinkTaskbody = SmartConnectTaskReqSinkConfig(
            consumer_strategy="earliest",
            destination_file_type="TEXT",
            deliver_time_interval=300,
            access_key="********",
            secret_key="********",
            obs_bucket_name="obs_bucket",
            obs_path="obsTransfer-1810125534",
            partition_format="yyyy/MM/dd/HH/mm",
            record_delimiter="\n",
            store_keys=False
        )
        request.body = CreateSmartConnectTaskReq(
            sink_task=sinkTaskbody,
            sink_type="OBS_SINK",
            source_type="NONE",
            topics_regex="topic-obs*",
            start_later=False,
            task_name="smart-connect-1"
        )
        response = client.create_connector_task(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
- Creating a Kafka data replication task that starts later.

# coding: utf-8

import os
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkkafka.v2 import *

if __name__ == "__main__":
    # Hard-coding the AK and SK or storing them in plaintext poses significant security risks.
    # Store them in ciphertext in configuration files or environment variables and decrypt them when needed.
    # In this example, the AK and SK are read from environment variables. Before running it,
    # set the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
    ak = os.environ["CLOUD_SDK_AK"]
    sk = os.environ["CLOUD_SDK_SK"]
    projectId = "{project_id}"

    credentials = BasicCredentials(ak, sk, projectId)

    client = KafkaClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
        .build()

    try:
        request = CreateConnectorTaskRequest()
        request.instance_id = "{instance_id}"
        # Kafka source configuration.
        sourceTaskbody = SmartConnectTaskReqSourceConfig(
            current_cluster_name="A",
            cluster_name="B",
            user_name="user1",
            password="********",
            sasl_mechanism="SCRAM-SHA-512",
            instance_id="b54c9dd8-********-********",
            direction="two-way",
            sync_consumer_offsets_enabled=False,
            replication_factor=3,
            task_num=2,
            rename_topic_enabled=False,
            provenance_header_enabled=True,
            consumer_strategy="latest",
            compression_type="snappy",
            topics_mapping="topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4"
        )
        request.body = CreateSmartConnectTaskReq(
            source_task=sourceTaskbody,
            source_type="KAFKA_REPLICATOR_SOURCE",
            start_later=True,
            task_name="smart-connect-2"
        )
        response = client.create_connector_task(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
- Creating a Redis data replication task that starts immediately: full synchronization, up to 10 retries, a 10,000 ms retry interval, and a 10 KB/s rate limit.

# coding: utf-8

import os
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkkafka.v2 import *

if __name__ == "__main__":
    # Hard-coding the AK and SK or storing them in plaintext poses significant security risks.
    # Store them in ciphertext in configuration files or environment variables and decrypt them when needed.
    # In this example, the AK and SK are read from environment variables. Before running it,
    # set the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
    ak = os.environ["CLOUD_SDK_AK"]
    sk = os.environ["CLOUD_SDK_SK"]
    projectId = "{project_id}"

    credentials = BasicCredentials(ak, sk, projectId)

    client = KafkaClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
        .build()

    try:
        request = CreateConnectorTaskRequest()
        request.instance_id = "{instance_id}"
        # Redis sink configuration.
        sinkTaskbody = SmartConnectTaskReqSinkConfig(
            redis_address="192.168.119.51:6379",
            redis_type="standalone",
            redis_password="********"
        )
        # Redis source configuration.
        sourceTaskbody = SmartConnectTaskReqSourceConfig(
            redis_address="192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379",
            redis_type="cluster",
            redis_password="********",
            sync_mode="RDB_ONLY",
            full_sync_wait_ms=10000,
            full_sync_max_retry=10,
            ratelimit=10
        )
        request.body = CreateSmartConnectTaskReq(
            sink_task=sinkTaskbody,
            sink_type="REDIS_REPLICATOR_SINK",
            source_task=sourceTaskbody,
            source_type="REDIS_REPLICATOR_SOURCE",
            topics="topic-sc-3",
            start_later=False,
            task_name="smart-connect-3"
        )
        response = client.create_connector_task(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
- Creating a dumping task that starts immediately.
package main

import (
	"fmt"
	"os"

	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
	kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
)

func main() {
	// The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks.
	// It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables
	// and decrypted during use to ensure security.
	// In this example, AK and SK are stored in environment variables for authentication. Before running this example,
	// set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
	ak := os.Getenv("CLOUD_SDK_AK")
	sk := os.Getenv("CLOUD_SDK_SK")
	projectId := "{project_id}"

	auth := basic.NewCredentialsBuilder().
		WithAk(ak).
		WithSk(sk).
		WithProjectId(projectId).
		Build()

	client := kafka.NewKafkaClient(
		kafka.KafkaClientBuilder().
			WithRegion(region.ValueOf("<YOUR REGION>")).
			WithCredential(auth).
			Build())

	request := &model.CreateConnectorTaskRequest{}
	request.InstanceId = "{instance_id}"

	// Sink (OBS dumping) configuration.
	consumerStrategySinkTask := "earliest"
	destinationFileTypeSinkTask := "TEXT"
	deliverTimeIntervalSinkTask := int32(300)
	accessKeySinkTask := "********"
	secretKeySinkTask := "********"
	obsBucketNameSinkTask := "obs_bucket"
	obsPathSinkTask := "obsTransfer-1810125534"
	partitionFormatSinkTask := "yyyy/MM/dd/HH/mm"
	recordDelimiterSinkTask := "\n"
	storeKeysSinkTask := false
	sinkTaskbody := &model.SmartConnectTaskReqSinkConfig{
		ConsumerStrategy:    &consumerStrategySinkTask,
		DestinationFileType: &destinationFileTypeSinkTask,
		DeliverTimeInterval: &deliverTimeIntervalSinkTask,
		AccessKey:           &accessKeySinkTask,
		SecretKey:           &secretKeySinkTask,
		ObsBucketName:       &obsBucketNameSinkTask,
		ObsPath:             &obsPathSinkTask,
		PartitionFormat:     &partitionFormatSinkTask,
		RecordDelimiter:     &recordDelimiterSinkTask,
		StoreKeys:           &storeKeysSinkTask,
	}

	// Task-level settings: dump topics matching the regex and start immediately.
	sinkTypeCreateSmartConnectTaskReq := model.GetCreateSmartConnectTaskReqSinkTypeEnum().OBS_SINK
	sourceTypeCreateSmartConnectTaskReq := model.GetCreateSmartConnectTaskReqSourceTypeEnum().NONE
	topicsRegexCreateSmartConnectTaskReq := "topic-obs*"
	startLaterCreateSmartConnectTaskReq := false
	taskNameCreateSmartConnectTaskReq := "smart-connect-1"
	request.Body = &model.CreateSmartConnectTaskReq{
		SinkTask:    sinkTaskbody,
		SinkType:    &sinkTypeCreateSmartConnectTaskReq,
		SourceType:  &sourceTypeCreateSmartConnectTaskReq,
		TopicsRegex: &topicsRegexCreateSmartConnectTaskReq,
		StartLater:  &startLaterCreateSmartConnectTaskReq,
		TaskName:    &taskNameCreateSmartConnectTaskReq,
	}

	response, err := client.CreateConnectorTask(request)
	if err == nil {
		fmt.Printf("%+v\n", response)
	} else {
		fmt.Println(err)
	}
}
- Creating a Kafka data replication task that starts later.
package main

import (
	"fmt"
	"os"

	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
	kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
)

func main() {
	// The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks.
	// It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables
	// and decrypted during use to ensure security.
	// In this example, AK and SK are stored in environment variables for authentication. Before running this example,
	// set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
	ak := os.Getenv("CLOUD_SDK_AK")
	sk := os.Getenv("CLOUD_SDK_SK")
	projectId := "{project_id}"

	auth := basic.NewCredentialsBuilder().
		WithAk(ak).
		WithSk(sk).
		WithProjectId(projectId).
		Build()

	client := kafka.NewKafkaClient(
		kafka.KafkaClientBuilder().
			WithRegion(region.ValueOf("<YOUR REGION>")).
			WithCredential(auth).
			Build())

	request := &model.CreateConnectorTaskRequest{}
	request.InstanceId = "{instance_id}"

	// Source (Kafka replication) configuration.
	currentClusterNameSourceTask := "A"
	clusterNameSourceTask := "B"
	userNameSourceTask := "user1"
	passwordSourceTask := "********"
	saslMechanismSourceTask := "SCRAM-SHA-512"
	instanceIdSourceTask := "b54c9dd8-********-********"
	directionSourceTask := "two-way"
	syncConsumerOffsetsEnabledSourceTask := false
	replicationFactorSourceTask := int32(3)
	taskNumSourceTask := int32(2)
	renameTopicEnabledSourceTask := false
	provenanceHeaderEnabledSourceTask := true
	consumerStrategySourceTask := "latest"
	compressionTypeSourceTask := "snappy"
	topicsMappingSourceTask := "topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4"
	sourceTaskbody := &model.SmartConnectTaskReqSourceConfig{
		CurrentClusterName:         &currentClusterNameSourceTask,
		ClusterName:                &clusterNameSourceTask,
		UserName:                   &userNameSourceTask,
		Password:                   &passwordSourceTask,
		SaslMechanism:              &saslMechanismSourceTask,
		InstanceId:                 &instanceIdSourceTask,
		Direction:                  &directionSourceTask,
		SyncConsumerOffsetsEnabled: &syncConsumerOffsetsEnabledSourceTask,
		ReplicationFactor:          &replicationFactorSourceTask,
		TaskNum:                    &taskNumSourceTask,
		RenameTopicEnabled:         &renameTopicEnabledSourceTask,
		ProvenanceHeaderEnabled:    &provenanceHeaderEnabledSourceTask,
		ConsumerStrategy:           &consumerStrategySourceTask,
		CompressionType:            &compressionTypeSourceTask,
		TopicsMapping:              &topicsMappingSourceTask,
	}

	// Task-level settings: create the task now and start it manually later.
	sourceTypeCreateSmartConnectTaskReq := model.GetCreateSmartConnectTaskReqSourceTypeEnum().KAFKA_REPLICATOR_SOURCE
	startLaterCreateSmartConnectTaskReq := true
	taskNameCreateSmartConnectTaskReq := "smart-connect-2"
	request.Body = &model.CreateSmartConnectTaskReq{
		SourceTask: sourceTaskbody,
		SourceType: &sourceTypeCreateSmartConnectTaskReq,
		StartLater: &startLaterCreateSmartConnectTaskReq,
		TaskName:   &taskNameCreateSmartConnectTaskReq,
	}

	response, err := client.CreateConnectorTask(request)
	if err == nil {
		fmt.Printf("%+v\n", response)
	} else {
		fmt.Println(err)
	}
}
- Creating a Redis data replication task that starts immediately: full synchronization, a maximum of 10 retries, a 10,000 ms retry interval, and a 10 KB/s bandwidth limit.
package main

import (
	"fmt"
	"os"

	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
	kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
)

func main() {
	// The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks.
	// It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables
	// and decrypted during use to ensure security.
	// In this example, AK and SK are stored in environment variables for authentication. Before running this example,
	// set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
	ak := os.Getenv("CLOUD_SDK_AK")
	sk := os.Getenv("CLOUD_SDK_SK")
	projectId := "{project_id}"

	auth := basic.NewCredentialsBuilder().
		WithAk(ak).
		WithSk(sk).
		WithProjectId(projectId).
		Build()

	client := kafka.NewKafkaClient(
		kafka.KafkaClientBuilder().
			WithRegion(region.ValueOf("<YOUR REGION>")).
			WithCredential(auth).
			Build())

	request := &model.CreateConnectorTaskRequest{}
	request.InstanceId = "{instance_id}"

	// Sink (target Redis) configuration.
	redisAddressSinkTask := "192.168.119.51:6379"
	redisTypeSinkTask := "standalone"
	redisPasswordSinkTask := "********"
	sinkTaskbody := &model.SmartConnectTaskReqSinkConfig{
		RedisAddress:  &redisAddressSinkTask,
		RedisType:     &redisTypeSinkTask,
		RedisPassword: &redisPasswordSinkTask,
	}

	// Source (Redis cluster) configuration: full synchronization with retry and bandwidth limits.
	redisAddressSourceTask := "192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379"
	redisTypeSourceTask := "cluster"
	redisPasswordSourceTask := "********"
	syncModeSourceTask := "RDB_ONLY"
	fullSyncWaitMsSourceTask := int32(10000)
	fullSyncMaxRetrySourceTask := int32(10)
	ratelimitSourceTask := int32(10)
	sourceTaskbody := &model.SmartConnectTaskReqSourceConfig{
		RedisAddress:     &redisAddressSourceTask,
		RedisType:        &redisTypeSourceTask,
		RedisPassword:    &redisPasswordSourceTask,
		SyncMode:         &syncModeSourceTask,
		FullSyncWaitMs:   &fullSyncWaitMsSourceTask,
		FullSyncMaxRetry: &fullSyncMaxRetrySourceTask,
		Ratelimit:        &ratelimitSourceTask,
	}

	// Task-level settings: replicate through the given topic and start immediately.
	sinkTypeCreateSmartConnectTaskReq := model.GetCreateSmartConnectTaskReqSinkTypeEnum().REDIS_REPLICATOR_SINK
	sourceTypeCreateSmartConnectTaskReq := model.GetCreateSmartConnectTaskReqSourceTypeEnum().REDIS_REPLICATOR_SOURCE
	topicsCreateSmartConnectTaskReq := "topic-sc-3"
	startLaterCreateSmartConnectTaskReq := false
	taskNameCreateSmartConnectTaskReq := "smart-connect-3"
	request.Body = &model.CreateSmartConnectTaskReq{
		SinkTask:   sinkTaskbody,
		SinkType:   &sinkTypeCreateSmartConnectTaskReq,
		SourceTask: sourceTaskbody,
		SourceType: &sourceTypeCreateSmartConnectTaskReq,
		Topics:     &topicsCreateSmartConnectTaskReq,
		StartLater: &startLaterCreateSmartConnectTaskReq,
		TaskName:   &taskNameCreateSmartConnectTaskReq,
	}

	response, err := client.CreateConnectorTask(request)
	if err == nil {
		fmt.Printf("%+v\n", response)
	} else {
		fmt.Println(err)
	}
}
For SDK sample code in other programming languages, see the Sample Code tab in API Explorer, which can generate SDK sample code automatically.
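When the SDK is not available, it can help to see the Redis replication example as the plain JSON body that would be sent to the URI of this API. The following is a minimal sketch using only the Python standard library. It assumes that the JSON keys match the names in the request parameter tables and that `ratelimit` is spelled as in the SDK samples. The helper `build_redis_replication_task` and the ID placeholders are hypothetical names introduced for illustration; authentication and the endpoint host are out of scope here.

```python
import json

# URI template as given in the URI section of this API.
URI_TEMPLATE = "/v2/{project_id}/instances/{instance_id}/connector/tasks"


def build_redis_replication_task(task_name, topics, source_task, sink_task,
                                 start_later=False):
    """Hypothetical helper: assemble the request body for a Redis replication task.

    Key names follow the request parameter tables; the type values are the
    ones used in the SDK samples.
    """
    return {
        "task_name": task_name,
        "start_later": start_later,
        "topics": topics,
        "source_type": "REDIS_REPLICATOR_SOURCE",
        "source_task": source_task,
        "sink_type": "REDIS_REPLICATOR_SINK",
        "sink_task": sink_task,
    }


body = build_redis_replication_task(
    task_name="smart-connect-3",
    topics="topic-sc-3",
    source_task={
        "redis_address": "192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379",
        "redis_type": "cluster",
        "redis_password": "********",
        "sync_mode": "RDB_ONLY",       # full synchronization
        "full_sync_wait_ms": 10000,    # retry interval, in ms
        "full_sync_max_retry": 10,     # maximum number of retries
        "ratelimit": 10,               # bandwidth limit (assumed key name)
    },
    sink_task={
        "redis_address": "192.168.119.51:6379",
        "redis_type": "standalone",
        "redis_password": "********",
    },
)

# Placeholder IDs; replace them with real values before sending a request.
path = URI_TEMPLATE.format(project_id="my-project-id", instance_id="my-instance-id")

print("POST", path)
print(json.dumps(body, indent=2))
```

Building the body as a plain dictionary keeps the mapping to the parameter tables visible; the SDK request classes shown in the samples above remain the supported way to call the API.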
Status Codes
Status Code | Description |
---|---|
200 | Successful. |
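Because 200 is the only status code documented for this API, a caller can treat any other status as a failure and look up the returned error code under Error Codes. The sketch below illustrates that check. `SmartConnectApiError` and `ensure_success` are hypothetical names, and the error code used in the usage lines is a made-up placeholder, not a real service error code.

```python
class SmartConnectApiError(Exception):
    """Hypothetical exception raised for any non-200 response."""


def ensure_success(status_code, error_code=None, error_msg=None):
    """Return None for status 200; raise SmartConnectApiError otherwise."""
    if status_code == 200:
        return None
    raise SmartConnectApiError(
        f"HTTP {status_code}: error_code={error_code}, error_msg={error_msg}"
    )


# Usage: a successful call passes silently, a failed one raises.
ensure_success(200)
try:
    ensure_success(400, "EXAMPLE.0001", "placeholder error message")
except SmartConnectApiError as err:
    print(err)
```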
Error Codes
See Error Codes.