Using Kafka to Report Logs to LTS
You can use various Kafka Producer SDKs or collection tools that support the Kafka protocol to report logs to LTS. This function is applicable to the following scenarios:
- Scenario 1: If you have built a system based on open-source tools, such as Logstash, simply modify the configuration file to enable log reporting to LTS.
- Scenario 2: If you want to use Kafka Producer SDKs to collect and report logs, you do not need to install ICAgent.

This function is available only in regions CN North-Beijing1, CN North-Beijing4, CN South-Guangzhou, CN East-Shanghai1, CN East-Shanghai2, CN-Hong Kong, CN North-Ulanqab1, and AP-Singapore. To use it in other regions, submit a service ticket.
Prerequisites
- You have registered an account and enabled LTS.
- You have obtained the ID of the region where LTS is deployed.
- You have obtained the AK/SK of your account.
- You have obtained a project ID of your Huawei Cloud account. For details, see API Credentials in My Credentials.
- You have obtained the IDs of the target log group and stream in LTS.
- Currently, this function is available only for reporting ECS logs over intranets.
Constraints
- This function is available only on intranets. The port number must be 9095. You can set the IP address as required.
- The supported Kafka protocol versions are 1.0.X, 2.X.X, and 3.X.X.
- The supported compression modes are gzip, snappy, and lz4.
- The Kafka authentication mode is SASL_PLAINTEXT.
- The acks parameter of the Kafka protocol can be set to 0, 1 (default), or all.
Configuration
- To use the Kafka protocol to report logs, set the following common parameters:
Table 1 Common parameters

| Parameter | Description | Type |
| --- | --- | --- |
| projectId | Project ID of the user account | String |
| logGroupId | LTS log group ID | String |
| logStreamId | LTS log stream ID | String |
| regionName | Region where the LTS service is deployed | String |
| accessKey | AK of the user account | String |
| accessSecret | SK of the user account | String |
- To use Kafka to report logs, you also need to set the following parameters:
Table 2 Configuration parameters

| Parameter | Description |
| --- | --- |
| Connection type | SASL_PLAINTEXT is supported. |
| hosts | IP address and port number of Kafka, in the format lts-kafka.${regionName}.${external_global_domain_name}:9095 or lts-access.${regionName}.${external_global_domain_name}:9095. Set the address based on your region and set the port number to 9095. For details, see Obtaining Parameters. For example, the value of hosts for the CN North-Beijing4 region is lts-kafka.cn-north-4.myhuaweicloud.com:9095. |
| topic | Kafka topic name, in the format ${logGroupId}_${logStreamId}, where the LTS log group ID and log stream ID are connected by an underscore (_). For JSON logs, you can set the topic name to ${logGroupId}_${logStreamId}_json to expand the JSON logs automatically. To specify the log collection time, set the topic name to ${logGroupId}_${logStreamId}_format and ensure that the reported logs are standardized and meet the requirements. |
| username | Username for accessing Kafka. Set this parameter to the project ID of the user account. |
| password | Password for accessing Kafka, in the format ${accessKey}#${accessSecret}, where the AK and SK of the user account are connected by a number sign (#). |
- Automatic expansion of JSON logs
If all logs are in JSON format, you are advised to use automatic expansion: set the topic name to ${logGroupId}_${logStreamId}_json_${index}, and the system will parse the JSON logs automatically. The value of index in the topic name indicates the number of parsing layers and ranges from 1 to 4; the default number of parsing layers is 1. If a log cannot be parsed because it is not valid JSON, its content is reported in the _content_parse_fail_ field. (The topic-name variants are consolidated in the sketch after Table 3.)
- Specifying the log collection time
To specify the log collection time, set the topic name to ${logGroupId}_${logStreamId}_format and ensure that the reported logs are in JSON format and contain the fields listed in Table 3.
To specify the log collection time and enable automatic expansion of JSON logs, set the topic name to ${logGroupId}_${logStreamId}_format_json_${index}.
Table 3 Log parameters

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| content | Yes | String | Log content. |
| log_time_ns | Yes | Long | Log collection time (UTC timestamp in nanoseconds). The collection time must be within one day before or after the current time; otherwise, it is invalid and the current time is used instead. The interval between the log collection time and the current time must also be less than the log retention duration; otherwise, the reported logs are cleared. For example, if the log retention duration is seven days, the log collection time must be within the last seven days. |
| labels | Yes | Object | Custom labels. Do not use system reserved fields as field names; otherwise, problems such as duplicate field names and inaccurate queries may occur. |
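The topic name is what tells LTS how to treat the payload (plain text, automatic JSON expansion, or a formatted log carrying its own collection time). As a consolidation of the variants described above, the following minimal sketch builds each topic-name form; the IDs and index are placeholders, not real values:

```java
// Minimal sketch: assembling the LTS topic-name variants described above.
// logGroupId and logStreamId are placeholders for your real LTS IDs.
public class LtsTopicNames {
    public static void main(String[] args) {
        String logGroupId = "your-log-group-id";
        String logStreamId = "your-log-stream-id";
        int index = 2; // number of JSON parsing layers (1-4)

        // Plain logs: reported as-is.
        String plainTopic = logGroupId + "_" + logStreamId;
        // JSON logs with automatic expansion to the given depth.
        String jsonTopic = logGroupId + "_" + logStreamId + "_json_" + index;
        // Formatted logs that carry their own collection time (log_time_ns).
        String formatTopic = logGroupId + "_" + logStreamId + "_format";
        // Formatted logs with both a collection time and JSON expansion.
        String formatJsonTopic = logGroupId + "_" + logStreamId + "_format_json_" + index;

        System.out.println(plainTopic);
        System.out.println(jsonTopic);
        System.out.println(formatTopic);
        System.out.println(formatJsonTopic);
    }
}
```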
Log Example
```json
{
    "log_time_ns": "XXXXXXXXXXXXXXXXXXX",
    "content": "This is a log 1",
    "labels": {
        "type": "kafka"
    }
}
```
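As a minimal sketch of how such a log can be produced, the following snippet computes log_time_ns from the current UTC time and reports the example log above to the _format topic. The connection settings mirror the Java SDK example later in this section, and all ${...} values are placeholders:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Instant;
import java.util.Properties;

public class FormatLogDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "${ip}:${port}");
        props.put("acks", "0");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Username: projectId; Password: accessKey#accessSecret
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Collection time: UTC timestamp in nanoseconds, within one day of the current time.
        Instant now = Instant.now();
        long logTimeNs = now.getEpochSecond() * 1_000_000_000L + now.getNano();

        // Payload matching the log example above (content, log_time_ns, labels).
        String payload = "{"
                + "\"log_time_ns\": \"" + logTimeNs + "\","
                + " \"content\": \"This is a log 1\","
                + " \"labels\": {\"type\": \"kafka\"}"
                + "}";

        // The _format topic tells LTS to use log_time_ns as the collection time.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("${logGroupId}_${logStreamId}_format", payload));
        }
    }
}
```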
Calling Example
- The following uses Filebeat as an example to show how to configure parameters for the Beat series of collectors.
```yaml
output.kafka:
  hosts: ["${ip}:${port}"]
  partition.round_robin:
    reachable_only: false
  username: "${projectId}"
  password: "${accessKey}#${accessSecret}"
  topic: "${logGroupId}_${logStreamId}"
  sasl.mechanism: "PLAIN"
  security.protocol: "SASL_PLAINTEXT"
  acks: "0"
  compression: gzip
```
- When Logstash is used to report logs:
```
input {
    stdin {}
}
output {
    kafka {
        # Configure an address.
        bootstrap_servers => "${ip}:${port}"
        # Configure a topic.
        topic_id => "${logGroupId}_${logStreamId}"
        # Configure a message acknowledgment mechanism.
        acks => "0"
        # Configure a compression mode.
        compression_type => "gzip"
        # Configure an authentication mode.
        security_protocol => "SASL_PLAINTEXT"
        sasl_mechanism => "PLAIN"
        # Username: projectId; Password: accessKey#accessSecret
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';"
    }
}
```
- When Flume is used to report logs:
```properties
# Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/test.txt
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
SDK Calling Examples
- Java SDK calling example:
Maven dependency (using kafka-clients 2.7.1 as an example):
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.7.1</version>
    </dependency>
</dependencies>
```
Sample code:
```java
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Configure an address.
        props.put("bootstrap.servers", "${ip}:${port}");
        // Configure a message acknowledgment mechanism.
        props.put("acks", "0");
        // Configure an authentication mode.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Username: projectId; Password: accessKey#accessSecret
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';");
        // Configure a compression mode.
        props.put("compression.type", "${compress_type}");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 1. Create a producer object.
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 2. Call the send method.
        for (int i = 0; i < 1; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("${logGroupId}_${logStreamId}", "${message}");
            // Configure a record header (requires org.apache.kafka.common.header.internals.RecordHeader).
            // record.headers().add(new RecordHeader("LTS_LOG_TYPE", "FORMAT".getBytes()));
            producer.send(record);
        }

        // 3. Close the producer.
        producer.close();
    }
}
```
- Python SDK calling example:
```python
from kafka import KafkaProducer

producer = KafkaProducer(
    # Configure an address.
    bootstrap_servers="${ip}:${port}",
    # Configure a message acknowledgment mechanism.
    acks=0,
    # Configure a compression mode.
    compression_type="${compression_type}",
    # Configure an authentication mode.
    sasl_mechanism="PLAIN",
    security_protocol="SASL_PLAINTEXT",
    # Username: projectId; Password: accessKey#accessSecret
    sasl_plain_username="${projectId}",
    sasl_plain_password="${accessKey}#${accessSecret}"
)

print('start producer')
for i in range(0, 3):
    data = bytes("${message}", encoding="utf-8")
    future = producer.send("${logGroupId}_${logStreamId}", data)
    result = future.get(timeout=10)
    print(result)
print('end producer')
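```
The Python example assumes the kafka-python client library (for example, installed with pip install kafka-python); other Python clients, such as confluent-kafka, use different configuration parameter names.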
Error Description
If there is an invalid or mismatched parameter, an error message will be displayed.
| Error Message | Cause |
| --- | --- |
| TopicAuthorizationException | The projectId (project ID), accessKey (AK), and accessSecret (SK) parameters are invalid or mismatched. |
| UnknownTopicOrPartitionException | The logGroupId (log group ID) and logStreamId (log stream ID) parameters are invalid or mismatched. |
| InvalidRecordException | The log format is incorrect, or the settings of projectId, logGroupId, and logStreamId in the log are inconsistent with those of the external parameters. |
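These errors are reported asynchronously by the Kafka client. The following sketch, which assumes the producer and record variables from the Java SDK example above, passes a callback to send() so that failures are logged. Note that with acks set to 0 the broker does not acknowledge individual records, so some of these errors may only be observable with acks set to 1 or all.

```java
// Minimal sketch: surfacing send errors (for example, TopicAuthorizationException,
// UnknownTopicOrPartitionException, InvalidRecordException) through the send callback.
// "producer" and "record" are assumed to be configured as in the Java SDK example above.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Authentication, topic, or record-format problems are delivered here.
        System.err.println("Failed to report log: " + exception);
    } else {
        System.out.println("Reported to " + metadata.topic() + ", offset " + metadata.offset());
    }
});
// Alternatively, producer.send(record).get() throws an ExecutionException wrapping
// the same error, which is easier to inspect during initial setup.
```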
Obtaining Parameters
| Region Name | Endpoint |
| --- | --- |
| CN North-Beijing4 | lts-kafka.cn-north-4.myhuaweicloud.com |
| CN North-Beijing1 | lts-access.cn-north-1.myhuaweicloud.com |
| CN East-Shanghai1 | lts-access.cn-east-3.myhuaweicloud.com |
| CN East-Shanghai2 | lts-kafka.cn-east-2.myhuaweicloud.com |
| CN South-Guangzhou | lts-access.cn-south-1.myhuaweicloud.com |
| AP-Singapore | lts-kafka.ap-southeast-3.myhuaweicloud.com |
| CN-Hong Kong | lts-access.ap-southeast-1.myhuaweicloud.com |
| CN North-Ulanqab1 | lts-access.cn-north-9.myhuaweicloud.com |