Criação de uma tarefa do Smart Connect
Função
Essa API é usada para criar uma tarefa do Smart Connect.
Método de chamada
Para obter detalhes, consulte Chamada de APIs.
URI
POST /v2/{project_id}/instances/{instance_id}/connector/tasks
Parâmetro |
Obrigatório |
Tipo |
Descrição |
---|---|---|---|
project_id |
Sim |
String |
ID do projeto. Para obter detalhes, consulte Obtenção de um ID de projeto. |
instance_id |
Sim |
String |
ID da instância. |
Parâmetros de solicitação
Parâmetro |
Obrigatório |
Tipo |
Descrição |
---|---|---|---|
task_name |
Não |
String |
Nome da tarefa do Smart Connect. |
start_later |
Não |
Boolean |
Indica se a tarefa deve ser iniciada mais tarde. false: para criar uma tarefa e iniciá-la imediatamente; true: para criar uma tarefa e iniciá-la manualmente mais tarde na lista de tarefas. |
topics |
Não |
String |
Tópico de uma tarefa do Smart Connect. |
topics_regex |
Não |
String |
Expressão regular do tópico de uma tarefa do Smart Connect. |
source_type |
Não |
String |
Tipo de origem de uma tarefa do Smart Connect. |
source_task |
Não |
Configuração de origem de uma tarefa do Smart Connect. |
|
sink_type |
Não |
String |
Tipo de destino de uma tarefa do Smart Connect. |
sink_task |
Não |
Tipo de destino de uma tarefa do Smart Connect. |
Parâmetro |
Obrigatório |
Tipo |
Descrição |
---|---|---|---|
redis_address |
Não |
String |
Endereço da instância do Redis. (Obrigatório somente quando o tipo de origem for Redis.) |
redis_type |
Não |
String |
Tipo de instância do Redis. (Obrigatório somente quando o tipo de origem for Redis.) |
dcs_instance_id |
Não |
String |
ID da instância do DCS. (Obrigatório somente quando o tipo de origem for Redis.) |
redis_password |
Não |
String |
Senha do Redis. (Obrigatório somente quando o tipo de origem for Redis.) |
sync_mode |
Não |
String |
Tipo de sincronização: RDB_ONLY indica sincronização completa; CUSTOM_OFFSET indica sincronização completa e incremental. (Obrigatório somente quando o tipo de origem for Redis.) |
full_sync_wait_ms |
Não |
Integer |
Intervalo de novas tentativas de sincronização total, em ms. (Obrigatório somente quando o tipo de origem for Redis.) |
full_sync_max_retry |
Não |
Integer |
Máximo de novas tentativas de sincronização total. (Obrigatório somente quando o tipo de origem for Redis.) |
ratelimit |
Não |
Integer |
Limite de taxa, em KB/s. -1: não há limite de taxa. (Obrigatório somente quando o tipo de origem for Redis.) |
current_cluster_name |
Não |
String |
Nome da instância atual do Kafka. (Obrigatório somente quando o tipo de origem for Kafka.) |
cluster_name |
Não |
String |
Nome da instância do Kafka de destino. (Obrigatório somente quando o tipo de origem for Kafka.) |
user_name |
Não |
String |
Nome de usuário definido quando SASL_SSL foi ativado para uma instância do Kafka de destino ou quando um usuário do SASL_SSL foi criado. (Obrigatório somente quando o tipo de origem for Kafka e o modo de autenticação do Kafka de destino for SASL_SSL.) |
password |
Não |
String |
Senha definida quando o SASL_SSL foi ativado para uma instância de destino do Kafka ou quando um usuário do SASL_SSL foi criado. (Obrigatório somente quando o tipo de origem for Kafka e o modo de autenticação do Kafka de destino for SASL_SSL.) |
sasl_mechanism |
Não |
String |
Modo de autenticação do Kafka de destino. (Obrigatório somente quando o tipo de origem é Kafka e o modo de autenticação é SASL_SSL.) |
instance_id |
Não |
String |
ID de instância do Kafka de destino. (Obrigatório somente quando o tipo de origem for Kafka. Especifique o instance_id ou bootstrap_servers.) |
bootstrap_servers |
Não |
String |
Endereço de instância de Kafka de destino. (Obrigatório somente quando o tipo de origem for Kafka. Especifique o instance_id ou bootstrap_servers.) |
security_protocol |
Não |
String |
Autenticação do Kafka de destino. (Obrigatório somente quando o tipo de origem é Kafka). Existem dois modos de autenticação:
|
direction |
Não |
String |
Direção de sincronização: pull replica dados da instância do Kafka de destino para a atual; push replica dados da instância do Kafka de origem para a de destino; two-way replica dados da instância do Kafka de origem e de destino entre si. (Obrigatório somente quando o tipo de origem for Kafka.) |
sync_consumer_offsets_enabled |
Não |
Boolean |
Indica se o progresso do consumo deve ser sincronizado. (Obrigatório somente quando o tipo de origem for Kafka.) |
replication_factor |
Não |
Integer |
Número de réplicas de tópico quando um tópico é criado automaticamente na instância de par. O valor deste parâmetro não pode exceder o número de corretores na instância de par. Esse parâmetro substitui o parâmetro default.replication.factor configurado na instância de destino. (Obrigatório somente quando o tipo de origem for Kafka.) |
task_num |
Não |
Integer |
Número de tarefas de replicação de dados. O valor padrão é 2. É aconselhável usar o valor padrão. Se a direção de sincronização for definida como bidirecional, o número real de tarefas será o dobro do número de tarefas configuradas aqui. (Obrigatório somente quando o tipo de origem for Kafka.) |
rename_topic_enabled |
Não |
Boolean |
Indica se um tópico deve ser renomeado. Se sim, adicione o alias da instância do Kafka de origem antes do nome do tópico de destino para formar um novo nome do tópico de destino. (Obrigatório somente quando o tipo de origem for Kafka.) |
provenance_header_enabled |
Não |
Boolean |
O tópico de destino recebe as mensagens replicadas. O cabeçalho da mensagem contém a origem da mensagem. Se você selecionar Both para Sync Direction, ative Add Source Header para impedir a replicação infinita. (Obrigatório somente quando o tipo de origem for Kafka.) |
consumer_strategy |
Não |
String |
Deslocamento inicial. latest: obter os dados mais recentes; earliest: obter os dados mais antigos. (Obrigatório somente quando o tipo de origem for Kafka.) |
compression_type |
Não |
String |
Algoritmo de compactação a ser usado para copiar mensagens. (Obrigatório somente quando o tipo de origem for Kafka.)
|
topics_mapping |
Não |
String |
Mapeamento de tópico, que é usado para personalizar o nome do tópico de destino. Rename Topic e Topic Mapping não podem ser configurados ao mesmo tempo. Formato de mapeamento de tópicos: source topic:target topic. Use vírgulas (,) para separar vários mapeamentos de tópicos, por exemplo, topic-sc-1:topic-sc-2,topic-sc-3:topic-sc-4. (Obrigatório somente quando o tipo de origem for Kafka.) |
Parâmetro |
Obrigatório |
Tipo |
Descrição |
---|---|---|---|
redis_address |
Não |
String |
Endereço da instância do Redis. (Obrigatório somente quando o tipo de destino for Redis.) |
redis_type |
Não |
String |
Tipo de instância do Redis. (Obrigatório somente quando o tipo de destino for Redis.) |
dcs_instance_id |
Não |
String |
ID da instância do DCS. (Obrigatório somente quando o tipo de destino for Redis.) |
redis_password |
Não |
String |
Senha do Redis. (Obrigatório somente quando o tipo de destino for Redis.) |
consumer_strategy |
Não |
String |
Deslocamento inicial. latest: obter os dados mais recentes; earliest: obter os dados mais antigos. (Obrigatório somente quando o tipo de destino for OBS.) |
destination_file_type |
Não |
String |
Formato do arquivo de despejo. Somente TEXT é suportado. (Obrigatório somente quando o tipo de destino for OBS.) |
deliver_time_interval |
Não |
Integer |
Intervalos de despejo de dados, em segundos. O intervalo padrão é 300s. (Obrigatório somente quando o tipo de destino for OBS.) |
access_key |
Não |
String |
AK: ID da chave de acesso. (Obrigatório somente quando o tipo de destino for OBS.) |
secret_key |
Não |
String |
SK: chave de acesso secreta usada junto com o ID da chave de acesso. (Obrigatório somente quando o tipo de destino for OBS.) |
obs_bucket_name |
Não |
String |
Endereço de despejo, que é o bucket do OBS usado para armazenar os dados do tópico. (Obrigatório somente quando o tipo de destino for OBS.) |
obs_path |
Não |
String |
Diretório de despejo, que é o diretório para armazenar arquivos de tópicos despejados no OBS. Use barras (/) para separar os níveis de diretório. (Obrigatório somente quando o tipo de destino for OBS.) |
partition_format |
Não |
String |
Formato do diretório de hora. (Obrigatório somente quando o tipo de destino for OBS.)
|
record_delimiter |
Não |
String |
Quebra de linha, que é usada para separar os dados do usuário que são gravados no arquivo de despejo. (Obrigatório somente quando o tipo de destino for OBS.) Intervalo de valores:
|
store_keys |
Não |
Boolean |
Especifica se as chaves devem ser despejadas. (Obrigatório somente quando o tipo de destino for OBS.) |
Parâmetros de resposta
Código de status: 200
Parâmetro |
Tipo |
Descrição |
---|---|---|
task_name |
String |
Nome da tarefa do Smart Connect. |
topics |
String |
Tópico de uma tarefa do Smart Connect. |
topics_regex |
String |
Expressão regular do tópico de uma tarefa do Smart Connect. |
source_type |
String |
Tipo de origem de uma tarefa do Smart Connect. |
source_task |
Configuração de origem de uma tarefa do Smart Connect. |
|
sink_type |
String |
Tipo de destino de uma tarefa do Smart Connect. |
sink_task |
Tipo de destino de uma tarefa do Smart Connect. |
|
id |
String |
ID de uma tarefa do Smart Connect. |
status |
String |
Status da tarefa do Smart Connect. |
create_time |
Long |
Hora em que a tarefa do Smart Connect foi criada. |
Parâmetro |
Tipo |
Descrição |
---|---|---|
redis_address |
String |
Endereço da instância do Redis. (Exibido somente quando o tipo de origem é Redis.) |
redis_type |
String |
Tipo de instância do Redis. (Exibido somente quando o tipo de origem é Redis.) |
dcs_instance_id |
String |
ID da instância do DCS. (Exibido somente quando o tipo de origem é Redis.) |
sync_mode |
String |
Tipo de sincronização: RDB_ONLY indica sincronização completa; CUSTOM_OFFSET indica sincronização completa e incremental. (Exibido somente quando o tipo de origem é Redis.) |
full_sync_wait_ms |
Integer |
Intervalo de novas tentativas de sincronização total, em ms. (Exibido somente quando o tipo de origem é Redis.) |
full_sync_max_retry |
Integer |
Máximo de novas tentativas de sincronização total. (Exibido somente quando o tipo de origem é Redis.) |
ratelimit |
Integer |
Limite de taxa, em KB/s. -1: desativar. (Exibido somente quando o tipo de origem é Redis.) |
current_cluster_name |
String |
Nome da instância atual do Kafka. (Exibido somente quando o tipo de origem é Kafka.) |
cluster_name |
String |
Nome da instância do Kafka de destino. (Exibido somente quando o tipo de origem é Kafka.) |
user_name |
String |
Nome de usuário da instância de Kafka de destino. (Exibido somente quando o tipo de origem é Kafka.) |
sasl_mechanism |
String |
Modo de autenticação do Kafka de destino. (Exibido somente quando o tipo de origem é Kafka.) |
instance_id |
String |
ID de instância do Kafka de destino. (Exibido somente quando o tipo de origem é Kafka.) |
bootstrap_servers |
String |
Endereço de instância de Kafka de destino. (Exibido somente quando o tipo de origem é Kafka.) |
security_protocol |
String |
Autenticação do Kafka de destino. (Exibido somente quando o tipo de origem é Kafka.) |
direction |
String |
Direção de sincronização. (Exibido somente quando o tipo de origem é Kafka.) |
sync_consumer_offsets_enabled |
Boolean |
Indica se o progresso do consumo deve ser sincronizado. (Exibido somente quando o tipo de origem é Kafka.) |
replication_factor |
Integer |
Número de réplicas. (Exibido somente quando o tipo de origem é Kafka.) |
task_num |
Integer |
Número de tarefas. (Exibido somente quando o tipo de origem é Kafka.) |
rename_topic_enabled |
Boolean |
Indica se um tópico deve ser renomeado. (Exibido somente quando o tipo de origem é Kafka.) |
provenance_header_enabled |
Boolean |
Indica se o cabeçalho de origem deve ser adicionado. (Exibido somente quando o tipo de origem é Kafka.) |
consumer_strategy |
String |
Deslocamento inicial. latest: obter os dados mais recentes; earliest: obter os dados mais antigos. (Exibido somente quando o tipo de origem é Kafka.) |
compression_type |
String |
Algoritmo de compactação. (Exibido somente quando o tipo de origem é Kafka.) |
topics_mapping |
String |
Mapeamento de tópicos. (Exibido somente quando o tipo de origem é Kafka.) |
Parâmetro |
Tipo |
Descrição |
---|---|---|
redis_address |
String |
Endereço da instância do Redis. (Exibido somente quando o tipo de destino é Redis.) |
redis_type |
String |
Tipo de instância do Redis. (Exibido somente quando o tipo de destino é Redis.) |
dcs_instance_id |
String |
ID da instância do DCS. (Exibido somente quando o tipo de destino é Redis.) |
target_db |
Integer |
Banco de dados de destino. O valor padrão é -1. (Exibido somente quando o tipo de destino é Redis.) |
consumer_strategy |
String |
Deslocamento inicial. latest: obter os dados mais recentes; earliest: obter os dados mais antigos. (Exibido somente quando o tipo de destino é OBS.) |
destination_file_type |
String |
Formato do arquivo de despejo. Somente TEXT é suportado. (Exibido somente quando o tipo de destino é OBS.) |
deliver_time_interval |
Integer |
Período(s) de despejo. (Exibido somente quando o tipo de destino é OBS.) |
obs_bucket_name |
String |
Endereço de despejo. (Exibido somente quando o tipo de destino é OBS.) |
obs_path |
String |
Diretório de despejo. (Exibido somente quando o tipo de destino é OBS.) |
partition_format |
String |
Formato do diretório de hora. (Exibido somente quando o tipo de destino é OBS.) |
record_delimiter |
String |
Quebra de linha. (Exibido somente quando o tipo de destino é OBS.) |
store_keys |
Boolean |
Key de armazenamento. (Exibido somente quando o tipo de destino é OBS.) |
obs_part_size |
Integer |
Tamanho (em bytes) de cada arquivo a ser carregado. O valor padrão é 5242880. (Exibido somente quando o tipo de destino é OBS.) |
flush_size |
Integer |
flush_size. (Exibido somente quando o tipo de destino é OBS.) |
timezone |
String |
Fuso horário. (Exibido somente quando o tipo de destino é OBS.) |
connector_class |
String |
Classe de connector. O valor padrão é com.huawei.dms.connector.obs.OBSSinkConnector. (Exibido somente quando o tipo de destino é OBS.) |
storage_class |
String |
Classe de storage. O valor padrão é com.huawei.dms.connector.obs.storage.OBSStorage. (Exibido somente quando o tipo de destino é OBS.) |
format_class |
String |
Classe de format. O valor padrão é com.huawei.dms.connector.obs.format.bytearray.ByteArrayFormat. (Exibido somente quando o tipo de destino é OBS.) |
schema_generator_class |
String |
Classe de schema_generator. O valor padrão é io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator. (Exibido somente quando o tipo de destino é OBS.) |
partitioner_class |
String |
Classe de partitioner. O valor padrão é io.confluent.connect.storage.partitioner.TimeBasedPartitioner. (Exibido somente quando o tipo de destino é OBS.) |
value_converter |
String |
value_converter. O valor padrão é org.apache.kafka.connect.converters.ByteArrayConverter. (Exibido somente quando o tipo de destino é OBS.) |
key_converter |
String |
key_converter. O valor padrão é org.apache.kafka.connect.converters.ByteArrayConverter. (Exibido somente quando o tipo de destino é OBS.) |
kv_delimiter |
String |
kv_delimiter. O valor padrão é :. (Exibido somente quando o tipo de destino é OBS.) |
Exemplo de solicitações
- Criação de uma tarefa de despejo que começa imediatamente.
POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/connector/tasks { "task_name" : "smart-connect-1", "start_later" : false, "source_type" : "NONE", "topics_regex" : "topic-obs*", "sink_type" : "OBS_SINK", "sink_task" : { "consumer_strategy" : "earliest", "destination_file_type" : "TEXT", "deliver_time_interval" : 300, "access_key" : "********", "secret_key" : "********", "obs_bucket_name" : "obs_bucket", "obs_path" : "obsTransfer-1810125534", "partition_format" : "yyyy/MM/dd/HH/mm", "record_delimiter" : "\\n", "store_keys" : false } }
- Criação de uma tarefa de replicação de dados do Kafka que é iniciada mais tarde.
POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/connector/tasks { "task_name" : "smart-connect-2", "start_later" : true, "source_type" : "KAFKA_REPLICATOR_SOURCE", "source_task" : { "current_cluster_name" : "A", "cluster_name" : "B", "user_name" : "user1", "password" : "********", "sasl_mechanism" : "SCRAM-SHA-512", "instance_id" : "b54c9dd8-********-********", "direction" : "two-way", "sync_consumer_offsets_enabled" : false, "replication_factor" : 3, "task_num" : 2, "rename_topic_enabled" : false, "provenance_header_enabled" : true, "consumer_strategy" : "latest", "compression_type" : "snappy", "topics_mapping" : "topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4" } }
- Criação de uma tarefa de replicação de dados do Redis que começa imediatamente: sincronização total, 10 tempos máximos de repetição, intervalo de repetição de 10.000 ms e limite de largura de banda de 10 KB/s.
POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/connector/tasks { "task_name" : "smart-connect-3", "start_later" : false, "source_type" : "REDIS_REPLICATOR_SOURCE", "source_task" : { "redis_address" : "192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379", "redis_type" : "cluster", "redis_password" : "********", "sync_mode" : "RDB_ONLY", "full_sync_max_retry" : 10, "full_sync_wait_ms" : 10000, "ratelimit" : 10 }, "topics" : "topic-sc-3", "sink_type" : "REDIS_REPLICATOR_SINK", "sink_task" : { "redis_address" : "192.168.119.51:6379", "redis_type" : "standalone", "redis_password" : "********" } }
Exemplo de respostas
Código de status: 200
Bem-sucedido.
{ "task_name" : "smart-connect-121248117", "topics" : "topic-sc", "source_task" : { "redis_address" : "192.168.91.179:6379", "redis_type" : "standalone", "dcs_instance_id" : "949190a2-598a-4afd-99a8-dad3cae1e7cd", "sync_mode" : "RDB_ONLY,", "full_sync_wait_ms" : 13000, "full_sync_max_retry" : 4, "ratelimit" : -1 }, "source_type" : "REDIS_REPLICATOR_SOURCE", "sink_task" : { "redis_address" : "192.168.119.51:6379", "redis_type" : "standalone", "dcs_instance_id" : "9b981368-a8e3-416a-87d9-1581a968b41b", "target_db" : -1 }, "sink_type" : "REDIS_REPLICATOR_SINK", "id" : "8a205bbd-7181-4b5e-9bd6-37274ce84577", "status" : "RUNNING", "create_time" : 1708427753133 }
Código de exemplo do SDK
O código de exemplo do SDK é o seguinte.
Java
- Criação de uma tarefa de despejo que começa imediatamente.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion; import com.huaweicloud.sdk.kafka.v2.*; import com.huaweicloud.sdk.kafka.v2.model.*; public class CreateConnectorTaskSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); String projectId = "{project_id}"; ICredential auth = new BasicCredentials() .withProjectId(projectId) .withAk(ak) .withSk(sk); KafkaClient client = KafkaClient.newBuilder() .withCredential(auth) .withRegion(KafkaRegion.valueOf("<YOUR REGION>")) .build(); CreateConnectorTaskRequest request = new CreateConnectorTaskRequest(); request.withInstanceId("{instance_id}"); CreateSmartConnectTaskReq body = new CreateSmartConnectTaskReq(); SmartConnectTaskReqSinkConfig sinkTaskbody = new SmartConnectTaskReqSinkConfig(); sinkTaskbody.withConsumerStrategy("earliest") .withDestinationFileType("TEXT") .withDeliverTimeInterval(300) .withAccessKey("********") .withSecretKey("********") .withObsBucketName("obs_bucket") .withObsPath("obsTransfer-1810125534") .withPartitionFormat("yyyy/MM/dd/HH/mm") .withRecordDelimiter("\n") .withStoreKeys(false); body.withSinkTask(sinkTaskbody); body.withSinkType(CreateSmartConnectTaskReq.SinkTypeEnum.fromValue("OBS_SINK")); body.withSourceType(CreateSmartConnectTaskReq.SourceTypeEnum.fromValue("NONE")); body.withTopicsRegex("topic-obs*"); body.withStartLater(false); body.withTaskName("smart-connect-1"); request.withBody(body); try { CreateConnectorTaskResponse response = client.createConnectorTask(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } }
- Criação de uma tarefa de replicação de dados do Kafka que é iniciada mais tarde.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion; import com.huaweicloud.sdk.kafka.v2.*; import com.huaweicloud.sdk.kafka.v2.model.*; public class CreateConnectorTaskSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); String projectId = "{project_id}"; ICredential auth = new BasicCredentials() .withProjectId(projectId) .withAk(ak) .withSk(sk); KafkaClient client = KafkaClient.newBuilder() .withCredential(auth) .withRegion(KafkaRegion.valueOf("<YOUR REGION>")) .build(); CreateConnectorTaskRequest request = new CreateConnectorTaskRequest(); request.withInstanceId("{instance_id}"); CreateSmartConnectTaskReq body = new CreateSmartConnectTaskReq(); SmartConnectTaskReqSourceConfig sourceTaskbody = new SmartConnectTaskReqSourceConfig(); sourceTaskbody.withCurrentClusterName("A") .withClusterName("B") .withUserName("user1") .withPassword("********") .withSaslMechanism("SCRAM-SHA-512") .withInstanceId("b54c9dd8-********-********") .withDirection("two-way") .withSyncConsumerOffsetsEnabled(false) .withReplicationFactor(3) .withTaskNum(2) .withRenameTopicEnabled(false) .withProvenanceHeaderEnabled(true) .withConsumerStrategy("latest") .withCompressionType("snappy") .withTopicsMapping("topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4"); body.withSourceTask(sourceTaskbody); body.withSourceType(CreateSmartConnectTaskReq.SourceTypeEnum.fromValue("KAFKA_REPLICATOR_SOURCE")); body.withStartLater(true); body.withTaskName("smart-connect-2"); request.withBody(body); try { CreateConnectorTaskResponse response = client.createConnectorTask(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } }
- Criação de uma tarefa de replicação de dados do Redis que começa imediatamente: sincronização total, 10 tempos máximos de repetição, intervalo de repetição de 10.000 ms e limite de largura de banda de 10 KB/s.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion; import com.huaweicloud.sdk.kafka.v2.*; import com.huaweicloud.sdk.kafka.v2.model.*; public class CreateConnectorTaskSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); String projectId = "{project_id}"; ICredential auth = new BasicCredentials() .withProjectId(projectId) .withAk(ak) .withSk(sk); KafkaClient client = KafkaClient.newBuilder() .withCredential(auth) .withRegion(KafkaRegion.valueOf("<YOUR REGION>")) .build(); CreateConnectorTaskRequest request = new CreateConnectorTaskRequest(); request.withInstanceId("{instance_id}"); CreateSmartConnectTaskReq body = new CreateSmartConnectTaskReq(); SmartConnectTaskReqSinkConfig sinkTaskbody = new SmartConnectTaskReqSinkConfig(); sinkTaskbody.withRedisAddress("192.168.119.51:6379") .withRedisType("standalone") .withRedisPassword("********"); SmartConnectTaskReqSourceConfig sourceTaskbody = new SmartConnectTaskReqSourceConfig(); sourceTaskbody.withRedisAddress("192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379") .withRedisType("cluster") .withRedisPassword("********") .withSyncMode("RDB_ONLY") .withFullSyncWaitMs(10000) .withFullSyncMaxRetry(10) .withRatelimit(10); body.withSinkTask(sinkTaskbody); body.withSinkType(CreateSmartConnectTaskReq.SinkTypeEnum.fromValue("REDIS_REPLICATOR_SINK")); body.withSourceTask(sourceTaskbody); body.withSourceType(CreateSmartConnectTaskReq.SourceTypeEnum.fromValue("REDIS_REPLICATOR_SOURCE")); body.withTopics("topic-sc-3"); body.withStartLater(false); body.withTaskName("smart-connect-3"); request.withBody(body); try { CreateConnectorTaskResponse response = client.createConnectorTask(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } }
Python
- Criação de uma tarefa de despejo que começa imediatamente.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
# coding: utf-8 from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkkafka.v2 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = __import__('os').getenv("CLOUD_SDK_AK") sk = __import__('os').getenv("CLOUD_SDK_SK") projectId = "{project_id}" credentials = BasicCredentials(ak, sk, projectId) \ client = KafkaClient.new_builder() \ .with_credentials(credentials) \ .with_region(KafkaRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateConnectorTaskRequest() request.instance_id = "{instance_id}" sinkTaskbody = SmartConnectTaskReqSinkConfig( consumer_strategy="earliest", destination_file_type="TEXT", deliver_time_interval=300, access_key="********", secret_key="********", obs_bucket_name="obs_bucket", obs_path="obsTransfer-1810125534", partition_format="yyyy/MM/dd/HH/mm", record_delimiter="\n", store_keys=False ) request.body = CreateSmartConnectTaskReq( sink_task=sinkTaskbody, sink_type="OBS_SINK", source_type="NONE", topics_regex="topic-obs*", start_later=False, task_name="smart-connect-1" ) response = client.create_connector_task(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg)
- Criação de uma tarefa de replicação de dados do Kafka que é iniciada mais tarde.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
# coding: utf-8 from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkkafka.v2 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = __import__('os').getenv("CLOUD_SDK_AK") sk = __import__('os').getenv("CLOUD_SDK_SK") projectId = "{project_id}" credentials = BasicCredentials(ak, sk, projectId) \ client = KafkaClient.new_builder() \ .with_credentials(credentials) \ .with_region(KafkaRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateConnectorTaskRequest() request.instance_id = "{instance_id}" sourceTaskbody = SmartConnectTaskReqSourceConfig( current_cluster_name="A", cluster_name="B", user_name="user1", password="********", sasl_mechanism="SCRAM-SHA-512", instance_id="b54c9dd8-********-********", direction="two-way", sync_consumer_offsets_enabled=False, replication_factor=3, task_num=2, rename_topic_enabled=False, provenance_header_enabled=True, consumer_strategy="latest", compression_type="snappy", topics_mapping="topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4" ) request.body = CreateSmartConnectTaskReq( source_task=sourceTaskbody, source_type="KAFKA_REPLICATOR_SOURCE", start_later=True, task_name="smart-connect-2" ) response = client.create_connector_task(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg)
- Criação de uma tarefa de replicação de dados do Redis que começa imediatamente: sincronização total, 10 tempos máximos de repetição, intervalo de repetição de 10.000 ms e limite de largura de banda de 10 KB/s.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
# coding: utf-8 from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkkafka.v2 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = __import__('os').getenv("CLOUD_SDK_AK") sk = __import__('os').getenv("CLOUD_SDK_SK") projectId = "{project_id}" credentials = BasicCredentials(ak, sk, projectId) \ client = KafkaClient.new_builder() \ .with_credentials(credentials) \ .with_region(KafkaRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateConnectorTaskRequest() request.instance_id = "{instance_id}" sinkTaskbody = SmartConnectTaskReqSinkConfig( redis_address="192.168.119.51:6379", redis_type="standalone", redis_password="********" ) sourceTaskbody = SmartConnectTaskReqSourceConfig( redis_address="192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379", redis_type="cluster", redis_password="********", sync_mode="RDB_ONLY", full_sync_wait_ms=10000, full_sync_max_retry=10, ratelimit=10 ) request.body = CreateSmartConnectTaskReq( sink_task=sinkTaskbody, sink_type="REDIS_REPLICATOR_SINK", source_task=sourceTaskbody, source_type="REDIS_REPLICATOR_SOURCE", topics="topic-sc-3", start_later=False, task_name="smart-connect-3" ) response = client.create_connector_task(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg)
Go
- Criação de uma tarefa de despejo que começa imediatamente.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") projectId := "{project_id}" auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). WithProjectId(projectId). Build() client := kafka.NewKafkaClient( kafka.KafkaClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateConnectorTaskRequest{} request.InstanceId = "{instance_id}" consumerStrategySinkTask:= "earliest" destinationFileTypeSinkTask:= "TEXT" deliverTimeIntervalSinkTask:= int32(300) accessKeySinkTask:= "********" secretKeySinkTask:= "********" obsBucketNameSinkTask:= "obs_bucket" obsPathSinkTask:= "obsTransfer-1810125534" partitionFormatSinkTask:= "yyyy/MM/dd/HH/mm" recordDelimiterSinkTask:= "\n" storeKeysSinkTask:= false sinkTaskbody := &model.SmartConnectTaskReqSinkConfig{ ConsumerStrategy: &consumerStrategySinkTask, DestinationFileType: &destinationFileTypeSinkTask, DeliverTimeInterval: &deliverTimeIntervalSinkTask, AccessKey: &accessKeySinkTask, SecretKey: &secretKeySinkTask, ObsBucketName: &obsBucketNameSinkTask, ObsPath: &obsPathSinkTask, PartitionFormat: &partitionFormatSinkTask, RecordDelimiter: &recordDelimiterSinkTask, StoreKeys: &storeKeysSinkTask, } sinkTypeCreateSmartConnectTaskReq:= model.GetCreateSmartConnectTaskReqSinkTypeEnum().OBS_SINK sourceTypeCreateSmartConnectTaskReq:= model.GetCreateSmartConnectTaskReqSourceTypeEnum().NONE topicsRegexCreateSmartConnectTaskReq:= "topic-obs*" startLaterCreateSmartConnectTaskReq:= false taskNameCreateSmartConnectTaskReq:= "smart-connect-1" request.Body = &model.CreateSmartConnectTaskReq{ SinkTask: sinkTaskbody, SinkType: &sinkTypeCreateSmartConnectTaskReq, SourceType: &sourceTypeCreateSmartConnectTaskReq, TopicsRegex: &topicsRegexCreateSmartConnectTaskReq, StartLater: &startLaterCreateSmartConnectTaskReq, TaskName: &taskNameCreateSmartConnectTaskReq, } response, err := client.CreateConnectorTask(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } }
- Criação de uma tarefa de replicação de dados do Kafka que é iniciada mais tarde.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") projectId := "{project_id}" auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). WithProjectId(projectId). Build() client := kafka.NewKafkaClient( kafka.KafkaClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateConnectorTaskRequest{} request.InstanceId = "{instance_id}" currentClusterNameSourceTask:= "A" clusterNameSourceTask:= "B" userNameSourceTask:= "user1" passwordSourceTask:= "********" saslMechanismSourceTask:= "SCRAM-SHA-512" instanceIdSourceTask:= "b54c9dd8-********-********" directionSourceTask:= "two-way" syncConsumerOffsetsEnabledSourceTask:= false replicationFactorSourceTask:= int32(3) taskNumSourceTask:= int32(2) renameTopicEnabledSourceTask:= false provenanceHeaderEnabledSourceTask:= true consumerStrategySourceTask:= "latest" compressionTypeSourceTask:= "snappy" topicsMappingSourceTask:= "topic-sc-1:topic-sc-3,topic-sc-2:topic-sc-4" sourceTaskbody := &model.SmartConnectTaskReqSourceConfig{ CurrentClusterName: ¤tClusterNameSourceTask, ClusterName: &clusterNameSourceTask, UserName: &userNameSourceTask, Password: &passwordSourceTask, SaslMechanism: &saslMechanismSourceTask, InstanceId: &instanceIdSourceTask, Direction: &directionSourceTask, SyncConsumerOffsetsEnabled: &syncConsumerOffsetsEnabledSourceTask, ReplicationFactor: &replicationFactorSourceTask, TaskNum: &taskNumSourceTask, RenameTopicEnabled: &renameTopicEnabledSourceTask, ProvenanceHeaderEnabled: &provenanceHeaderEnabledSourceTask, ConsumerStrategy: &consumerStrategySourceTask, CompressionType: &compressionTypeSourceTask, TopicsMapping: &topicsMappingSourceTask, } sourceTypeCreateSmartConnectTaskReq:= model.GetCreateSmartConnectTaskReqSourceTypeEnum().KAFKA_REPLICATOR_SOURCE startLaterCreateSmartConnectTaskReq:= true taskNameCreateSmartConnectTaskReq:= "smart-connect-2" request.Body = &model.CreateSmartConnectTaskReq{ SourceTask: sourceTaskbody, SourceType: &sourceTypeCreateSmartConnectTaskReq, StartLater: &startLaterCreateSmartConnectTaskReq, TaskName: &taskNameCreateSmartConnectTaskReq, } response, err := client.CreateConnectorTask(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } }
- Criação de uma tarefa de replicação de dados do Redis que começa imediatamente: sincronização total, 10 tempos máximos de repetição, intervalo de repetição de 10.000 ms e limite de largura de banda de 10 KB/s.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") projectId := "{project_id}" auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). WithProjectId(projectId). Build() client := kafka.NewKafkaClient( kafka.KafkaClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateConnectorTaskRequest{} request.InstanceId = "{instance_id}" redisAddressSinkTask:= "192.168.119.51:6379" redisTypeSinkTask:= "standalone" redisPasswordSinkTask:= "********" sinkTaskbody := &model.SmartConnectTaskReqSinkConfig{ RedisAddress: &redisAddressSinkTask, RedisType: &redisTypeSinkTask, RedisPassword: &redisPasswordSinkTask, } redisAddressSourceTask:= "192.168.99.249:6379,192.168.120.127:6379,192.168.116.219:6379" redisTypeSourceTask:= "cluster" redisPasswordSourceTask:= "********" syncModeSourceTask:= "RDB_ONLY" fullSyncWaitMsSourceTask:= int32(10000) fullSyncMaxRetrySourceTask:= int32(10) ratelimitSourceTask:= int32(10) sourceTaskbody := &model.SmartConnectTaskReqSourceConfig{ RedisAddress: &redisAddressSourceTask, RedisType: &redisTypeSourceTask, RedisPassword: &redisPasswordSourceTask, SyncMode: &syncModeSourceTask, FullSyncWaitMs: &fullSyncWaitMsSourceTask, FullSyncMaxRetry: &fullSyncMaxRetrySourceTask, Ratelimit: &ratelimitSourceTask, } sinkTypeCreateSmartConnectTaskReq:= model.GetCreateSmartConnectTaskReqSinkTypeEnum().REDIS_REPLICATOR_SINK sourceTypeCreateSmartConnectTaskReq:= model.GetCreateSmartConnectTaskReqSourceTypeEnum().REDIS_REPLICATOR_SOURCE topicsCreateSmartConnectTaskReq:= "topic-sc-3" startLaterCreateSmartConnectTaskReq:= false taskNameCreateSmartConnectTaskReq:= "smart-connect-3" request.Body = &model.CreateSmartConnectTaskReq{ SinkTask: sinkTaskbody, SinkType: &sinkTypeCreateSmartConnectTaskReq, SourceTask: sourceTaskbody, SourceType: &sourceTypeCreateSmartConnectTaskReq, Topics: &topicsCreateSmartConnectTaskReq, StartLater: &startLaterCreateSmartConnectTaskReq, TaskName: &taskNameCreateSmartConnectTaskReq, } response, err := client.CreateConnectorTask(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } }
Mais
Para obter o código de exemplo do SDK de mais linguagens de programação, consulte a guia Código de exemplo no API Explorer. O código de exemplo do SDK pode ser gerado automaticamente.
Códigos de status
Código de status |
Descrição |
---|---|
200 |
Bem-sucedido. |
Códigos de erro
Consulte Códigos de erro.