Resetting Consumer Group Offset to the Specified Position

This API is deprecated and may not be maintained in the future. Use the API described in Resetting Consumer Group Offset to the Specified Position instead.
Function
Kafka instances do not support resetting the consumer offset online. Before resetting, stop the client whose offset is to be reset. After a client is stopped, the server considers it offline only after the period specified in ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG (1000 ms by default) has elapsed.
Calling Method
For details, see Calling APIs.
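As a rough sketch of the waiting rule above, the following Python helpers compute how long to wait after stopping a client before issuing the reset. The names `earliest_reset_time` and `seconds_to_wait` are hypothetical, and the default of 1000 ms simply mirrors the value quoted above; use the session timeout your consumer is actually configured with.

```python
from datetime import datetime, timedelta, timezone

# Assumption: mirrors the 1000 ms default quoted in this section.
SESSION_TIMEOUT_MS = 1000


def earliest_reset_time(stopped_at, session_timeout_ms=SESSION_TIMEOUT_MS):
    """Earliest moment at which the server should treat the stopped client as offline."""
    return stopped_at + timedelta(milliseconds=session_timeout_ms)


def seconds_to_wait(stopped_at, now, session_timeout_ms=SESSION_TIMEOUT_MS):
    """Seconds still to wait before resetting; 0.0 once the timeout has elapsed."""
    remaining = earliest_reset_time(stopped_at, session_timeout_ms) - now
    return max(0.0, remaining.total_seconds())


if __name__ == "__main__":
    stopped = datetime.now(timezone.utc)
    print(seconds_to_wait(stopped, datetime.now(timezone.utc)))
```

A caller would typically sleep for the returned number of seconds and only then send the reset request.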
URI
POST /v2/{project_id}/instances/{instance_id}/management/groups/{group}/reset-message-offset
Parameter | Mandatory | Type | Description |
---|---|---|---|
project_id | Yes | String | Project ID. For details, see Obtaining a Project ID. |
instance_id | Yes | String | Instance ID. |
group | Yes | String | Consumer group name. |
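As a minimal sketch of how the path parameters fit into the URI, the following Python helper fills in the template. The function name `reset_offset_url` and the sample values are hypothetical, and percent-encoding each path segment is an assumption rather than documented behavior.

```python
from urllib.parse import quote

# URI template taken from this section.
URI_TEMPLATE = (
    "/v2/{project_id}/instances/{instance_id}"
    "/management/groups/{group}/reset-message-offset"
)


def reset_offset_url(endpoint, project_id, instance_id, group):
    """Build the full request URL, percent-encoding each path parameter."""
    path = URI_TEMPLATE.format(
        project_id=quote(project_id, safe=""),
        instance_id=quote(instance_id, safe=""),
        group=quote(group, safe=""),
    )
    return "https://" + endpoint + path


if __name__ == "__main__":
    # All argument values here are placeholders.
    print(reset_offset_url("kafka.example.com", "my-project", "my-instance", "my-group"))
```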
Request Parameters
Parameter | Mandatory | Type | Description |
---|---|---|---|
topic | No | String | Topic name. |
partition | Yes | Integer | Partition number. The default value is -1, indicating that all partitions are reset. |
message_offset | No | Long | Resets the consumer group offset to the specified position. Either message_offset or timestamp must be specified. |
timestamp | No | Long | Resets the consumer group offset to the specified time. The value is a UNIX timestamp, in milliseconds. Either message_offset or timestamp must be specified. |
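The request body rules above can be sketched in Python as follows. The helper names `to_unix_ms` and `build_reset_body` are hypothetical, and rejecting a body that carries both message_offset and timestamp is an assumption: the table only states that one of them must be specified.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_ms(moment):
    """Convert a timezone-aware datetime to a UNIX timestamp in milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    return (moment - EPOCH) // timedelta(milliseconds=1)


def build_reset_body(partition, topic=None, message_offset=None, timestamp=None):
    """Build the JSON-serializable request body.

    Assumption: exactly one of message_offset or timestamp is accepted.
    """
    if (message_offset is None) == (timestamp is None):
        raise ValueError("specify exactly one of message_offset or timestamp")
    body = {"partition": partition}
    if topic is not None:
        body["topic"] = topic
    if message_offset is not None:
        body["message_offset"] = message_offset
    else:
        body["timestamp"] = timestamp
    return body


if __name__ == "__main__":
    when = datetime(2019, 10, 23, 6, 29, 4, tzinfo=timezone.utc)
    print(build_reset_body(0, topic="test", timestamp=to_unix_ms(when)))
```

Using integer division on timedeltas avoids floating-point rounding when converting to milliseconds.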
Response Parameters
None
Example Requests
- Resetting a consumer group offset to the specified position.

      POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/management/groups/{group}/reset-message-offset

      {
        "topic" : "test",
        "partition" : 0,
        "message_offset" : 10
      }

- Resetting a consumer group offset to the specified time.

      POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/management/groups/{group}/reset-message-offset

      {
        "topic" : "test",
        "partition" : 0,
        "timestamp" : 1571812144000
      }
Example Response
None
SDK Sample Code
The SDK sample code is as follows.
Java
- Resetting a consumer group offset to the specified position.

      package com.huaweicloud.sdk.test;

      import com.huaweicloud.sdk.core.auth.ICredential;
      import com.huaweicloud.sdk.core.auth.BasicCredentials;
      import com.huaweicloud.sdk.core.exception.ConnectionException;
      import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
      import com.huaweicloud.sdk.core.exception.ServiceResponseException;
      import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
      import com.huaweicloud.sdk.kafka.v2.*;
      import com.huaweicloud.sdk.kafka.v2.model.*;

      public class ResetMessageOffsetSolution {

          public static void main(String[] args) {
              // Hard-coding the AK and SK or storing them in plaintext poses great security risks.
              // Store them in ciphertext in configuration files or environment variables and decrypt them during use.
              // In this example, the AK and SK are read from environment variables.
              // Before running it, set CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
              String ak = System.getenv("CLOUD_SDK_AK");
              String sk = System.getenv("CLOUD_SDK_SK");

              ICredential auth = new BasicCredentials()
                      .withAk(ak)
                      .withSk(sk);

              KafkaClient client = KafkaClient.newBuilder()
                      .withCredential(auth)
                      .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                      .build();

              // Reset partition 0 of topic "test" to offset 10.
              ResetMessageOffsetRequest request = new ResetMessageOffsetRequest();
              ResetMessageOffsetReq body = new ResetMessageOffsetReq();
              body.withMessageOffset(10L);
              body.withPartition(0);
              body.withTopic("test");
              request.withBody(body);
              try {
                  ResetMessageOffsetResponse response = client.resetMessageOffset(request);
                  System.out.println(response.toString());
              } catch (ConnectionException e) {
                  e.printStackTrace();
              } catch (RequestTimeoutException e) {
                  e.printStackTrace();
              } catch (ServiceResponseException e) {
                  e.printStackTrace();
                  System.out.println(e.getHttpStatusCode());
                  System.out.println(e.getRequestId());
                  System.out.println(e.getErrorCode());
                  System.out.println(e.getErrorMsg());
              }
          }
      }
- Resetting a consumer group offset to the specified time.

      package com.huaweicloud.sdk.test;

      import com.huaweicloud.sdk.core.auth.ICredential;
      import com.huaweicloud.sdk.core.auth.BasicCredentials;
      import com.huaweicloud.sdk.core.exception.ConnectionException;
      import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
      import com.huaweicloud.sdk.core.exception.ServiceResponseException;
      import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
      import com.huaweicloud.sdk.kafka.v2.*;
      import com.huaweicloud.sdk.kafka.v2.model.*;

      public class ResetMessageOffsetSolution {

          public static void main(String[] args) {
              // Hard-coding the AK and SK or storing them in plaintext poses great security risks.
              // Store them in ciphertext in configuration files or environment variables and decrypt them during use.
              // In this example, the AK and SK are read from environment variables.
              // Before running it, set CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
              String ak = System.getenv("CLOUD_SDK_AK");
              String sk = System.getenv("CLOUD_SDK_SK");

              ICredential auth = new BasicCredentials()
                      .withAk(ak)
                      .withSk(sk);

              KafkaClient client = KafkaClient.newBuilder()
                      .withCredential(auth)
                      .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                      .build();

              // Reset partition 0 of topic "test" to the given UNIX timestamp (milliseconds).
              ResetMessageOffsetRequest request = new ResetMessageOffsetRequest();
              ResetMessageOffsetReq body = new ResetMessageOffsetReq();
              body.withTimestamp(1571812144000L);
              body.withPartition(0);
              body.withTopic("test");
              request.withBody(body);
              try {
                  ResetMessageOffsetResponse response = client.resetMessageOffset(request);
                  System.out.println(response.toString());
              } catch (ConnectionException e) {
                  e.printStackTrace();
              } catch (RequestTimeoutException e) {
                  e.printStackTrace();
              } catch (ServiceResponseException e) {
                  e.printStackTrace();
                  System.out.println(e.getHttpStatusCode());
                  System.out.println(e.getRequestId());
                  System.out.println(e.getErrorCode());
                  System.out.println(e.getErrorMsg());
              }
          }
      }
Python
- Resetting a consumer group offset to the specified position.

      # coding: utf-8

      import os

      from huaweicloudsdkcore.auth.credentials import BasicCredentials
      from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
      from huaweicloudsdkcore.exceptions import exceptions
      from huaweicloudsdkkafka.v2 import *

      if __name__ == "__main__":
          # Hard-coding the AK and SK or storing them in plaintext poses great security risks.
          # Store them in ciphertext in configuration files or environment variables and decrypt them during use.
          # In this example, the AK and SK are read from environment variables.
          # Before running it, set CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
          ak = os.getenv("CLOUD_SDK_AK")
          sk = os.getenv("CLOUD_SDK_SK")

          credentials = BasicCredentials(ak, sk)

          client = KafkaClient.new_builder() \
              .with_credentials(credentials) \
              .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
              .build()

          try:
              # Reset partition 0 of topic "test" to offset 10.
              request = ResetMessageOffsetRequest()
              request.body = ResetMessageOffsetReq(
                  message_offset=10,
                  partition=0,
                  topic="test"
              )
              response = client.reset_message_offset(request)
              print(response)
          except exceptions.ClientRequestException as e:
              print(e.status_code)
              print(e.request_id)
              print(e.error_code)
              print(e.error_msg)
- Resetting a consumer group offset to the specified time.

      # coding: utf-8

      import os

      from huaweicloudsdkcore.auth.credentials import BasicCredentials
      from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
      from huaweicloudsdkcore.exceptions import exceptions
      from huaweicloudsdkkafka.v2 import *

      if __name__ == "__main__":
          # Hard-coding the AK and SK or storing them in plaintext poses great security risks.
          # Store them in ciphertext in configuration files or environment variables and decrypt them during use.
          # In this example, the AK and SK are read from environment variables.
          # Before running it, set CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
          ak = os.getenv("CLOUD_SDK_AK")
          sk = os.getenv("CLOUD_SDK_SK")

          credentials = BasicCredentials(ak, sk)

          client = KafkaClient.new_builder() \
              .with_credentials(credentials) \
              .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
              .build()

          try:
              # Reset partition 0 of topic "test" to the given UNIX timestamp (milliseconds).
              request = ResetMessageOffsetRequest()
              request.body = ResetMessageOffsetReq(
                  timestamp=1571812144000,
                  partition=0,
                  topic="test"
              )
              response = client.reset_message_offset(request)
              print(response)
          except exceptions.ClientRequestException as e:
              print(e.status_code)
              print(e.request_id)
              print(e.error_code)
              print(e.error_msg)
Go
- Resetting a consumer group offset to the specified position.

      package main

      import (
          "fmt"
          "os"

          "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
          kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
          "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
          region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
      )

      func main() {
          // Hard-coding the AK and SK or storing them in plaintext poses great security risks.
          // Store them in ciphertext in configuration files or environment variables and decrypt them during use.
          // In this example, the AK and SK are read from environment variables.
          // Before running it, set CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
          ak := os.Getenv("CLOUD_SDK_AK")
          sk := os.Getenv("CLOUD_SDK_SK")

          auth := basic.NewCredentialsBuilder().
              WithAk(ak).
              WithSk(sk).
              Build()

          client := kafka.NewKafkaClient(
              kafka.KafkaClientBuilder().
                  WithRegion(region.ValueOf("<YOUR REGION>")).
                  WithCredential(auth).
                  Build())

          // Reset partition 0 of topic "test" to offset 10.
          request := &model.ResetMessageOffsetRequest{}
          messageOffsetResetMessageOffsetReq := int64(10)
          topicResetMessageOffsetReq := "test"
          request.Body = &model.ResetMessageOffsetReq{
              MessageOffset: &messageOffsetResetMessageOffsetReq,
              Partition:     int32(0),
              Topic:         &topicResetMessageOffsetReq,
          }
          response, err := client.ResetMessageOffset(request)
          if err == nil {
              fmt.Printf("%+v\n", response)
          } else {
              fmt.Println(err)
          }
      }
- Resetting a consumer group offset to the specified time.

      package main

      import (
          "fmt"
          "os"

          "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
          kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
          "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
          region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
      )

      func main() {
          // Hard-coding the AK and SK or storing them in plaintext poses great security risks.
          // Store them in ciphertext in configuration files or environment variables and decrypt them during use.
          // In this example, the AK and SK are read from environment variables.
          // Before running it, set CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment.
          ak := os.Getenv("CLOUD_SDK_AK")
          sk := os.Getenv("CLOUD_SDK_SK")

          auth := basic.NewCredentialsBuilder().
              WithAk(ak).
              WithSk(sk).
              Build()

          client := kafka.NewKafkaClient(
              kafka.KafkaClientBuilder().
                  WithRegion(region.ValueOf("<YOUR REGION>")).
                  WithCredential(auth).
                  Build())

          // Reset partition 0 of topic "test" to the given UNIX timestamp (milliseconds).
          request := &model.ResetMessageOffsetRequest{}
          timestampResetMessageOffsetReq := int64(1571812144000)
          topicResetMessageOffsetReq := "test"
          request.Body = &model.ResetMessageOffsetReq{
              Timestamp: &timestampResetMessageOffsetReq,
              Partition: int32(0),
              Topic:     &topicResetMessageOffsetReq,
          }
          response, err := client.ResetMessageOffset(request)
          if err == nil {
              fmt.Printf("%+v\n", response)
          } else {
              fmt.Println(err)
          }
      }
Status Codes
Status Code | Description |
---|---|
204 | The consumer group offset is successfully reset to the specified position. |
Error Codes
See Error Codes.