Resetting the Consumer Group Offset to a Specified Position
Function
Kafka instances do not support resetting a consumer offset online. Before resetting an offset, stop the client whose offset is to be reset. After a client is stopped, the server considers it offline only after the period specified by ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG (1000 ms by default) has elapsed.
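The ordering described above (stop the client, wait out the session timeout, then reset) can be sketched as a small helper. This is only an illustration: `reset_after_session_timeout`, its callbacks, and the extra `margin_ms` safety margin are hypothetical names that are not part of the API or the SDK, and the default of 1000 ms simply mirrors the value stated above. Use the session timeout actually configured on your consumer.

```python
import time


def reset_after_session_timeout(stop_consumer, reset_offset,
                                session_timeout_ms=1000, margin_ms=500,
                                sleep=time.sleep):
    """Stop the consumer, wait until the server can treat it as offline, then reset.

    stop_consumer and reset_offset are caller-supplied callables (hypothetical);
    margin_ms is an assumed safety margin added on top of the session timeout.
    """
    stop_consumer()
    # Wait for the session timeout plus the margin before touching the offset.
    sleep((session_timeout_ms + margin_ms) / 1000.0)
    return reset_offset()
```

The `sleep` parameter is injectable so the helper can be exercised without really waiting.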
Calling Method
For details, see Calling APIs.
URI
PUT /v2/kafka/{project_id}/instances/{instance_id}/groups/{group}/reset-message-offset
Parameter | Required | Type | Description |
---|---|---|---|
project_id | Yes | String | Project ID. |
instance_id | Yes | String | Instance ID. |
group | Yes | String | Consumer group name. |
Request Parameters
Parameter | Required | Type | Description |
---|---|---|---|
topic | No | String | Topic name. |
partition | Yes | Integer | Partition number. The default value is -1, indicating that all partitions are reset. |
message_offset | No | Long | Position to which the consumer group offset is reset. Either message_offset or timestamp must be specified. |
timestamp | No | Long | Time to which the offset is reset. The value is a UNIX timestamp, in milliseconds. Either message_offset or timestamp must be specified. |
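The constraints in the table above can be checked on the client side before a request is sent. The following is a minimal sketch, not part of the SDK: `build_reset_body` and `to_unix_ms` are hypothetical helper names, and rejecting a body that sets both message_offset and timestamp is a conservative assumption, since the table only states that one of the two must be specified.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_ms(moment):
    """Convert a timezone-aware datetime to a UNIX timestamp in milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    # Integer division by a 1 ms timedelta avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def build_reset_body(partition=-1, topic=None, message_offset=None, timestamp=None):
    """Build the JSON body as a dict; -1 means all partitions, per the table."""
    # Assumption: exactly one of the two targets is accepted by this helper.
    if (message_offset is None) == (timestamp is None):
        raise ValueError("specify exactly one of message_offset or timestamp")
    body = {"partition": partition}
    if topic is not None:
        body["topic"] = topic
    if message_offset is not None:
        body["message_offset"] = message_offset
    else:
        body["timestamp"] = timestamp
    return body
```

For example, `to_unix_ms(datetime(2019, 10, 23, 6, 29, 4, tzinfo=timezone.utc))` yields 1571812144000, which can then be passed as `timestamp`.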
Response Parameters
None
Example Requests
- Resetting the consumer group offset to a specified position

      PUT https://{endpoint}/v2/kafka/{project_id}/instances/{instance_id}/groups/{group}/reset-message-offset

      {
        "topic" : "test",
        "partition" : 0,
        "message_offset" : 10
      }

- Resetting the consumer group offset to a specified time

      PUT https://{endpoint}/v2/kafka/{project_id}/instances/{instance_id}/groups/{group}/reset-message-offset

      {
        "topic" : "test",
        "partition" : 0,
        "timestamp" : 1571812144000
      }
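For callers who do not use the SDK, a request like the ones above can be assembled with the Python standard library. This is a sketch under assumptions: `build_reset_request` is a hypothetical helper, it uses the PUT method given in the URI section, and it assumes token-based authentication through an `X-Auth-Token` header. See Calling APIs for the authentication method that applies to your account.

```python
import json
import urllib.parse
import urllib.request


def build_reset_request(endpoint, project_id, instance_id, group, body, token):
    """Prepare (but do not send) the reset-message-offset request."""
    url = (
        f"https://{endpoint}/v2/kafka/{project_id}/instances/{instance_id}"
        f"/groups/{urllib.parse.quote(group, safe='')}/reset-message-offset"
    )
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-Auth-Token": token,  # assumption: token-based authentication
        },
        method="PUT",
    )
```

The returned object can be passed to `urllib.request.urlopen` once the endpoint, IDs, and token are replaced with real values.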
Example Responses
None
SDK Sample Code
The SDK sample code is as follows.
Java
- Resetting the consumer group offset to a specified position

      package com.huaweicloud.sdk.test;

      import com.huaweicloud.sdk.core.auth.ICredential;
      import com.huaweicloud.sdk.core.auth.BasicCredentials;
      import com.huaweicloud.sdk.core.exception.ConnectionException;
      import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
      import com.huaweicloud.sdk.core.exception.ServiceResponseException;
      import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
      import com.huaweicloud.sdk.kafka.v2.*;
      import com.huaweicloud.sdk.kafka.v2.model.*;

      public class ResetMessageOffsetWithEngineSolution {

          public static void main(String[] args) {
              // Hard-coding the AK and SK or storing them in plaintext is a serious security risk.
              // Store them encrypted in configuration files or environment variables and decrypt them when used.
              // This example reads the AK and SK from the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK,
              // which must be set in the local environment before the example is run.
              String ak = System.getenv("CLOUD_SDK_AK");
              String sk = System.getenv("CLOUD_SDK_SK");
              String projectId = "{project_id}";

              ICredential auth = new BasicCredentials()
                      .withProjectId(projectId)
                      .withAk(ak)
                      .withSk(sk);

              KafkaClient client = KafkaClient.newBuilder()
                      .withCredential(auth)
                      .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                      .build();

              ResetMessageOffsetWithEngineRequest request = new ResetMessageOffsetWithEngineRequest();
              request.withInstanceId("{instance_id}");
              request.withGroup("{group}");

              // Reset partition 0 of topic "test" to offset 10.
              ResetMessageOffsetReq body = new ResetMessageOffsetReq();
              body.withMessageOffset(10L);
              body.withPartition(0);
              body.withTopic("test");
              request.withBody(body);

              try {
                  ResetMessageOffsetWithEngineResponse response = client.resetMessageOffsetWithEngine(request);
                  System.out.println(response.toString());
              } catch (ConnectionException e) {
                  e.printStackTrace();
              } catch (RequestTimeoutException e) {
                  e.printStackTrace();
              } catch (ServiceResponseException e) {
                  e.printStackTrace();
                  System.out.println(e.getHttpStatusCode());
                  System.out.println(e.getRequestId());
                  System.out.println(e.getErrorCode());
                  System.out.println(e.getErrorMsg());
              }
          }
      }
- Resetting the consumer group offset to a specified time

      package com.huaweicloud.sdk.test;

      import com.huaweicloud.sdk.core.auth.ICredential;
      import com.huaweicloud.sdk.core.auth.BasicCredentials;
      import com.huaweicloud.sdk.core.exception.ConnectionException;
      import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
      import com.huaweicloud.sdk.core.exception.ServiceResponseException;
      import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
      import com.huaweicloud.sdk.kafka.v2.*;
      import com.huaweicloud.sdk.kafka.v2.model.*;

      public class ResetMessageOffsetWithEngineSolution {

          public static void main(String[] args) {
              // Hard-coding the AK and SK or storing them in plaintext is a serious security risk.
              // Store them encrypted in configuration files or environment variables and decrypt them when used.
              // This example reads the AK and SK from the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK,
              // which must be set in the local environment before the example is run.
              String ak = System.getenv("CLOUD_SDK_AK");
              String sk = System.getenv("CLOUD_SDK_SK");
              String projectId = "{project_id}";

              ICredential auth = new BasicCredentials()
                      .withProjectId(projectId)
                      .withAk(ak)
                      .withSk(sk);

              KafkaClient client = KafkaClient.newBuilder()
                      .withCredential(auth)
                      .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                      .build();

              ResetMessageOffsetWithEngineRequest request = new ResetMessageOffsetWithEngineRequest();
              request.withInstanceId("{instance_id}");
              request.withGroup("{group}");

              // Reset partition 0 of topic "test" to the given UNIX timestamp (milliseconds).
              ResetMessageOffsetReq body = new ResetMessageOffsetReq();
              body.withTimestamp(1571812144000L);
              body.withPartition(0);
              body.withTopic("test");
              request.withBody(body);

              try {
                  ResetMessageOffsetWithEngineResponse response = client.resetMessageOffsetWithEngine(request);
                  System.out.println(response.toString());
              } catch (ConnectionException e) {
                  e.printStackTrace();
              } catch (RequestTimeoutException e) {
                  e.printStackTrace();
              } catch (ServiceResponseException e) {
                  e.printStackTrace();
                  System.out.println(e.getHttpStatusCode());
                  System.out.println(e.getRequestId());
                  System.out.println(e.getErrorCode());
                  System.out.println(e.getErrorMsg());
              }
          }
      }
Python
- Resetting the consumer group offset to a specified position

      # coding: utf-8

      import os

      from huaweicloudsdkcore.auth.credentials import BasicCredentials
      from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
      from huaweicloudsdkcore.exceptions import exceptions
      from huaweicloudsdkkafka.v2 import *

      if __name__ == "__main__":
          # Hard-coding the AK and SK or storing them in plaintext is a serious security risk.
          # Store them encrypted in configuration files or environment variables and decrypt them when used.
          # This example reads the AK and SK from the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK,
          # which must be set in the local environment before the example is run.
          ak = os.getenv("CLOUD_SDK_AK")
          sk = os.getenv("CLOUD_SDK_SK")
          projectId = "{project_id}"

          credentials = BasicCredentials(ak, sk, projectId)

          client = KafkaClient.new_builder() \
              .with_credentials(credentials) \
              .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
              .build()

          try:
              request = ResetMessageOffsetWithEngineRequest()
              request.instance_id = "{instance_id}"
              request.group = "{group}"
              # Reset partition 0 of topic "test" to offset 10.
              request.body = ResetMessageOffsetReq(
                  message_offset=10,
                  partition=0,
                  topic="test"
              )
              response = client.reset_message_offset_with_engine(request)
              print(response)
          except exceptions.ClientRequestException as e:
              print(e.status_code)
              print(e.request_id)
              print(e.error_code)
              print(e.error_msg)
- Resetting the consumer group offset to a specified time

      # coding: utf-8

      import os

      from huaweicloudsdkcore.auth.credentials import BasicCredentials
      from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
      from huaweicloudsdkcore.exceptions import exceptions
      from huaweicloudsdkkafka.v2 import *

      if __name__ == "__main__":
          # Hard-coding the AK and SK or storing them in plaintext is a serious security risk.
          # Store them encrypted in configuration files or environment variables and decrypt them when used.
          # This example reads the AK and SK from the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK,
          # which must be set in the local environment before the example is run.
          ak = os.getenv("CLOUD_SDK_AK")
          sk = os.getenv("CLOUD_SDK_SK")
          projectId = "{project_id}"

          credentials = BasicCredentials(ak, sk, projectId)

          client = KafkaClient.new_builder() \
              .with_credentials(credentials) \
              .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
              .build()

          try:
              request = ResetMessageOffsetWithEngineRequest()
              request.instance_id = "{instance_id}"
              request.group = "{group}"
              # Reset partition 0 of topic "test" to the given UNIX timestamp (milliseconds).
              request.body = ResetMessageOffsetReq(
                  timestamp=1571812144000,
                  partition=0,
                  topic="test"
              )
              response = client.reset_message_offset_with_engine(request)
              print(response)
          except exceptions.ClientRequestException as e:
              print(e.status_code)
              print(e.request_id)
              print(e.error_code)
              print(e.error_msg)
Go
- Resetting the consumer group offset to a specified position

      package main

      import (
          "fmt"
          "os"

          "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
          kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
          "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
          region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
      )

      func main() {
          // Hard-coding the AK and SK or storing them in plaintext is a serious security risk.
          // Store them encrypted in configuration files or environment variables and decrypt them when used.
          // This example reads the AK and SK from the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK,
          // which must be set in the local environment before the example is run.
          ak := os.Getenv("CLOUD_SDK_AK")
          sk := os.Getenv("CLOUD_SDK_SK")
          projectId := "{project_id}"

          auth := basic.NewCredentialsBuilder().
              WithAk(ak).
              WithSk(sk).
              WithProjectId(projectId).
              Build()

          client := kafka.NewKafkaClient(
              kafka.KafkaClientBuilder().
                  WithRegion(region.ValueOf("<YOUR REGION>")).
                  WithCredential(auth).
                  Build())

          request := &model.ResetMessageOffsetWithEngineRequest{}
          request.InstanceId = "{instance_id}"
          request.Group = "{group}"
          // Reset partition 0 of topic "test" to offset 10.
          messageOffsetResetMessageOffsetReq := int64(10)
          topicResetMessageOffsetReq := "test"
          request.Body = &model.ResetMessageOffsetReq{
              MessageOffset: &messageOffsetResetMessageOffsetReq,
              Partition:     int32(0),
              Topic:         &topicResetMessageOffsetReq,
          }
          response, err := client.ResetMessageOffsetWithEngine(request)
          if err == nil {
              fmt.Printf("%+v\n", response)
          } else {
              fmt.Println(err)
          }
      }
- Resetting the consumer group offset to a specified time

      package main

      import (
          "fmt"
          "os"

          "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
          kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
          "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
          region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
      )

      func main() {
          // Hard-coding the AK and SK or storing them in plaintext is a serious security risk.
          // Store them encrypted in configuration files or environment variables and decrypt them when used.
          // This example reads the AK and SK from the environment variables CLOUD_SDK_AK and CLOUD_SDK_SK,
          // which must be set in the local environment before the example is run.
          ak := os.Getenv("CLOUD_SDK_AK")
          sk := os.Getenv("CLOUD_SDK_SK")
          projectId := "{project_id}"

          auth := basic.NewCredentialsBuilder().
              WithAk(ak).
              WithSk(sk).
              WithProjectId(projectId).
              Build()

          client := kafka.NewKafkaClient(
              kafka.KafkaClientBuilder().
                  WithRegion(region.ValueOf("<YOUR REGION>")).
                  WithCredential(auth).
                  Build())

          request := &model.ResetMessageOffsetWithEngineRequest{}
          request.InstanceId = "{instance_id}"
          request.Group = "{group}"
          // Reset partition 0 of topic "test" to the given UNIX timestamp (milliseconds).
          timestampResetMessageOffsetReq := int64(1571812144000)
          topicResetMessageOffsetReq := "test"
          request.Body = &model.ResetMessageOffsetReq{
              Timestamp: &timestampResetMessageOffsetReq,
              Partition: int32(0),
              Topic:     &topicResetMessageOffsetReq,
          }
          response, err := client.ResetMessageOffsetWithEngine(request)
          if err == nil {
              fmt.Printf("%+v\n", response)
          } else {
              fmt.Println(err)
          }
      }
More
For SDK sample code in other programming languages, see the Sample Code tab in API Explorer, which can generate SDK sample code automatically.
Status Codes
Status Code | Description |
---|---|
204 | Successful |
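Because a successful call returns 204 with no body, a caller only needs to inspect the status code. The following is a minimal sketch for callers using the Python standard library rather than the SDK. `send_reset` is a hypothetical helper, and the injectable `opener` parameter exists only so the logic can be exercised without a network call.

```python
import urllib.error
import urllib.request


def send_reset(request, opener=urllib.request.urlopen):
    """Send a prepared request and return (succeeded, status_code).

    succeeded is True only for HTTP 204, the success code listed above.
    """
    try:
        with opener(request) as response:
            status = response.status
    except urllib.error.HTTPError as err:
        # urllib raises HTTPError for 4xx and 5xx responses.
        return False, err.code
    return status == 204, status
```

When the result is `(False, code)`, look the failure up under Error Codes.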
Error Codes
See Error Codes.