Este conteúdo foi traduzido por máquina para sua conveniência e a Huawei Cloud não pode garantir que o conteúdo foi traduzido com precisão. Para exibir o conteúdo original, use o link no canto superior direito para mudar para a página em inglês.
Central de ajuda/ Distributed Message Service for Kafka/ Referência de API/ APIs V2 (recomendado)/ Gerenciamento de instâncias/ Redefinição do deslocamento do grupo de consumidores para a posição especificada
Atualizado em 2024-09-10 GMT+08:00

Redefinição do deslocamento do grupo de consumidores para a posição especificada

Função

As instâncias do Kafka não oferecem suporte à redefinição on-line do deslocamento do consumidor. Antes de redefinir, pare o cliente para o qual o deslocamento deve ser redefinido. Depois que um cliente é interrompido, o servidor considera o cliente off-line somente após o período especificado em ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG (1000 ms por padrão).

Método de chamada

Para obter detalhes, consulte Chamada de APIs.

URI

PUT /v2/kafka/{project_id}/instances/{instance_id}/groups/{group}/reset-message-offset

Tabela 1 Parâmetros de caminho

Parâmetro

Obrigatório

Tipo

Descrição

project_id

Sim

String

ID do projeto.

instance_id

Sim

String

ID da instância.

group

Sim

String

Nome do grupo de consumidores.

Parâmetros de solicitação

Tabela 2 Parâmetros do corpo da solicitação

Parâmetro

Obrigatório

Tipo

Descrição

topic

Não

String

Nome do tópico.

partition

Sim

Integer

Número da partição. O valor padrão é -1, indicando que todas as partições são redefinidas.

message_offset

Não

Long

Redefinição do deslocamento do grupo de consumidores para a posição especificada.

  • Se essa posição for anterior ao deslocamento mais anterior atual, o deslocamento será redefinido para o deslocamento mais anterior.

  • Se esse deslocamento for posterior ao maior deslocamento atual, o deslocamento será redefinido para o deslocamento mais recente.

Deve ser especificado message_offset ou timestamp.

timestamp

Não

Long

Hora especificada para a qual o deslocamento deve ser redefinido. O valor é um carimbo de data/hora UNIX, em milissegundos.

  • Se essa hora for anterior ao registro de data e hora mais anterior atual, o deslocamento será redefinido para o carimbo de data/hora mais anterior.

  • Se esse tempo for posterior ao maior carimbo de data/hora atual, o deslocamento será redefinido para o carimbo de data/hora mais recente.

Deve ser especificado message_offset ou timestamp.

Parâmetros de resposta

Nenhum

Exemplo de solicitações

  • Redefinição do deslocamento do grupo de consumidores para a posição especificada

    POST https://{endpoint}/v2/kafka/{project_id}/instances/{instance_id}/groups/{group}/reset-message-offset
    
    {
      "topic" : "test",
      "partition" : 0,
      "message_offset" : 10
    }
  • Redefinição do deslocamento do grupo de consumidores para o horário especificado

    POST https://{endpoint}/v2/kafka/{project_id}/instances/{instance_id}/groups/{group}/reset-message-offset
    
    {
      "topic" : "test",
      "partition" : 0,
      "timestamp" : 1571812144000
    }

Exemplo de respostas

Nenhum

Código de exemplo do SDK

O código de exemplo do SDK é o seguinte.

Java

  • Redefinição do deslocamento do grupo de consumidores para a posição especificada

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
    import com.huaweicloud.sdk.kafka.v2.*;
    import com.huaweicloud.sdk.kafka.v2.model.*;
    
    
    public class ResetMessageOffsetWithEngineSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            KafkaClient client = KafkaClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                    .build();
            ResetMessageOffsetWithEngineRequest request = new ResetMessageOffsetWithEngineRequest();
            request.withInstanceId("{instance_id}");
            request.withGroup("{group}");
            ResetMessageOffsetReq body = new ResetMessageOffsetReq();
            body.withMessageOffset(10L);
            body.withPartition(0);
            body.withTopic("test");
            request.withBody(body);
            try {
                ResetMessageOffsetWithEngineResponse response = client.resetMessageOffsetWithEngine(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • Redefinição do deslocamento do grupo de consumidores para o horário especificado

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
    import com.huaweicloud.sdk.kafka.v2.*;
    import com.huaweicloud.sdk.kafka.v2.model.*;
    
    
    public class ResetMessageOffsetWithEngineSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
            String projectId = "{project_id}";
    
            ICredential auth = new BasicCredentials()
                    .withProjectId(projectId)
                    .withAk(ak)
                    .withSk(sk);
    
            KafkaClient client = KafkaClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                    .build();
            ResetMessageOffsetWithEngineRequest request = new ResetMessageOffsetWithEngineRequest();
            request.withInstanceId("{instance_id}");
            request.withGroup("{group}");
            ResetMessageOffsetReq body = new ResetMessageOffsetReq();
            body.withTimestamp(1571812144000L);
            body.withPartition(0);
            body.withTopic("test");
            request.withBody(body);
            try {
                ResetMessageOffsetWithEngineResponse response = client.resetMessageOffsetWithEngine(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    

Python

  • Redefinição do deslocamento do grupo de consumidores para a posição especificada

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    # coding: utf-8
    
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkkafka.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = __import__('os').getenv("CLOUD_SDK_AK")
        sk = __import__('os').getenv("CLOUD_SDK_SK")
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId) \
    
        client = KafkaClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = ResetMessageOffsetWithEngineRequest()
            request.instance_id = "{instance_id}"
            request.group = "{group}"
            request.body = ResetMessageOffsetReq(
                message_offset=10,
                partition=0,
                topic="test"
            )
            response = client.reset_message_offset_with_engine(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • Redefinição do deslocamento do grupo de consumidores para o horário especificado

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    # coding: utf-8
    
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkkafka.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = __import__('os').getenv("CLOUD_SDK_AK")
        sk = __import__('os').getenv("CLOUD_SDK_SK")
        projectId = "{project_id}"
    
        credentials = BasicCredentials(ak, sk, projectId) \
    
        client = KafkaClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = ResetMessageOffsetWithEngineRequest()
            request.instance_id = "{instance_id}"
            request.group = "{group}"
            request.body = ResetMessageOffsetReq(
                timestamp=1571812144000,
                partition=0,
                topic="test"
            )
            response = client.reset_message_offset_with_engine(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    

Go

  • Redefinição do deslocamento do grupo de consumidores para a posição especificada

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    package main
    
    import (
    	"fmt"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
        kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
        region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := kafka.NewKafkaClient(
            kafka.KafkaClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.ResetMessageOffsetWithEngineRequest{}
    	request.InstanceId = "{instance_id}"
    	request.Group = "{group}"
    	messageOffsetResetMessageOffsetReq:= int64(10)
    	topicResetMessageOffsetReq:= "test"
    	request.Body = &model.ResetMessageOffsetReq{
    		MessageOffset: &messageOffsetResetMessageOffsetReq,
    		Partition: int32(0),
    		Topic: &topicResetMessageOffsetReq,
    	}
    	response, err := client.ResetMessageOffsetWithEngine(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    
  • Redefinição do deslocamento do grupo de consumidores para o horário especificado

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    package main
    
    import (
    	"fmt"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
        kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
        region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
        projectId := "{project_id}"
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            WithProjectId(projectId).
            Build()
    
        client := kafka.NewKafkaClient(
            kafka.KafkaClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.ResetMessageOffsetWithEngineRequest{}
    	request.InstanceId = "{instance_id}"
    	request.Group = "{group}"
    	timestampResetMessageOffsetReq:= int64(1571812144000)
    	topicResetMessageOffsetReq:= "test"
    	request.Body = &model.ResetMessageOffsetReq{
    		Timestamp: &timestampResetMessageOffsetReq,
    		Partition: int32(0),
    		Topic: &topicResetMessageOffsetReq,
    	}
    	response, err := client.ResetMessageOffsetWithEngine(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    

Mais

Para obter o código de exemplo do SDK de mais linguagens de programação, consulte a guia Código de exemplo no API Explorer. O código de exemplo do SDK pode ser gerado automaticamente.

Códigos de status

Código de status

Descrição

204

Bem-sucedido

Códigos de erro

Consulte Códigos de erro.