Inicialização da reatribuição de partição para uma instância do Kafka
Função
Essa API é usada para enviar uma tarefa de rebalanceamento de partição a uma instância do Kafka ou calcular o tempo estimado de rebalanceamento.
Método de chamada
Para obter detalhes, consulte Chamada de APIs.
URI
POST /v2/kafka/{project_id}/instances/{instance_id}/reassign
Parâmetro |
Obrigatório |
Tipo |
Descrição |
---|---|---|---|
project_id |
Sim |
String |
ID do projeto. Para obter detalhes sobre como obtê-lo, consulte Obtenção de um ID de projeto. |
instance_id |
Sim |
String |
ID da instância. |
Parâmetros de solicitação
Parâmetro |
Obrigatório |
Tipo |
Descrição |
---|---|---|---|
reassignments |
Sim |
Array of PartitionReassignEntity objects |
Plano de reatribuição. |
throttle |
Não |
Integer |
Limite de reatribuição. |
is_schedule |
Não |
Boolean |
Indica se a tarefa está agendada. Se não, is_schedule e execute_at podem ser deixados em branco. Se sim, is_schedule é true e execute_at deve ser especificado. |
execute_at |
Não |
Long |
Horário de agendamento. O valor é um carimbo de data/hora UNIX, em ms. |
time_estimate |
Não |
Boolean |
Defina true para executar tarefas de estimativa de tempo e false para executar tarefas de rebalanceamento. Padrão: false |
Parâmetro |
Obrigatório |
Tipo |
Descrição |
---|---|---|---|
topic |
Sim |
String |
Nome do tópico. |
brokers |
Não |
Array of integers |
Lista de corretores aos quais as partições são reatribuídas. Este parâmetro é obrigatório na atribuição automática. |
replication_factor |
Não |
Integer |
Fator de replicação, que pode ser especificado na atribuição automática. |
assignment |
Não |
Array of TopicAssignment objects |
Plano de atribuição especificado manualmente. O parâmetro brokers e este parâmetro não podem estar vazios ao mesmo tempo. |
Parâmetros de resposta
Código de status: 200
Parâmetro |
Tipo |
Descrição |
---|---|---|
job_id |
String |
ID da tarefa. Somente job_id é retornado para uma tarefa de rebalanceamento. |
reassignment_time |
Integer |
Tempo estimado, em segundos. Somente reassignment_time é retornado para uma tarefa de estimativa de tempo. |
Exemplo de solicitações
POST https://{endpoint}/v2/kafka/{project_id}/instances/{instance_id}/reassign { "reassignments" : [ { "topic" : "topic-1513476102", "brokers" : [ 0, 1, 2 ], "replication_factor" : 3, "assignment" : [ { "partition" : 0, "partition_brokers" : [ 0, 1, 2 ] }, { "partition" : 1, "partition_brokers" : [ 1, 2, 0 ] }, { "partition" : 2, "partition_brokers" : [ 2, 0, 1 ] } ] }, { "topic" : "topic-1513558717", "brokers" : [ 0, 1, 4 ], "replication_factor" : 3, "assignment" : [ { "partition" : 0, "partition_brokers" : [ 0, 1, 2 ] }, { "partition" : 1, "partition_brokers" : [ 1, 2, 0 ] }, { "partition" : 2, "partition_brokers" : [ 2, 0, 1 ] } ] } ], "throttle" : 10000000, "time_estimate" : false }
Exemplo de respostas
Código de status: 200
Tarefa de rebalanceamento enviada ou o tempo estimado é retornado.
{ "job_id" : "8a2c259182ab0e9d0182ab1882560009", "reassignment_time" : 10 }
Código de exemplo do SDK
O código de exemplo do SDK é o seguinte.
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion; import com.huaweicloud.sdk.kafka.v2.*; import com.huaweicloud.sdk.kafka.v2.model.*; import java.util.List; import java.util.ArrayList; public class CreateReassignmentTaskSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); String projectId = "{project_id}"; ICredential auth = new BasicCredentials() .withProjectId(projectId) .withAk(ak) .withSk(sk); KafkaClient client = KafkaClient.newBuilder() .withCredential(auth) .withRegion(KafkaRegion.valueOf("<YOUR REGION>")) .build(); CreateReassignmentTaskRequest request = new CreateReassignmentTaskRequest(); request.withInstanceId("{instance_id}"); PartitionReassignRequest body = new PartitionReassignRequest(); List<Integer> listAssignmentPartitionBrokers = new ArrayList<>(); listAssignmentPartitionBrokers.add(2); listAssignmentPartitionBrokers.add(0); listAssignmentPartitionBrokers.add(1); List<Integer> listAssignmentPartitionBrokers1 = new ArrayList<>(); listAssignmentPartitionBrokers1.add(1); listAssignmentPartitionBrokers1.add(2); listAssignmentPartitionBrokers1.add(0); List<Integer> listAssignmentPartitionBrokers2 = new ArrayList<>(); listAssignmentPartitionBrokers2.add(0); listAssignmentPartitionBrokers2.add(1); listAssignmentPartitionBrokers2.add(2); List<TopicAssignment> listReassignmentsAssignment = new ArrayList<>(); listReassignmentsAssignment.add( new TopicAssignment() .withPartition(0) .withPartitionBrokers(listAssignmentPartitionBrokers2) ); listReassignmentsAssignment.add( new TopicAssignment() .withPartition(1) .withPartitionBrokers(listAssignmentPartitionBrokers1) ); listReassignmentsAssignment.add( new TopicAssignment() .withPartition(2) .withPartitionBrokers(listAssignmentPartitionBrokers) ); List<Integer> listReassignmentsBrokers = new ArrayList<>(); listReassignmentsBrokers.add(0); listReassignmentsBrokers.add(1); listReassignmentsBrokers.add(4); List<Integer> listAssignmentPartitionBrokers3 = new ArrayList<>(); listAssignmentPartitionBrokers3.add(2); listAssignmentPartitionBrokers3.add(0); listAssignmentPartitionBrokers3.add(1); List<Integer> listAssignmentPartitionBrokers4 = new ArrayList<>(); listAssignmentPartitionBrokers4.add(1); listAssignmentPartitionBrokers4.add(2); listAssignmentPartitionBrokers4.add(0); List<Integer> listAssignmentPartitionBrokers5 = new ArrayList<>(); listAssignmentPartitionBrokers5.add(0); listAssignmentPartitionBrokers5.add(1); listAssignmentPartitionBrokers5.add(2); List<TopicAssignment> listReassignmentsAssignment1 = new ArrayList<>(); listReassignmentsAssignment1.add( new TopicAssignment() .withPartition(0) .withPartitionBrokers(listAssignmentPartitionBrokers5) ); listReassignmentsAssignment1.add( new TopicAssignment() .withPartition(1) .withPartitionBrokers(listAssignmentPartitionBrokers4) ); listReassignmentsAssignment1.add( new TopicAssignment() .withPartition(2) .withPartitionBrokers(listAssignmentPartitionBrokers3) ); List<Integer> listReassignmentsBrokers1 = new ArrayList<>(); listReassignmentsBrokers1.add(0); listReassignmentsBrokers1.add(1); listReassignmentsBrokers1.add(2); List<PartitionReassignEntity> listbodyReassignments = new ArrayList<>(); listbodyReassignments.add( new PartitionReassignEntity() .withTopic("topic-1513476102") .withBrokers(listReassignmentsBrokers1) .withReplicationFactor(3) .withAssignment(listReassignmentsAssignment1) ); listbodyReassignments.add( new PartitionReassignEntity() .withTopic("topic-1513558717") .withBrokers(listReassignmentsBrokers) .withReplicationFactor(3) .withAssignment(listReassignmentsAssignment) ); body.withTimeEstimate(false); body.withThrottle(10000000); body.withReassignments(listbodyReassignments); request.withBody(body); try { CreateReassignmentTaskResponse response = client.createReassignmentTask(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } } |
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# coding: utf-8 from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkkafka.v2 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = __import__('os').getenv("CLOUD_SDK_AK") sk = __import__('os').getenv("CLOUD_SDK_SK") projectId = "{project_id}" credentials = BasicCredentials(ak, sk, projectId) \ client = KafkaClient.new_builder() \ .with_credentials(credentials) \ .with_region(KafkaRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateReassignmentTaskRequest() request.instance_id = "{instance_id}" listPartitionBrokersAssignment = [ 2, 0, 1 ] listPartitionBrokersAssignment1 = [ 1, 2, 0 ] listPartitionBrokersAssignment2 = [ 0, 1, 2 ] listAssignmentReassignments = [ TopicAssignment( partition=0, partition_brokers=listPartitionBrokersAssignment2 ), TopicAssignment( partition=1, partition_brokers=listPartitionBrokersAssignment1 ), TopicAssignment( partition=2, partition_brokers=listPartitionBrokersAssignment ) ] listBrokersReassignments = [ 0, 1, 4 ] listPartitionBrokersAssignment3 = [ 2, 0, 1 ] listPartitionBrokersAssignment4 = [ 1, 2, 0 ] listPartitionBrokersAssignment5 = [ 0, 1, 2 ] listAssignmentReassignments1 = [ TopicAssignment( partition=0, partition_brokers=listPartitionBrokersAssignment5 ), TopicAssignment( partition=1, partition_brokers=listPartitionBrokersAssignment4 ), TopicAssignment( partition=2, partition_brokers=listPartitionBrokersAssignment3 ) ] listBrokersReassignments1 = [ 0, 1, 2 ] listReassignmentsbody = [ PartitionReassignEntity( topic="topic-1513476102", brokers=listBrokersReassignments1, replication_factor=3, assignment=listAssignmentReassignments1 ), PartitionReassignEntity( topic="topic-1513558717", brokers=listBrokersReassignments, replication_factor=3, assignment=listAssignmentReassignments ) ] request.body = PartitionReassignRequest( time_estimate=False, throttle=10000000, reassignments=listReassignmentsbody ) response = client.create_reassignment_task(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg) |
Go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") projectId := "{project_id}" auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). WithProjectId(projectId). Build() client := kafka.NewKafkaClient( kafka.KafkaClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateReassignmentTaskRequest{} request.InstanceId = "{instance_id}" var listPartitionBrokersAssignment = []int32{ int32(2), int32(0), int32(1), } var listPartitionBrokersAssignment1 = []int32{ int32(1), int32(2), int32(0), } var listPartitionBrokersAssignment2 = []int32{ int32(0), int32(1), int32(2), } partitionAssignment:= int32(0) partitionAssignment1:= int32(1) partitionAssignment2:= int32(2) var listAssignmentReassignments = []model.TopicAssignment{ { Partition: &partitionAssignment, PartitionBrokers: &listPartitionBrokersAssignment2, }, { Partition: &partitionAssignment1, PartitionBrokers: &listPartitionBrokersAssignment1, }, { Partition: &partitionAssignment2, PartitionBrokers: &listPartitionBrokersAssignment, }, } var listBrokersReassignments = []int32{ int32(0), int32(1), int32(4), } var listPartitionBrokersAssignment3 = []int32{ int32(2), int32(0), int32(1), } var listPartitionBrokersAssignment4 = []int32{ int32(1), int32(2), int32(0), } var listPartitionBrokersAssignment5 = []int32{ int32(0), int32(1), int32(2), } partitionAssignment3:= int32(0) partitionAssignment4:= int32(1) partitionAssignment5:= int32(2) var listAssignmentReassignments1 = []model.TopicAssignment{ { Partition: &partitionAssignment3, PartitionBrokers: &listPartitionBrokersAssignment5, }, { Partition: &partitionAssignment4, PartitionBrokers: &listPartitionBrokersAssignment4, }, { Partition: &partitionAssignment5, PartitionBrokers: &listPartitionBrokersAssignment3, }, } var listBrokersReassignments1 = []int32{ int32(0), int32(1), int32(2), } replicationFactorReassignments:= int32(3) replicationFactorReassignments1:= int32(3) var listReassignmentsbody = []model.PartitionReassignEntity{ { Topic: "topic-1513476102", Brokers: &listBrokersReassignments1, ReplicationFactor: &replicationFactorReassignments, Assignment: &listAssignmentReassignments1, }, { Topic: "topic-1513558717", Brokers: &listBrokersReassignments, ReplicationFactor: &replicationFactorReassignments1, Assignment: &listAssignmentReassignments, }, } timeEstimatePartitionReassignRequest:= false throttlePartitionReassignRequest:= int32(10000000) request.Body = &model.PartitionReassignRequest{ TimeEstimate: &timeEstimatePartitionReassignRequest, Throttle: &throttlePartitionReassignRequest, Reassignments: listReassignmentsbody, } response, err := client.CreateReassignmentTask(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } } |
Mais
Para obter o código de exemplo do SDK de mais linguagens de programação, consulte a guia Código de exemplo no API Explorer. O código de exemplo do SDK pode ser gerado automaticamente.
Códigos de status
Código de status |
Descrição |
---|---|
200 |
Tarefa de rebalanceamento enviada ou o tempo estimado é retornado. |
Códigos de erro
Consulte Códigos de erro.