Help Center/ Log Tank Service/ User Guide/ Log Ingestion/ Using Kafka to Report Logs to LTS
Updated on 2025-06-19 GMT+08:00

Using Kafka to Report Logs to LTS

You can use various Kafka Producer SDKs or collection tools that support the Kafka protocol to report logs to LTS. This function is applicable to the following scenarios:

  • Scenario 1: If you have built a system based on open-source tools, such as Logstash, simply modify the configuration file to enable log reporting to LTS.
  • Scenario 2: If you want to use Kafka Producer SDKs to collect and report logs, you do not need to install ICAgent.

This function is available only in regions CN North-Beijing1, CN North-Beijing4, CN South-Guangzhou, CN East-Shanghai1, CN East-Shanghai2, CN-Hong Kong, CN North-Ulanqab1, and AP-Singapore. To use it in other regions, submit a service ticket.

Prerequisites

  • You have registered an account and enabled LTS.
  • You have obtained the ID of the region where LTS is deployed.
  • You have obtained the AK/SK of your account.
  • You have obtained a project ID of your Huawei Cloud account. For details, see API Credentials in My Credentials.
  • You have obtained the IDs of the target log group and stream in LTS.
  • Currently, this function is available only for reporting ECSs logs over intranets.

Constraints

  • This function is available only on intranets. The port number must be 9095. You can set the IP address as required.
  • The supported Kafka protocol versions are 1.0.X, 2.X.X, and 3.X.X.
  • The supported compression modes are gzip, snappy, and lz4.
  • The Kafka authentication mode is SASL_PLAINTEXT.
  • The ACKS parameter of the Kafka protocol can be set to 0, 1 (default), or all.

Configuration

  • To use the Kafka protocol to report logs, set the following common parameters:
    Table 1 Common parameters

    Parameter

    Description

    Type

    projectId

    Project ID of the user account

    String

    logGroupId

    LTS log group ID

    String

    logStreamId

    LTS log stream ID

    String

    regionName

    Region where the LTS service is deployed

    String

    accessKey

    AK of the user account

    String

    accessSecret

    SK of the user account

    String

  • To use Kafka to report logs, you also need to set the following parameters:
    Table 2 Configuration parameters

    Parameter

    Description

    Connection type

    SASL_PLAINTEXT is supported.

    hosts

    IP address and port number of Kafka. The format is lts-kafka.${regionName}.${external_global_domain_name}:9095 or lts-access.${regionName}.${external_global_domain_name}:9095.

    Set the IP address based on your region and set the port number to 9095. For details, see Obtaining Parameters. For example, the value of hosts for the CN North-Beijing4 region is lts-kafka.cn-north-4.myhuaweicloud.com:9095.

    topic

    Kafka topic name. The format is ${log group ID}_${log stream ID}, in which the LTS log group and stream IDs are connected by an underscore (_).

    • For JSON logs, you can set the topic name to ${Log group ID}_${Log stream ID}_json to automatically expand JSON logs.
    • To specify the log collection time, set the topic name to ${Log group ID}_${Log stream ID}_format and ensure that the reported logs are standardized and meet the requirements.

    username

    Username for accessing Kafka. Set this parameter to the project ID of the user account.

    password

    Password for accessing Kafka. The format is ${accessKey}#${accessSecret}, in which the AK and SK of the user account are connected by a number sign (#).

  • Automatic expansion of JSON logs

    If all logs are in JSON format, you are advised to use the automatic expansion function. Set the topic name to ${Log group ID}_${Log stream ID}_json_${index}, then the system will automatically parse the JSON logs.

    The default number of JSON parsing layers is 1. The value of index in a topic name indicates the number of parsing layers. The value ranges from 1 to 4.

    If a log fails to be parsed because it is not JSON, the log content will be reported to the _content_parse_fail_ field.

  • Specifying the log collection time

    To specify the log collection time, set the topic name to ${log group ID}_${log stream ID}_format and ensure that the reported logs are in JSON format and contain the following fields. For details, see Table 3.

    To specify the log collection time and enable automatic expansion of JSON logs, set the topic name for Kafka to ${ Log group ID}_${Log stream ID}_format_json_${index}.

    Table 3 Log parameters

    Parameter

    Mandatory

    Type

    Description

    content

    Yes

    String

    Log content.

    log_time_ns

    Yes

    Long

    Log collection time (UTC timestamp in nanoseconds).

    The collection time must be within one day before and after the current time. Otherwise, the specified collection time is invalid and the current time is used as the collection time.

    The interval between the log collection time and current time must be less than the log retention duration. Otherwise, reported logs will be cleared. For example, if the log retention duration is seven days, the log collection time must be within the last seven days.

    labels

    Yes

    Object

    Custom labels.

    Do not set field names to system reserved fields. Otherwise, problems such as duplicate field names and inaccurate query may occur.

Log Example

{
    "log_time_ns": "XXXXXXXXXXXXXXXXXXX",
    "content": "This is a log 1"
    "labels": {
        "type": "kafka"
    }
}

Calling Example

  1. The following uses FileBeat as an example to describe how to configure parameters for calling by Beat series software.
    output.kafka:
    hosts: ["${ip}:${port}"]
    partition.round_robin:
    reachable_only: false
    username: "${projectId}"
    password: "${accessKey}#${accessSecret}"
    topic: "${logGroupId}_${logStreamId}"
    sasl.mechanism: "PLAIN"
    security.protocol: "SASL_PLAINTEXT"
    acks: "0"
    compression: gzip
  2. When Logstash is used to report logs:
    input {
    stdin {}
    }
    output {
    kafka {
    # Configure an address.
    bootstrap_servers => "${ip}:${port}"
    # Configure a topic.
    topic_id => "${logGroupId}_${logStreamId}"
    # Configure a message acknowledgment mechanism.
    acks => "0"
    # Configure a compression mode.
    compression_type => "gzip"
    # Configure an authentication mode.
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    # Username: projectId; Password: accessKey#accessSecret
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';"
    }
    }
  3. When Flume is used to report logs:
    #Name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #Source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.channels = c1
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /tmp/test.txt
    a1.sources.r1.fileHeader = true
    a1.sources.r1.maxBatchCount = 1000
    #Channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    #Bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

SDK Calling Examples

  1. Java SDK calling example:

    Maven dependency (take Kafka protocol 2.7.1 as an example):

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>

    Sample code:

    package org.example;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    public class ProducerDemo {
    public static void main(String[] args) {
    Properties props = new Properties();
    // Configure an address.
    props.put("bootstrap.servers", "${ip}:${port}");
    // Configure a message acknowledgment mechanism.
    props.put("acks", "0");
    // Configure an authentication mode.
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    // Username: projectId; Password: accessKey#accessSecret
    props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';");
    // Configure a compression mode.
    props.put("compression.type", "${compress_type}");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 1. Create a producer object.
    Producer<String, String> producer = new KafkaProducer<>(props);
    // 2. Call the send method.
    for (int i = 0; i < 1; i++) {
    ProducerRecord record = new ProducerRecord<>("${logGroupId}_${logStreamId}", "${message}");
    // Configure recordHeader.
    // record.headers().add(new RecordHeader("LTS_LOG_TYPE","FORMAT".getBytes()));
    producer.send(record);
    }
    // 3. Close the producer.
    producer.close();
    }
    }
  2. Python SDK calling example:
    from kafka import KafkaProducer
    producer = KafkaProducer(
    # Configure an address.
    bootstrap_servers="${ip}:${port}",
    # Configure a message acknowledgment mechanism.
    acks="0",
    # Configure a compression mode.
    compression_type ="${compression_type}"
    # Configure an authentication mode.
    sasl_mechanism="PLAIN",
    security_protocol="SASL_PLAINTEXT",
    # Username: projectId; Password: accessKey#accessSecret
    sasl_plain_username="${projectId}",
    sasl_plain_password="${accessKey}#${accessSecret}"
    )
    print('start producer')
    for i in range(0, 3):
        data = bytes("${message}", encoding="utf-8")
        future = producer.send("${logGroupId}_{logStreamId}", data)
        result = future.get(timeout=10)
        print(result)
    print('end producer')

Error Description

If there is an invalid or mismatched parameter, an error message will be displayed.

Table 4 Error description

Error Message

Cause

TopicAuthorizationException

The projectId (project ID), accessKey (AK), and accessSecret (SK) parameters are invalid or mismatched.

UnknownTopicOrPartitionException

The logGroupId (log group ID) and logStreamId (log stream ID) parameters are invalid or mismatched.

InvalidRecordException

The log format is incorrect, or the settings of projectId, logGroupId, and logStreamId in the log are inconsistent with those of the external parameters.

Obtaining Parameters

Table 5 Regions

Region Name

Endpoint

CN North-Beijing4

lts-kafka.cn-north-4.myhuaweicloud.com

CN North-Beijing1

lts-access.cn-north-1.myhuaweicloud.com

CN East-Shanghai1

lts-access.cn-east-3.myhuaweicloud.com

CN East-Shanghai2

lts-kafka.cn-east-2.myhuaweicloud.com

CN South-Guangzhou

lts-access.cn-south-1.myhuaweicloud.com

AP-Singapore

lts-kafka.ap-southeast-3.myhuaweicloud.com

CN-Hong Kong

lts-access.ap-southeast-1.myhuaweicloud.com

CN North-Ulanqab1

lts-access.cn-north-9.myhuaweicloud.com