Initiating Partition Reassigning for a Kafka Instance
Function
This API is used to submit a partition rebalancing task to a Kafka instance or calculate estimated rebalancing time.
Calling Method
For details, see Calling APIs.
URI
POST /v2/kafka/{project_id}/instances/{instance_id}/reassign
Parameter |
Mandatory |
Type |
Description |
---|---|---|---|
project_id |
Yes |
String |
Project ID. For details about how to obtain it, see Obtaining a Project ID. |
instance_id |
Yes |
String |
Instance ID. |
Request Parameters
Parameter |
Mandatory |
Type |
Description |
---|---|---|---|
reassignments |
Yes |
Array of PartitionReassignEntity objects |
Reassignment plan. |
throttle |
No |
Integer |
Reassignment threshold. |
is_schedule |
No |
Boolean |
Indicates whether the task is scheduled. If no, is_schedule and execute_at can be left blank. If yes, is_schedule is true and execute_at must be specified. |
execute_at |
No |
Long |
Schedule time. The value is a UNIX timestamp, in ms. |
time_estimate |
No |
Boolean |
Set true to perform time estimation tasks and false to perform rebalancing tasks. |
Parameter |
Mandatory |
Type |
Description |
---|---|---|---|
topic |
Yes |
String |
Topic name. |
brokers |
No |
Array of integers |
List of brokers to which partitions are reassigned. This parameter is mandatory in automatic assignment. |
replication_factor |
No |
Integer |
Replication factor, which can be specified in automatic assignment. |
assignment |
No |
Array of TopicAssignment objects |
Manually specified assignment plan. The brokers parameter and this parameter cannot be empty at the same time. |
Response Parameters
Status code: 200
Parameter |
Type |
Description |
---|---|---|
job_id |
String |
Task ID. Only job_id is returned for a rebalancing task. |
reassignment_time |
Integer |
Estimated time, in seconds. Only reassignment_time is returned for a time estimation task. |
Example Requests
POST https://{endpoint}/v2/kafka/{project_id}/instances/{instance_id}/reassign { "reassignments" : [ { "topic" : "topic-1513476102", "brokers" : [ 0, 1, 2 ], "replication_factor" : 3, "assignment" : [ { "partition" : 0, "partition_brokers" : [ 0, 1, 2 ] }, { "partition" : 1, "partition_brokers" : [ 1, 2, 0 ] }, { "partition" : 2, "partition_brokers" : [ 2, 0, 1 ] } ] }, { "topic" : "topic-1513558717", "brokers" : [ 0, 1, 4 ], "replication_factor" : 3, "assignment" : [ { "partition" : 0, "partition_brokers" : [ 0, 1, 2 ] }, { "partition" : 1, "partition_brokers" : [ 1, 2, 0 ] }, { "partition" : 2, "partition_brokers" : [ 2, 0, 1 ] } ] } ], "throttle" : 10000000, "time_estimate" : false }
Example Responses
Status code: 200
Rebalancing task submitted or the estimated time is returned.
{ "job_id" : "8a2c259182ab0e9d0182ab1882560009", "reassignment_time" : 10 }
SDK Sample Code
The SDK sample code is as follows.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion; import com.huaweicloud.sdk.kafka.v2.*; import com.huaweicloud.sdk.kafka.v2.model.*; import java.util.List; import java.util.ArrayList; public class CreateReassignmentTaskSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); String projectId = "{project_id}"; ICredential auth = new BasicCredentials() .withProjectId(projectId) .withAk(ak) .withSk(sk); KafkaClient client = KafkaClient.newBuilder() .withCredential(auth) .withRegion(KafkaRegion.valueOf("<YOUR REGION>")) .build(); CreateReassignmentTaskRequest request = new CreateReassignmentTaskRequest(); request.withInstanceId("{instance_id}"); PartitionReassignRequest body = new PartitionReassignRequest(); List<Integer> listAssignmentPartitionBrokers = new ArrayList<>(); listAssignmentPartitionBrokers.add(2); listAssignmentPartitionBrokers.add(0); listAssignmentPartitionBrokers.add(1); List<Integer> listAssignmentPartitionBrokers1 = new ArrayList<>(); listAssignmentPartitionBrokers1.add(1); listAssignmentPartitionBrokers1.add(2); listAssignmentPartitionBrokers1.add(0); List<Integer> listAssignmentPartitionBrokers2 = new ArrayList<>(); listAssignmentPartitionBrokers2.add(0); listAssignmentPartitionBrokers2.add(1); listAssignmentPartitionBrokers2.add(2); List<TopicAssignment> listReassignmentsAssignment = new ArrayList<>(); listReassignmentsAssignment.add( new TopicAssignment() .withPartition(0) .withPartitionBrokers(listAssignmentPartitionBrokers2) ); listReassignmentsAssignment.add( new TopicAssignment() .withPartition(1) .withPartitionBrokers(listAssignmentPartitionBrokers1) ); listReassignmentsAssignment.add( new TopicAssignment() .withPartition(2) .withPartitionBrokers(listAssignmentPartitionBrokers) ); List<Integer> listReassignmentsBrokers = new ArrayList<>(); listReassignmentsBrokers.add(0); listReassignmentsBrokers.add(1); listReassignmentsBrokers.add(4); List<Integer> listAssignmentPartitionBrokers3 = new ArrayList<>(); listAssignmentPartitionBrokers3.add(2); listAssignmentPartitionBrokers3.add(0); listAssignmentPartitionBrokers3.add(1); List<Integer> listAssignmentPartitionBrokers4 = new ArrayList<>(); listAssignmentPartitionBrokers4.add(1); listAssignmentPartitionBrokers4.add(2); listAssignmentPartitionBrokers4.add(0); List<Integer> listAssignmentPartitionBrokers5 = new ArrayList<>(); listAssignmentPartitionBrokers5.add(0); listAssignmentPartitionBrokers5.add(1); listAssignmentPartitionBrokers5.add(2); List<TopicAssignment> listReassignmentsAssignment1 = new ArrayList<>(); listReassignmentsAssignment1.add( new TopicAssignment() .withPartition(0) .withPartitionBrokers(listAssignmentPartitionBrokers5) ); listReassignmentsAssignment1.add( new TopicAssignment() .withPartition(1) .withPartitionBrokers(listAssignmentPartitionBrokers4) ); listReassignmentsAssignment1.add( new TopicAssignment() .withPartition(2) .withPartitionBrokers(listAssignmentPartitionBrokers3) ); List<Integer> listReassignmentsBrokers1 = new ArrayList<>(); listReassignmentsBrokers1.add(0); listReassignmentsBrokers1.add(1); listReassignmentsBrokers1.add(2); List<PartitionReassignEntity> listbodyReassignments = new ArrayList<>(); listbodyReassignments.add( new PartitionReassignEntity() .withTopic("topic-1513476102") .withBrokers(listReassignmentsBrokers1) .withReplicationFactor(3) .withAssignment(listReassignmentsAssignment1) ); listbodyReassignments.add( new PartitionReassignEntity() .withTopic("topic-1513558717") .withBrokers(listReassignmentsBrokers) .withReplicationFactor(3) .withAssignment(listReassignmentsAssignment) ); body.withTimeEstimate(false); body.withThrottle(10000000); body.withReassignments(listbodyReassignments); request.withBody(body); try { CreateReassignmentTaskResponse response = client.createReassignmentTask(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# coding: utf-8 import os from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkkafka.v2 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = os.environ["CLOUD_SDK_AK"] sk = os.environ["CLOUD_SDK_SK"] projectId = "{project_id}" credentials = BasicCredentials(ak, sk, projectId) client = KafkaClient.new_builder() \ .with_credentials(credentials) \ .with_region(KafkaRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateReassignmentTaskRequest() request.instance_id = "{instance_id}" listPartitionBrokersAssignment = [ 2, 0, 1 ] listPartitionBrokersAssignment1 = [ 1, 2, 0 ] listPartitionBrokersAssignment2 = [ 0, 1, 2 ] listAssignmentReassignments = [ TopicAssignment( partition=0, partition_brokers=listPartitionBrokersAssignment2 ), TopicAssignment( partition=1, partition_brokers=listPartitionBrokersAssignment1 ), TopicAssignment( partition=2, partition_brokers=listPartitionBrokersAssignment ) ] listBrokersReassignments = [ 0, 1, 4 ] listPartitionBrokersAssignment3 = [ 2, 0, 1 ] listPartitionBrokersAssignment4 = [ 1, 2, 0 ] listPartitionBrokersAssignment5 = [ 0, 1, 2 ] listAssignmentReassignments1 = [ TopicAssignment( partition=0, partition_brokers=listPartitionBrokersAssignment5 ), TopicAssignment( partition=1, partition_brokers=listPartitionBrokersAssignment4 ), TopicAssignment( partition=2, partition_brokers=listPartitionBrokersAssignment3 ) ] listBrokersReassignments1 = [ 0, 1, 2 ] listReassignmentsbody = [ PartitionReassignEntity( topic="topic-1513476102", brokers=listBrokersReassignments1, replication_factor=3, assignment=listAssignmentReassignments1 ), PartitionReassignEntity( topic="topic-1513558717", brokers=listBrokersReassignments, replication_factor=3, assignment=listAssignmentReassignments ) ] request.body = PartitionReassignRequest( time_estimate=False, throttle=10000000, reassignments=listReassignmentsbody ) response = client.create_reassignment_task(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") projectId := "{project_id}" auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). WithProjectId(projectId). Build() client := kafka.NewKafkaClient( kafka.KafkaClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateReassignmentTaskRequest{} request.InstanceId = "{instance_id}" var listPartitionBrokersAssignment = []int32{ int32(2), int32(0), int32(1), } var listPartitionBrokersAssignment1 = []int32{ int32(1), int32(2), int32(0), } var listPartitionBrokersAssignment2 = []int32{ int32(0), int32(1), int32(2), } partitionAssignment:= int32(0) partitionAssignment1:= int32(1) partitionAssignment2:= int32(2) var listAssignmentReassignments = []model.TopicAssignment{ { Partition: &partitionAssignment, PartitionBrokers: &listPartitionBrokersAssignment2, }, { Partition: &partitionAssignment1, PartitionBrokers: &listPartitionBrokersAssignment1, }, { Partition: &partitionAssignment2, PartitionBrokers: &listPartitionBrokersAssignment, }, } var listBrokersReassignments = []int32{ int32(0), int32(1), int32(4), } var listPartitionBrokersAssignment3 = []int32{ int32(2), int32(0), int32(1), } var listPartitionBrokersAssignment4 = []int32{ int32(1), int32(2), int32(0), } var listPartitionBrokersAssignment5 = []int32{ int32(0), int32(1), int32(2), } partitionAssignment3:= int32(0) partitionAssignment4:= int32(1) partitionAssignment5:= int32(2) var listAssignmentReassignments1 = []model.TopicAssignment{ { Partition: &partitionAssignment3, PartitionBrokers: &listPartitionBrokersAssignment5, }, { Partition: &partitionAssignment4, PartitionBrokers: &listPartitionBrokersAssignment4, }, { Partition: &partitionAssignment5, PartitionBrokers: &listPartitionBrokersAssignment3, }, } var listBrokersReassignments1 = []int32{ int32(0), int32(1), int32(2), } replicationFactorReassignments:= int32(3) replicationFactorReassignments1:= int32(3) var listReassignmentsbody = []model.PartitionReassignEntity{ { Topic: "topic-1513476102", Brokers: &listBrokersReassignments1, ReplicationFactor: &replicationFactorReassignments, Assignment: &listAssignmentReassignments1, }, { Topic: "topic-1513558717", Brokers: &listBrokersReassignments, ReplicationFactor: &replicationFactorReassignments1, Assignment: &listAssignmentReassignments, }, } timeEstimatePartitionReassignRequest:= false throttlePartitionReassignRequest:= int32(10000000) request.Body = &model.PartitionReassignRequest{ TimeEstimate: &timeEstimatePartitionReassignRequest, Throttle: &throttlePartitionReassignRequest, Reassignments: listReassignmentsbody, } response, err := client.CreateReassignmentTask(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } } |
For SDK sample code of more programming languages, see the Sample Code tab in API Explorer. SDK sample code can be automatically generated.
Status Codes
Status Code |
Description |
---|---|
200 |
Rebalancing task submitted or the estimated time is returned. |
Error Codes
See Error Codes.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot