Updated on 2024-08-07 GMT+08:00

Producing Messages to Kafka

Function

This API is used to send specified messages on the console to Kafka instances.

Calling Method

For details, see Calling APIs.

URI

POST /v2/{project_id}/instances/{instance_id}/messages/action

Table 1 Path Parameters

Parameter

Mandatory

Type

Description

project_id

Yes

String

Project ID. For details about how to obtain it, see Obtaining a Project ID.

instance_id

Yes

String

Instance ID.

Table 2 Query Parameters

Parameter

Mandatory

Type

Description

action_id

Yes

String

Action ID, which is send for production.

Request Parameters

Table 3 Request body parameters

Parameter

Mandatory

Type

Description

topic

Yes

String

Kafka topics.

body

Yes

String

Message content.

property_list

Yes

Array of property_list objects

Topic partition information.

Table 4 property_list

Parameter

Mandatory

Type

Description

name

No

String

Feature name.

value

No

String

Feature value.

Response Parameters

Status code: 200

Table 5 Response body parameters

Parameter

Type

Description

topic

String

Kafka topic.

body

String

Message content.

property_list

Array of objects

Topic partition information.

Example Requests

Sending messages on the Kafka console

POST https://{endpoint}/v2/{project_id}/instances/{instance_id}/messages/action?action_id={action_id}

{
  "topic" : "XXXX",
  "body" : "hello world",
  "property_list" : [ {
    "name" : "KEY",
    "value" : "testKey"
  }, {
    "name" : "PARTITION",
    "value" : "0"
  } ]
}

Example Responses

Status code: 200

Message produced.

{
  "topic" : "XXXX",
  "body" : "XXXX",
  "property_list" : [ ]
}

SDK Sample Code

The SDK sample code is as follows.

Sending messages on the Kafka console

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.huaweicloud.sdk.test;

import com.huaweicloud.sdk.core.auth.ICredential;
import com.huaweicloud.sdk.core.auth.BasicCredentials;
import com.huaweicloud.sdk.core.exception.ConnectionException;
import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
import com.huaweicloud.sdk.core.exception.ServiceResponseException;
import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
import com.huaweicloud.sdk.kafka.v2.*;
import com.huaweicloud.sdk.kafka.v2.model.*;

import java.util.List;
import java.util.ArrayList;

public class SendKafkaMessageSolution {

    public static void main(String[] args) {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        String ak = System.getenv("CLOUD_SDK_AK");
        String sk = System.getenv("CLOUD_SDK_SK");
        String projectId = "{project_id}";

        ICredential auth = new BasicCredentials()
                .withProjectId(projectId)
                .withAk(ak)
                .withSk(sk);

        KafkaClient client = KafkaClient.newBuilder()
                .withCredential(auth)
                .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                .build();
        SendKafkaMessageRequest request = new SendKafkaMessageRequest();
        request.withInstanceId("{instance_id}");
        SendKafkaMessageRequestBody body = new SendKafkaMessageRequestBody();
        List<SendKafkaMessageRequestBodyPropertyList> listbodyPropertyList = new ArrayList<>();
        listbodyPropertyList.add(
            new SendKafkaMessageRequestBodyPropertyList()
                .withName("KEY")
                .withValue("testKey")
        );
        listbodyPropertyList.add(
            new SendKafkaMessageRequestBodyPropertyList()
                .withName("PARTITION")
                .withValue("0")
        );
        body.withPropertyList(listbodyPropertyList);
        body.withBody("hello world");
        body.withTopic("XXXX");
        request.withBody(body);
        try {
            SendKafkaMessageResponse response = client.sendKafkaMessage(request);
            System.out.println(response.toString());
        } catch (ConnectionException e) {
            e.printStackTrace();
        } catch (RequestTimeoutException e) {
            e.printStackTrace();
        } catch (ServiceResponseException e) {
            e.printStackTrace();
            System.out.println(e.getHttpStatusCode());
            System.out.println(e.getRequestId());
            System.out.println(e.getErrorCode());
            System.out.println(e.getErrorMsg());
        }
    }
}

Sending messages on the Kafka console

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# coding: utf-8

import os
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkkafka.v2 import *

if __name__ == "__main__":
    # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
    # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
    ak = os.environ["CLOUD_SDK_AK"]
    sk = os.environ["CLOUD_SDK_SK"]
    projectId = "{project_id}"

    credentials = BasicCredentials(ak, sk, projectId)

    client = KafkaClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
        .build()

    try:
        request = SendKafkaMessageRequest()
        request.instance_id = "{instance_id}"
        listPropertyListbody = [
            SendKafkaMessageRequestBodyPropertyList(
                name="KEY",
                value="testKey"
            ),
            SendKafkaMessageRequestBodyPropertyList(
                name="PARTITION",
                value="0"
            )
        ]
        request.body = SendKafkaMessageRequestBody(
            property_list=listPropertyListbody,
            body="hello world",
            topic="XXXX"
        )
        response = client.send_kafka_message(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)

Sending messages on the Kafka console

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
	"fmt"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
    region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
)

func main() {
    // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
    // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
    ak := os.Getenv("CLOUD_SDK_AK")
    sk := os.Getenv("CLOUD_SDK_SK")
    projectId := "{project_id}"

    auth := basic.NewCredentialsBuilder().
        WithAk(ak).
        WithSk(sk).
        WithProjectId(projectId).
        Build()

    client := kafka.NewKafkaClient(
        kafka.KafkaClientBuilder().
            WithRegion(region.ValueOf("<YOUR REGION>")).
            WithCredential(auth).
            Build())

    request := &model.SendKafkaMessageRequest{}
	request.InstanceId = "{instance_id}"
	namePropertyList:= "KEY"
	valuePropertyList:= "testKey"
	namePropertyList1:= "PARTITION"
	valuePropertyList1:= "0"
	var listPropertyListbody = []model.SendKafkaMessageRequestBodyPropertyList{
        {
            Name: &namePropertyList,
            Value: &valuePropertyList,
        },
        {
            Name: &namePropertyList1,
            Value: &valuePropertyList1,
        },
    }
	request.Body = &model.SendKafkaMessageRequestBody{
		PropertyList: listPropertyListbody,
		Body: "hello world",
		Topic: "XXXX",
	}
	response, err := client.SendKafkaMessage(request)
	if err == nil {
        fmt.Printf("%+v\n", response)
    } else {
        fmt.Println(err)
    }
}

For SDK sample code of more programming languages, see the Sample Code tab in API Explorer. SDK sample code can be automatically generated.

Status Codes

Status Code

Description

200

Message produced.

Error Codes

See Error Codes.