Updated on 2024-06-07 GMT+08:00

Querying Messages

Function

This API is used to query the offset and content of a message.

This API queries the message offset based on the timestamp and then queries the message content based on the offset.

Calling Method

For details, see Calling APIs.

URI

GET /v2/{project_id}/instances/{instance_id}/messages

Table 1 Path Parameters

Parameter

Mandatory

Type

Description

project_id

Yes

String

Project ID. For details about how to obtain it, see Obtaining a Project ID.

instance_id

Yes

String

Instance ID.

Table 2 Query Parameters

Parameter

Mandatory

Type

Description

topic

Yes

String

Topic name.

A topic name must start with a letter and can only contain letters, hyphens (-), underscores (_), and digits.

asc

No

Boolean

Whether to sort messages by time.

start_time

No

String

Start time.

The value is a Unix timestamp, in millisecond.

This parameter is mandatory when you query the message offset.

end_time

No

String

End time.

The value is a Unix timestamp, in millisecond.

This parameter is mandatory when you query the message offset.

limit

No

String

Number of messages displayed on each page.

offset

No

String

Page number.

download

No

Boolean

Whether download is required.

message_offset

No

String

Message offset.

This parameter is mandatory when you query the message content.

If start_time and end_time are not empty, this parameter is invalid.

partition

No

String

Partition.

This parameter is mandatory when you query the message content.

If start_time and end_time are not empty, this parameter is invalid.

keyword

No

String

Keyword.

The value ranges from 0 to 50.

Request Parameters

None

Response Parameters

Status code: 200

Table 3 Response body parameters

Parameter

Type

Description

messages

Array of MessagesEntity objects

Message list.

total

Long

Total number of messages.

size

Long

Number of records on each page.

Table 4 MessagesEntity

Parameter

Type

Description

topic

String

Topic name.

partition

Integer

Partition where the message is located.

key

String

Message key.

value

String

Message content.

size

Integer

Message size.

timestamp

Long

Message production time. The value is a UNIX timestamp, in ms.

huge_message

Boolean

Big data flag.

message_offset

Long

Message offset.

message_id

String

Message ID.

app_id

String

Application ID.

tag

String

Message label.

Status code: 400

Table 5 Response body parameters

Parameter

Type

Description

error_code

String

Error code.

error_msg

String

Error description.

Status code: 403

Table 6 Response body parameters

Parameter

Type

Description

error_code

String

Error code.

error_msg

String

Error description.

Example Requests

  • Querying the message offset.

    GET https://{endpoint}/v2/{project_id}/instances/{instance_id}/messages?asc=false&end_time=1608609032042&limit=10&offset=0&start_time=1608608432042&topic=topic-test
  • Querying the message content.

    GET https://{endpoint}/v2/{project_id}/instances/{instance_id}/messages?download=false&message_offset=0&partition=0&topic=topic-test

Example Responses

Status code: 200

The query is successful.

{
  "messages" : [ {
    "topic" : "topic-test",
    "partition" : 0,
    "value" : "hello world",
    "size" : 21,
    "timestamp" : 1607598463502,
    "huge_message" : false,
    "message_offset" : 4,
    "message_id" : "",
    "app_id" : "",
    "tag" : ""
  } ],
  "total" : 1,
  "size" : 1
}

SDK Sample Code

The SDK sample code is as follows.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.huaweicloud.sdk.test;

import com.huaweicloud.sdk.core.auth.ICredential;
import com.huaweicloud.sdk.core.auth.BasicCredentials;
import com.huaweicloud.sdk.core.exception.ConnectionException;
import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
import com.huaweicloud.sdk.core.exception.ServiceResponseException;
import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
import com.huaweicloud.sdk.kafka.v2.*;
import com.huaweicloud.sdk.kafka.v2.model.*;


public class ShowInstanceMessagesSolution {

    public static void main(String[] args) {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        String ak = System.getenv("CLOUD_SDK_AK");
        String sk = System.getenv("CLOUD_SDK_SK");
        String projectId = "{project_id}";

        ICredential auth = new BasicCredentials()
                .withProjectId(projectId)
                .withAk(ak)
                .withSk(sk);

        KafkaClient client = KafkaClient.newBuilder()
                .withCredential(auth)
                .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                .build();
        ShowInstanceMessagesRequest request = new ShowInstanceMessagesRequest();
        request.withInstanceId("{instance_id}");
        try {
            ShowInstanceMessagesResponse response = client.showInstanceMessages(request);
            System.out.println(response.toString());
        } catch (ConnectionException e) {
            e.printStackTrace();
        } catch (RequestTimeoutException e) {
            e.printStackTrace();
        } catch (ServiceResponseException e) {
            e.printStackTrace();
            System.out.println(e.getHttpStatusCode());
            System.out.println(e.getRequestId());
            System.out.println(e.getErrorCode());
            System.out.println(e.getErrorMsg());
        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# coding: utf-8

import os
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkkafka.v2 import *

if __name__ == "__main__":
    # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
    # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
    ak = os.environ["CLOUD_SDK_AK"]
    sk = os.environ["CLOUD_SDK_SK"]
    projectId = "{project_id}"

    credentials = BasicCredentials(ak, sk, projectId)

    client = KafkaClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
        .build()

    try:
        request = ShowInstanceMessagesRequest()
        request.instance_id = "{instance_id}"
        response = client.show_instance_messages(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
	"fmt"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
    region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
)

func main() {
    // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
    // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
    ak := os.Getenv("CLOUD_SDK_AK")
    sk := os.Getenv("CLOUD_SDK_SK")
    projectId := "{project_id}"

    auth := basic.NewCredentialsBuilder().
        WithAk(ak).
        WithSk(sk).
        WithProjectId(projectId).
        Build()

    client := kafka.NewKafkaClient(
        kafka.KafkaClientBuilder().
            WithRegion(region.ValueOf("<YOUR REGION>")).
            WithCredential(auth).
            Build())

    request := &model.ShowInstanceMessagesRequest{}
	request.InstanceId = "{instance_id}"
	response, err := client.ShowInstanceMessages(request)
	if err == nil {
        fmt.Printf("%+v\n", response)
    } else {
        fmt.Println(err)
    }
}

For SDK sample code of more programming languages, see the Sample Code tab in API Explorer. SDK sample code can be automatically generated.

Status Codes

Status Code

Description

200

The query is successful.

400

Invalid parameters.

403

Authentication failed.

Error Codes

See Error Codes.