Using Kafka to Report Logs to LTS
You can use various Kafka Producer SDKs or collection tools that support the Kafka protocol to report logs to LTS. This function applies to the following scenarios:
- Scenario 1: If you have built a system based on open-source tools, such as Logstash, simply modify the configuration file to enable log reporting to LTS.
- Scenario 2: If you want to use Kafka Producer SDKs to collect and report logs, you do not need to install ICAgent.
Currently, this function is available only in regions CN North-Beijing4, CN South-Guangzhou, CN East-Shanghai1, CN East-Shanghai2, and AP-Singapore. To use it in other regions, submit a service ticket.
Prerequisites
- You have registered an account and enabled LTS.
- You have obtained the ID of the region where LTS is deployed.
- You have obtained the AK/SK of your account.
- You have obtained a project ID of your Huawei Cloud account. For details, see API Credentials in My Credentials.
- You have obtained the IDs of the target log group and stream in LTS.
- Currently, this function is available only for reporting ECSs logs over intranets.
Constraints
- Currently, this function is available only on intranets. The port number must be 9095; set the IP address based on site requirements.
- The supported Kafka protocol versions are 1.0.X, 2.X.X, and 3.X.X.
- The supported compression modes are gzip, snappy, and lz4.
- The Kafka authentication mode is SASL_PLAINTEXT.
- The ACKS parameter of the Kafka protocol must be set to 0.
Configuration
- To use the Kafka protocol to report logs, set the following common parameters:
Table 1 Common parameters

| Parameter | Description | Type |
|---|---|---|
| projectId | Project ID of the user account | String |
| logGroupId | LTS log group ID | String |
| logStreamId | LTS log stream ID | String |
| regionName | Region where the LTS service is deployed | String |
| accessKey | AK of the user account | String |
| accessSecret | SK of the user account | String |
- To use Kafka to report logs, you also need to set the following parameters:
Table 2 Configuration parameters

| Parameter | Description |
|---|---|
| Connection type | SASL_PLAINTEXT is supported. |
| hosts | IP address and port number of Kafka, in the format lts-kafka.${regionName}.${external_global_domain_name}:9095 or lts-access.${regionName}.${external_global_domain_name}:9095. Set the IP address based on site requirements and set the port number to 9095. For details, see Obtaining Parameters. For example, the value of hosts for the CN North-Beijing4 region is lts-kafka.cn-north-4.myhuaweicloud.com:9095. |
| topic | Kafka topic name, in the format ${log group ID}_${log stream ID}: the LTS log group and stream IDs connected by an underscore (_). |
| username | Username for accessing Kafka. Set this parameter to the project ID of the user account. |
| password | Password for accessing Kafka, in the format ${accessKey}#${accessSecret}: the AK and SK of the user account connected by a number sign (#). |
| headers | Required only when you customize label fields. Add a header whose key is LTS_LOG_TYPE and value is FORMAT. Logs reported with this header must be standardized logs that meet the format requirement. |
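The derived connection values in Table 2 can be assembled from the common parameters in Table 1. The following is a minimal Python sketch; the function name and all argument values are placeholders, not part of any LTS SDK:

```python
def build_kafka_settings(project_id, log_group_id, log_stream_id,
                         access_key, access_secret, kafka_host):
    """Derive the Kafka connection values from the common LTS parameters."""
    return {
        # Intranet access always uses port 9095.
        "hosts": f"{kafka_host}:9095",
        # Topic: log group ID and log stream ID joined by an underscore.
        "topic": f"{log_group_id}_{log_stream_id}",
        # Username: the project ID of the user account.
        "username": project_id,
        # Password: AK and SK joined by a number sign (#).
        "password": f"{access_key}#{access_secret}",
    }

settings = build_kafka_settings(
    "my-project-id", "my-group-id", "my-stream-id",
    "my-ak", "my-sk", "lts-kafka.cn-north-4.myhuaweicloud.com")
print(settings["topic"])     # my-group-id_my-stream-id
print(settings["password"])  # my-ak#my-sk
```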
- ${message} log format
Logs must comply with this format only when a header whose key is LTS_LOG_TYPE and value is FORMAT is added to headers.
Table 3 Log parameters

| Parameter | Mandatory | Type | Description |
|---|---|---|---|
| tenant_project_id | Yes | String | Project ID of the user account |
| tenant_group_id | Yes | String | LTS log group ID |
| tenant_stream_id | Yes | String | LTS log stream ID |
| log_time_ns | Yes | Long | Log collection time (UTC time, in nanoseconds). NOTE: The interval between the log collection time and the current time must be less than the log retention duration; otherwise, reported logs will be cleared. For example, if the log retention duration is seven days, the log collection time must be within the last seven days. |
| contents | Yes | Array of String | Log content |
| labels | Yes | Object | Custom labels. NOTE: Avoid using the name of a built-in reserved field as a field name to prevent issues such as duplicate names and query inaccuracies. |
Log Example
```
{
  "tenant_project_id": "${projectId}",
  "tenant_group_id": "${logGroupId}",
  "tenant_stream_id": "${logStreamId}",
  "log_time_ns": "XXXXXXXXXXXXXXXXXXX",
  "contents": [
    "This is a log 1",
    "This is a log 2"
  ],
  "labels": {
    "type": "kafka"
  }
}
```
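A payload in this format can also be assembled programmatically. The Python sketch below is an illustration under the Table 3 field requirements; the function name and ID values are placeholders, and `time.time_ns()` is used so that log_time_ns falls within the retention period:

```python
import json
import time

def build_lts_log(project_id, log_group_id, log_stream_id, messages, labels):
    """Build a standardized LTS log record for use with the
    LTS_LOG_TYPE=FORMAT record header."""
    return {
        "tenant_project_id": project_id,
        "tenant_group_id": log_group_id,
        "tenant_stream_id": log_stream_id,
        # Collection time in nanoseconds (UTC); must be within the
        # log retention duration or the record will be cleared.
        "log_time_ns": time.time_ns(),
        "contents": messages,
        "labels": labels,
    }

record = build_lts_log("my-project-id", "my-group-id", "my-stream-id",
                       ["This is a log 1", "This is a log 2"],
                       {"type": "kafka"})
# Serialize to JSON before sending it as the Kafka message value.
print(json.dumps(record))
```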
Calling Example
- The following uses Filebeat as an example to describe how to configure parameters for software in the Beats family.
```
output.kafka:
  hosts: ["${ip}:${port}"]
  partition.round_robin:
    reachable_only: false
  username: "${projectId}"
  password: "${accessKey}#${accessSecret}"
  topic: "${logGroupId}_${logStreamId}"
  sasl.mechanism: "PLAIN"
  security.protocol: "SASL_PLAINTEXT"
  acks: "0"
  compression: gzip
```
- When Logstash is used to report logs:
```
input {
    stdin {}
}
output {
    kafka {
        # Configure the address.
        bootstrap_servers => "${ip}:${port}"
        # Configure the topic.
        topic_id => "${logGroupId}_${logStreamId}"
        # Configure the message acknowledgment mechanism.
        acks => "0"
        # Configure the compression mode.
        compression_type => "gzip"
        # Configure the authentication mode.
        security_protocol => "SASL_PLAINTEXT"
        sasl_mechanism => "PLAIN"
        # Username: projectId; password: accessKey#accessSecret
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';"
    }
}
```
- When Flume is used to report logs:
```
# Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/test.txt
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
SDK Calling Examples
- Java SDK calling example:
Maven dependency (take Kafka protocol 2.7.1 as an example):
```
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.7.1</version>
    </dependency>
</dependencies>
```
Sample code:
```
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Configure the address.
        props.put("bootstrap.servers", "${ip}:${port}");
        // Configure the message acknowledgment mechanism.
        props.put("acks", "0");
        // Configure the authentication mode.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Username: projectId; password: accessKey#accessSecret
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';");
        // Configure the compression mode.
        props.put("compression.type", "${compress_type}");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 1. Create a producer object.
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 2. Call the send method.
        for (int i = 0; i < 1; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("${logGroupId}_${logStreamId}", "${message}");
            // Add the record header when reporting standardized logs.
            // record.headers().add(new RecordHeader("LTS_LOG_TYPE", "FORMAT".getBytes()));
            producer.send(record);
        }
        // 3. Close the producer.
        producer.close();
    }
}
```
- Python SDK calling example:
```
from kafka import KafkaProducer

producer = KafkaProducer(
    # Configure the address.
    bootstrap_servers="${ip}:${port}",
    # Configure the message acknowledgment mechanism.
    acks=0,
    # Configure the compression mode.
    compression_type="${compression_type}",
    # Configure the authentication mode.
    sasl_mechanism="PLAIN",
    security_protocol="SASL_PLAINTEXT",
    # Username: projectId; password: accessKey#accessSecret
    sasl_plain_username="${projectId}",
    sasl_plain_password="${accessKey}#${accessSecret}"
)

print('start producer')
for i in range(0, 3):
    data = bytes("${message}", encoding="utf-8")
    future = producer.send("${logGroupId}_${logStreamId}", data)
    result = future.get(timeout=10)
    print(result)
print('end producer')
```
Error Description
If there is an invalid or mismatched parameter, an error message will be displayed.
| Error Message | Cause |
|---|---|
| TopicAuthorizationException | The projectId (project ID), accessKey (AK), and accessSecret (SK) parameters are invalid or mismatched. |
| UnknownTopicOrPartitionException | The logGroupId (log group ID) and logStreamId (log stream ID) parameters are invalid or mismatched. |
| InvalidRecordException | Occurs only when the headers parameter is configured to report standardized logs. The log format is incorrect, or the projectId, logGroupId, and logStreamId values in the log are inconsistent with those of the external parameters. |
Obtaining Parameters
| Region | Kafka Host Address |
|---|---|
| CN North-Beijing4 | lts-kafka.cn-north-4.myhuaweicloud.com |
| CN East-Shanghai1 | lts-access.cn-east-3.myhuaweicloud.com |
| CN East-Shanghai2 | lts-kafka.cn-east-2.myhuaweicloud.com |
| CN South-Guangzhou | lts-access.cn-south-1.myhuaweicloud.com |
| AP-Singapore | lts-kafka.ap-southeast-3.myhuaweicloud.com |
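The region-to-host mapping above can be kept as a simple lookup when the reporting code must run in several regions. This is an illustrative Python sketch (the dictionary values are copied from the table; the region IDs are the ones embedded in each hostname):

```python
# Kafka host per region ID, taken from the table above; the port is always 9095.
LTS_KAFKA_HOSTS = {
    "cn-north-4": "lts-kafka.cn-north-4.myhuaweicloud.com",        # CN North-Beijing4
    "cn-east-3": "lts-access.cn-east-3.myhuaweicloud.com",         # CN East-Shanghai1
    "cn-east-2": "lts-kafka.cn-east-2.myhuaweicloud.com",          # CN East-Shanghai2
    "cn-south-1": "lts-access.cn-south-1.myhuaweicloud.com",       # CN South-Guangzhou
    "ap-southeast-3": "lts-kafka.ap-southeast-3.myhuaweicloud.com" # AP-Singapore
}

def bootstrap_servers(region_id):
    """Return the hosts value for a supported region; raises KeyError otherwise."""
    return f"{LTS_KAFKA_HOSTS[region_id]}:9095"

print(bootstrap_servers("cn-north-4"))
# lts-kafka.cn-north-4.myhuaweicloud.com:9095
```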