Help Center/ Distributed Message Service for Kafka/ API Reference/ Out-of-Date APIs/ API V2/ Resetting Consumer Group Offset to the Specified Position
Updated on 2024-02-27 GMT+08:00

Resetting Consumer Group Offset to the Specified Position

This API is out-of-date and may not be maintained in the future. Use the API described in Resetting Consumer Group Offset to the Specified Position.

Function

Kafka instances do not support resetting the consumer offset online. Before resetting, stop the client for which the offset is to be reset. After a client is stopped, the server considers the client offline only after the time period specified in ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG (1000 ms by default).

Call Method

For details, see How to Call an API.

URI

POST /v2/{project_id}/instances/{instance_id}/management/groups/{group}/reset-message-offset

Table 1 URI parameters

Parameter

Mandatory

Type

Description

project_id

Yes

String

Project ID. For details, see Obtaining a Project ID.

instance_id

Yes

String

Instance ID.

group

Yes

String

Consumer group name.

Request Parameters

Table 2 Request body parameters

Parameter

Mandatory

Type

Description

topic

No

String

Topic name.

partition

Yes

Integer

Partition number. The default value is -1, indicating that all partitions are reset.

message_offset

No

Long

This parameter resets consumer group offset to the specified position.

  • If this position is earlier than the current earliest offset, the offset will be reset to the earliest position.
  • If this position is later than the current latest offset, the offset will be reset to the latest position.

Either message_offset or timestamp must be specified.

timestamp

No

Long

This parameter resets consumer group offset to the specified time. The value is a UNIX timestamp, in millisecond.

  • If this timestamp is earlier than the current earliest timestamp, the offset will be reset to the earliest timestamp.
  • If this timestamp is later than the current latest timestamp, the offset will be reset to the latest timestamp.

Either message_offset or timestamp must be specified.

Response Parameters

None

Example Requests

  • A consumer group offset is reset to the specified position.
    POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/management/groups/{group}/reset-message-offset
    
    {
      "topic" : "test",
      "partition" : 0,
      "message_offset" : 10
    }
  • A consumer group offset is reset to the specified time.
    POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/management/groups/{group}/reset-message-offset
    
    {
      "topic" : "test",
      "partition" : 0,
      "timestamp" : 1571812144000
    }

Example Response

None

SDK Code Samples

The following are SDK code samples.

  • A consumer group offset is reset to the specified position.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
    import com.huaweicloud.sdk.kafka.v2.*;
    import com.huaweicloud.sdk.kafka.v2.model.*;
    
    
    public class ResetMessageOffsetSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
    
            ICredential auth = new BasicCredentials()
                    .withAk(ak)
                    .withSk(sk);
    
            KafkaClient client = KafkaClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                    .build();
            ResetMessageOffsetRequest request = new ResetMessageOffsetRequest();
            ResetMessageOffsetReq body = new ResetMessageOffsetReq();
            body.withMessageOffset(10L);
            body.withPartition(0);
            body.withTopic("test");
            request.withBody(body);
            try {
                ResetMessageOffsetResponse response = client.resetMessageOffset(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • A consumer group offset is reset to the specified time.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    package com.huaweicloud.sdk.test;
    
    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
    import com.huaweicloud.sdk.kafka.v2.*;
    import com.huaweicloud.sdk.kafka.v2.model.*;
    
    
    public class ResetMessageOffsetSolution {
    
        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");
    
            ICredential auth = new BasicCredentials()
                    .withAk(ak)
                    .withSk(sk);
    
            KafkaClient client = KafkaClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                    .build();
            ResetMessageOffsetRequest request = new ResetMessageOffsetRequest();
            ResetMessageOffsetReq body = new ResetMessageOffsetReq();
            body.withTimestamp(1571812144000L);
            body.withPartition(0);
            body.withTopic("test");
            request.withBody(body);
            try {
                ResetMessageOffsetResponse response = client.resetMessageOffset(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
    
  • A consumer group offset is reset to the specified position.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    # coding: utf-8
    
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkkafka.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.getenv("CLOUD_SDK_AK")
        sk = os.getenv("CLOUD_SDK_SK")
    
        credentials = BasicCredentials(ak, sk) \
    
        client = KafkaClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = ResetMessageOffsetRequest()
            request.body = ResetMessageOffsetReq(
                message_offset=10,
                partition=0,
                topic="test"
            )
            response = client.reset_message_offset(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • A consumer group offset is reset to the specified time.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    # coding: utf-8
    
    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkkafka.v2 import *
    
    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.getenv("CLOUD_SDK_AK")
        sk = os.getenv("CLOUD_SDK_SK")
    
        credentials = BasicCredentials(ak, sk) \
    
        client = KafkaClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
            .build()
    
        try:
            request = ResetMessageOffsetRequest()
            request.body = ResetMessageOffsetReq(
                timestamp=1571812144000,
                partition=0,
                topic="test"
            )
            response = client.reset_message_offset(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
    
  • A consumer group offset is reset to the specified position.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    package main
    
    import (
    	"fmt"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
        kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
        region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            Build()
    
        client := kafka.NewKafkaClient(
            kafka.KafkaClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.ResetMessageOffsetRequest{}
    	messageOffsetResetMessageOffsetReq:= int64(10)
    	topicResetMessageOffsetReq:= "test"
    	request.Body = &model.ResetMessageOffsetReq{
    		MessageOffset: &messageOffsetResetMessageOffsetReq,
    		Partition: int32(0),
    		Topic: &topicResetMessageOffsetReq,
    	}
    	response, err := client.ResetMessageOffset(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    
  • A consumer group offset is reset to the specified time.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    package main
    
    import (
    	"fmt"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
        kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
    	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
        region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
    )
    
    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")
    
        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            Build()
    
        client := kafka.NewKafkaClient(
            kafka.KafkaClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())
    
        request := &model.ResetMessageOffsetRequest{}
    	timestampResetMessageOffsetReq:= int64(1571812144000)
    	topicResetMessageOffsetReq:= "test"
    	request.Body = &model.ResetMessageOffsetReq{
    		Timestamp: &timestampResetMessageOffsetReq,
    		Partition: int32(0),
    		Topic: &topicResetMessageOffsetReq,
    	}
    	response, err := client.ResetMessageOffset(request)
    	if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
    

Status Codes

Status Code

Description

204

Successfully reset the consumer group offset to the specified position.

Error Codes

See Error Codes.