Este conteúdo foi traduzido por máquina para sua conveniência e a Huawei Cloud não pode garantir que o conteúdo foi traduzido com precisão. Para exibir o conteúdo original, use o link no canto superior direito para mudar para a página em inglês.
Central de ajuda/ Distributed Message Service for Kafka/ Referência de API/ APIs desatualizadas/ API V2/ Redefinição do deslocamento do grupo de consumidores para a posição especificada
Atualizado em 2024-09-10 GMT+08:00

Redefinição do deslocamento do grupo de consumidores para a posição especificada

Esta API está desatualizada e pode não ser mantida no futuro. Use a API descrita em Redefinição do deslocamento do grupo de consumidores para a posição especificada.

Função

As instâncias do Kafka não oferecem suporte à redefinição on-line do deslocamento do consumidor. Antes de redefinir, pare o cliente para o qual o deslocamento deve ser reiniciado. Depois que um cliente é interrompido, o servidor considera o cliente off-line somente após o período especificado em ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG (1000 ms por padrão).

Método de chamada

Para obter detalhes, consulte Como chamar uma API.

URI

POST /v2/{project_id}/instances/{instance_id}/management/groups/{group}/reset-message-offset

Tabela 1 Parâmetros de URI

Parâmetro

Obrigatório

Tipo

Descrição

project_id

Sim

String

ID do projeto. Para obter detalhes, consulte Obtenção de um ID de projeto.

instance_id

Sim

String

ID da instância.

group

Sim

String

Nome do grupo de consumidores.

Parâmetros de solicitação

Tabela 2 Parâmetros do corpo da solicitação

Parâmetro

Obrigatório

Tipo

Descrição

topic

Não

String

Nome do tópico.

partition

Sim

Integer

Número da partição. O valor padrão é -1, indicando que todas as partições são redefinidas.

message_offset

Não

Long

Esse parâmetro redefine o deslocamento do grupo de consumidores para a posição especificada.

  • Se esta posição for anterior ao deslocamento mais antigo atual, o deslocamento será redefinido para a posição mais antiga.
  • Se essa posição for posterior ao último deslocamento atual, o deslocamento será redefinido para a posição mais recente.

Deve ser especificado message_offset ou timestamp.

timestamp

Não

Long

Este parâmetro redefine o deslocamento do grupo de consumidores para o tempo especificado. O valor é um carimbo de data/hora UNIX, em milissegundos.

  • Se esse carimbo de data/hora for anterior ao carimbo de data/hora mais antigo atual, o deslocamento será redefinido para o carimbo de data/hora mais antigo.
  • Se esse carimbo de data/hora for posterior ao carimbo de data/hora mais recente atual, o deslocamento será redefinido para o carimbo de data/hora mais recente.

Deve ser especificado message_offset ou timestamp.

Parâmetros de resposta

Nenhum

Exemplo de solicitações

  • Um deslocamento de grupo de consumidores é redefinido para a posição especificada.
    POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/management/groups/{group}/reset-message-offset
    
    {
      "topic" : "test",
      "partition" : 0,
      "message_offset" : 10
    }
  • Um deslocamento de grupo de consumidores é redefinido para o tempo especificado.
    POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/management/groups/{group}/reset-message-offset
    
    {
      "topic" : "test",
      "partition" : 0,
      "timestamp" : 1571812144000
    }

Exemplo de resposta

Nenhum

Exemplos de código do SDK

A seguir estão exemplos de código do SDK.

Java

  • Um deslocamento de grupo de consumidores é redefinido para a posição especificada.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
    import com.huaweicloud.sdk.kafka.v2.*;
    import com.huaweicloud.sdk.kafka.v2.model.*;
    
    
    public class ResetMessageOffsetSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
    
            ICredential auth = new BasicCredentials()
                    .withAk(ak)
                    .withSk(sk);
    
            KafkaClient client = KafkaClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                    .build();
            ResetMessageOffsetRequest request = new ResetMessageOffsetRequest();
            ResetMessageOffsetReq body = new ResetMessageOffsetReq();
            body.withMessageOffset(10L);
            body.withPartition(0);
            body.withTopic("test");
            request.withBody(body);
            try {
                ResetMessageOffsetResponse response = client.resetMessageOffset(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Um deslocamento de grupo de consumidores é redefinido para o tempo especificado.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
    import com.huaweicloud.sdk.kafka.v2.*;
    import com.huaweicloud.sdk.kafka.v2.model.*;
    
    
    public class ResetMessageOffsetSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
    
            ICredential auth = new BasicCredentials()
                    .withAk(ak)
                    .withSk(sk);
    
            KafkaClient client = KafkaClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                    .build();
            ResetMessageOffsetRequest request = new ResetMessageOffsetRequest();
            ResetMessageOffsetReq body = new ResetMessageOffsetReq();
            body.withTimestamp(1571812144000L);
            body.withPartition(0);
            body.withTopic("test");
            request.withBody(body);
            try {
                ResetMessageOffsetResponse response = client.resetMessageOffset(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    

Python

  • Um deslocamento de grupo de consumidores é redefinido para a posição especificada.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    # coding: utf-8
    
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkkafka.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.getenv("CLOUD_SDK_AK")
        sk = os.getenv("CLOUD_SDK_SK")
    
        credentials = BasicCredentials(ak, sk) \
    
        client = KafkaClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = ResetMessageOffsetRequest()
            request.body = ResetMessageOffsetReq(
                message_offset=10,
                partition=0,
                topic="test"
            )
            response = client.reset_message_offset(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Um deslocamento de grupo de consumidores é redefinido para o tempo especificado.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    # coding: utf-8
    
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkkafka.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.getenv("CLOUD_SDK_AK")
        sk = os.getenv("CLOUD_SDK_SK")
    
        credentials = BasicCredentials(ak, sk) \
    
        client = KafkaClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = ResetMessageOffsetRequest()
            request.body = ResetMessageOffsetReq(
                timestamp=1571812144000,
                partition=0,
                topic="test"
            )
            response = client.reset_message_offset(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    

Go

  • Um deslocamento de grupo de consumidores é redefinido para a posição especificada.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    package main
    
    import (
    	"fmt"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
        kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
        region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            Build()
    
        client := kafka.NewKafkaClient(
            kafka.KafkaClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.ResetMessageOffsetRequest{}
    	messageOffsetResetMessageOffsetReq:= int64(10)
    	topicResetMessageOffsetReq:= "test"
    	request.Body = &model.ResetMessageOffsetReq{
    		MessageOffset: &messageOffsetResetMessageOffsetReq,
    		Partition: int32(0),
    		Topic: &topicResetMessageOffsetReq,
    	}
    	response, err := client.ResetMessageOffset(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    
  • Um deslocamento de grupo de consumidores é redefinido para o tempo especificado.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    package main
    
    import (
    	"fmt"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
        kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
        region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            Build()
    
        client := kafka.NewKafkaClient(
            kafka.KafkaClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.ResetMessageOffsetRequest{}
    	timestampResetMessageOffsetReq:= int64(1571812144000)
    	topicResetMessageOffsetReq:= "test"
    	request.Body = &model.ResetMessageOffsetReq{
    		Timestamp: &timestampResetMessageOffsetReq,
    		Partition: int32(0),
    		Topic: &topicResetMessageOffsetReq,
    	}
    	response, err := client.ResetMessageOffset(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    

Códigos de status

Código de status

Descrição

204

Redefinição bem-sucedida do deslocamento do grupo de consumidores para a posição especificada.

Códigos de erro

Consulte Códigos de erro.