Updated on 2024-09-19 GMT+08:00

Using Kafka to Report Logs to LTS

You can use various Kafka Producer SDKs or collection tools that support the Kafka protocol to report logs to LTS. This function applies to the following scenarios:

  • Scenario 1: If you have built a system based on open-source tools, such as Logstash, simply modify the configuration file to enable log reporting to LTS.
  • Scenario 2: If you want to use Kafka Producer SDKs to collect and report logs, you do not need to install ICAgent.

Currently, this function is available only in regions CN North-Beijing4, CN South-Guangzhou, CN East-Shanghai2, and AP-Singapore. To use it in other regions, submit a service ticket.

Prerequisites

  • You have registered an account and enabled LTS.
  • You have obtained the ID of the region where LTS is deployed.
  • You have obtained the AK/SK of your account.
  • You have obtained a project ID of your Huawei Cloud account. For details, see API Credentials in My Credentials.
  • You have obtained the IDs of the target log group and stream in LTS.
  • Currently, this function supports only reporting ECS logs over intranets.

Constraints

  • Currently, this function is available only on intranets. The port number must be 9095. Set the IP address based on the site requirements.
  • The supported Kafka protocol versions are 1.0.X, 2.X.X, and 3.X.X.
  • The supported compression modes are gzip, snappy, and lz4.
  • The Kafka authentication mode is SASL_PLAINTEXT.
  • The ACKS parameter of the Kafka protocol must be set to 0.
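
The constraints above can be checked before a producer is created. The following is a minimal sketch, not part of any LTS SDK: the function name check_producer_settings and the dictionary keys are illustrative assumptions, and treating an unset compression mode as acceptable is also an assumption.

```python
# Hypothetical pre-flight check; the names here are illustrative, not an LTS API.
SUPPORTED_COMPRESSION = {"gzip", "snappy", "lz4"}


def check_producer_settings(settings):
    """Return a list of constraint violations (empty if none were found)."""
    problems = []
    if settings.get("security_protocol") != "SASL_PLAINTEXT":
        problems.append("security_protocol must be SASL_PLAINTEXT")
    # Accept both 0 and "0", because client libraries differ in the type they expect.
    if str(settings.get("acks")) != "0":
        problems.append("acks must be 0")
    compression = settings.get("compression_type")
    # Assumption: leaving compression unset is allowed; only named modes are checked.
    if compression is not None and compression not in SUPPORTED_COMPRESSION:
        problems.append("compression_type must be gzip, snappy, or lz4")
    return problems
```

An empty list means that none of the checked constraints is violated. The port and protocol version are not checked here.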

Configuration

  • To use the Kafka protocol to report logs, set the following common parameters:
    Table 1 Common parameters

    | Parameter    | Description                             | Type   |
    |--------------|-----------------------------------------|--------|
    | projectId    | Project ID of the user account          | String |
    | logGroupId   | LTS log group ID                        | String |
    | logStreamId  | LTS log stream ID                       | String |
    | regionName   | Region where the LTS service is deployed | String |
    | accessKey    | AK of the user account                  | String |
    | accessSecret | SK of the user account                  | String |

  • To use Kafka to report logs, you also need to set the following parameters:
    Table 2 Configuration parameters

    | Parameter       | Description |
    |-----------------|-------------|
    | Connection type | SASL_PLAINTEXT is supported. |
    | hosts           | IP address and port number of Kafka. The format is lts-kafka.${regionName}.${external_global_domain_name}:9095 or lts-access.${regionName}.${external_global_domain_name}:9095. Set the IP address based on the site requirements and set the port number to 9095. For details, see Obtaining Parameters. For example, the value of hosts for the CN North-Beijing4 region is lts-kafka.cn-north-4.myhuaweicloud.com:9095. |
    | topic           | Kafka topic name. The format is ${log group ID}_${log stream ID}, in which the LTS log group and stream IDs are connected by an underscore (_). |
    | username        | Username for accessing Kafka. Set this parameter to the project ID of the user account. |
    | password        | Password for accessing Kafka. The format is ${accessKey}#${accessSecret}, in which the AK and SK of the user account are connected by a number sign (#). |
    | headers         | When customizing the label field, you need to configure headers. Add a header with the key LTS_LOG_TYPE and the value FORMAT to headers. The reported logs must be standardized logs that meet the format requirement. |

  • ${message} log format

    Logs must comply with this format only when a header whose key is LTS_LOG_TYPE and value is FORMAT is added to headers.

    Table 3 Log parameters

    | Parameter         | Mandatory | Type            | Description |
    |-------------------|-----------|-----------------|-------------|
    | tenant_project_id | Yes       | String          | Project ID of the user account |
    | tenant_group_id   | Yes       | String          | LTS log group ID |
    | tenant_stream_id  | Yes       | String          | LTS log stream ID |
    | log_time_ns       | Yes       | Long            | Log collection time (UTC time, in nanoseconds). NOTE: The interval between the log collection time and current time must be less than the log retention duration. Otherwise, reported logs will be cleared. For example, if the log retention duration is seven days, the log collection time must be within the last seven days. |
    | contents          | Yes       | Array of String | Log content |
    | labels            | Yes       | Object          | Custom labels. NOTE: Avoid using the name of a built-in reserved field as a field name to prevent issues like duplicate names and query inaccuracies. |
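
The topic, username, and password formats described in Table 2 can be derived from the common parameters in Table 1. The following is a minimal sketch of that derivation; the function name kafka_credentials and its argument names are illustrative assumptions.

```python
def kafka_credentials(project_id, log_group_id, log_stream_id, access_key, access_secret):
    """Derive the Kafka topic, username, and password from the common parameters."""
    return {
        # Log group ID and log stream ID joined by an underscore.
        "topic": f"{log_group_id}_{log_stream_id}",
        # The username is the project ID of the account.
        "username": project_id,
        # AK and SK joined by a number sign.
        "password": f"{access_key}#{access_secret}",
    }
```

The returned values map to the topic, username, and password settings of whichever Kafka client or collection tool is used.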

Log Example

{
    "tenant_project_id": "${projectId}",
    "tenant_group_id": "${logGroupId}",
    "tenant_stream_id": "${logStreamId}",
    "log_time_ns": "XXXXXXXXXXXXXXXXXXX",
    "contents": [
        "This is a log 1",
        "This is a log 2"
    ],
    "labels": {
        "type": "kafka"
    }
}
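
A message in this format can be assembled programmatically. The following is a minimal sketch using only the Python standard library; the function names build_message and is_within_retention are illustrative assumptions. The sketch writes log_time_ns as a JSON number because Table 3 gives its type as Long, whereas the example above shows a quoted placeholder, so confirm which form the service accepts in your environment.

```python
import json
import time

NANOSECONDS_PER_DAY = 24 * 60 * 60 * 10**9


def build_message(project_id, log_group_id, log_stream_id, contents, labels, log_time_ns=None):
    """Serialize one standardized log message as a JSON string."""
    if log_time_ns is None:
        # Current time in nanoseconds since the epoch (UTC).
        log_time_ns = time.time_ns()
    return json.dumps({
        "tenant_project_id": project_id,
        "tenant_group_id": log_group_id,
        "tenant_stream_id": log_stream_id,
        "log_time_ns": log_time_ns,  # Assumption: emitted as a number, not a string.
        "contents": list(contents),
        "labels": dict(labels),
    })


def is_within_retention(log_time_ns, retention_days, now_ns=None):
    """Check that the collection time is closer to now than the retention duration."""
    if now_ns is None:
        now_ns = time.time_ns()
    return abs(now_ns - log_time_ns) < retention_days * NANOSECONDS_PER_DAY
```

Checking is_within_retention before sending helps avoid reporting logs that would be cleared because their collection time is older than the retention duration.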

Calling Example

  1. Beats software (Filebeat is used as an example):
    output.kafka:
      hosts: ["${ip}:${port}"]
      partition.round_robin:
        reachable_only: false
      username: "${projectId}"
      password: "${accessKey}#${accessSecret}"
      topic: "${logGroupId}_${logStreamId}"
      sasl.mechanism: "PLAIN"
      security.protocol: "SASL_PLAINTEXT"
      acks: "0"
      compression: gzip
  2. When Logstash is used to report logs:
    input {
      stdin {}
    }
    output {
      kafka {
        # Configure an address.
        bootstrap_servers => "${ip}:${port}"
        # Configure a topic.
        topic_id => "${logGroupId}_${logStreamId}"
        # Configure a message acknowledgment mechanism.
        acks => "0"
        # Configure a compression mode.
        compression_type => "gzip"
        # Configure an authentication mode.
        security_protocol => "SASL_PLAINTEXT"
        sasl_mechanism => "PLAIN"
        # Username: projectId; Password: accessKey#accessSecret
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';"
      }
    }
  3. When Flume is used to report logs:
    #Name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #Source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.channels = c1
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /tmp/test.txt
    a1.sources.r1.fileHeader = true
    a1.sources.r1.maxBatchCount = 1000
    #Channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    #Bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

SDK Calling Examples

  1. Java SDK calling example:

    Maven dependency (kafka-clients 2.7.1 is used as an example):

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>

    Sample code:

    package org.example;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class ProducerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Configure an address.
            props.put("bootstrap.servers", "${ip}:${port}");
            // Configure a message acknowledgment mechanism.
            props.put("acks", "0");
            // Configure an authentication mode.
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            // Username: projectId; Password: accessKey#accessSecret
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';");
            // Configure a compression mode.
            props.put("compression.type", "${compress_type}");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // 1. Create a producer object.
            Producer<String, String> producer = new KafkaProducer<>(props);
            // 2. Call the send method.
            for (int i = 0; i < 1; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("${logGroupId}_${logStreamId}", "${message}");
                // Configure recordHeader to report standardized logs. Uncommenting the next line
                // also requires: import org.apache.kafka.common.header.internals.RecordHeader;
                // record.headers().add(new RecordHeader("LTS_LOG_TYPE", "FORMAT".getBytes()));
                producer.send(record);
            }
            // 3. Close the producer.
            producer.close();
        }
    }
  2. Python SDK calling example:
    from kafka import KafkaProducer

    producer = KafkaProducer(
        # Configure an address.
        bootstrap_servers="${ip}:${port}",
        # Configure a message acknowledgment mechanism (kafka-python expects an integer).
        acks=0,
        # Configure a compression mode.
        compression_type="${compression_type}",
        # Configure an authentication mode.
        sasl_mechanism="PLAIN",
        security_protocol="SASL_PLAINTEXT",
        # Username: projectId; Password: accessKey#accessSecret
        sasl_plain_username="${projectId}",
        sasl_plain_password="${accessKey}#${accessSecret}",
    )
    print('start producer')
    for i in range(0, 3):
        data = bytes("${message}", encoding="utf-8")
        future = producer.send("${logGroupId}_${logStreamId}", data)
        result = future.get(timeout=10)
        print(result)
    print('end producer')
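
To report standardized logs from Python, the LTS_LOG_TYPE header described in Table 2 must accompany each record. The following is a minimal sketch that prepares the keyword arguments for the send method of kafka-python's KafkaProducer, which accepts headers as a list of (str, bytes) tuples. The helper name build_send_kwargs is an illustrative assumption.

```python
def build_send_kwargs(message, standardized=False):
    """Prepare keyword arguments for KafkaProducer.send() in kafka-python."""
    kwargs = {"value": message.encode("utf-8")}
    if standardized:
        # Header key is a str and header value is bytes, as kafka-python expects.
        kwargs["headers"] = [("LTS_LOG_TYPE", b"FORMAT")]
    return kwargs


# Usage with an existing producer (not executed here):
# producer.send("${logGroupId}_${logStreamId}", **build_send_kwargs(message, standardized=True))
```

When standardized is True, the message must follow the ${message} log format; otherwise the header is omitted and the message is sent as a plain log.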

Error Description

If a parameter is invalid or mismatched, an error message is displayed.

Table 4 Error description

| Error Message                    | Cause |
|----------------------------------|-------|
| TopicAuthorizationException      | The projectId (project ID), accessKey (AK), and accessSecret (SK) parameters are invalid or mismatched. |
| UnknownTopicOrPartitionException | The logGroupId (log group ID) and logStreamId (log stream ID) parameters are invalid or mismatched. |
| InvalidRecordException           | This error occurs only when the headers parameter is configured to report standardized logs. The log format is incorrect, or the settings of projectId, logGroupId, and logStreamId in the log are inconsistent with those of the external parameters. |
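
One cause of InvalidRecordException, inconsistent IDs between the log body and the external parameters, can be detected on the client before sending. The following is a minimal sketch using only the Python standard library; the function name find_id_mismatches is an illustrative assumption, and the check covers only the three ID fields, not the full log format.

```python
import json


def find_id_mismatches(message_json, project_id, log_group_id, log_stream_id):
    """Return the ID fields in a standardized log that differ from the external parameters."""
    record = json.loads(message_json)
    expected = {
        "tenant_project_id": project_id,
        "tenant_group_id": log_group_id,
        "tenant_stream_id": log_stream_id,
    }
    # A missing field counts as a mismatch because record.get() returns None.
    return [field for field, value in expected.items() if record.get(field) != value]
```

An empty list means the three IDs in the message agree with the values used for the username and topic.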

Obtaining Parameters

Table 5 Regions

| Region             | RegionName                                 |
|--------------------|--------------------------------------------|
| CN North-Beijing4  | lts-kafka.cn-north-4.myhuaweicloud.com     |
| CN East-Shanghai2  | lts-kafka.cn-east-2.myhuaweicloud.com      |
| CN South-Guangzhou | lts-access.cn-south-1.myhuaweicloud.com    |
| AP-Singapore       | lts-kafka.ap-southeast-3.myhuaweicloud.com |
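
The values in Table 5 can be combined with port 9095 from Table 2 to form the hosts setting. The following is a minimal lookup sketch; the names LTS_KAFKA_HOSTS, KAFKA_PORT, and bootstrap_server are illustrative assumptions.

```python
# Addresses copied from Table 5; the port is the one given for hosts in Table 2.
LTS_KAFKA_HOSTS = {
    "CN North-Beijing4": "lts-kafka.cn-north-4.myhuaweicloud.com",
    "CN East-Shanghai2": "lts-kafka.cn-east-2.myhuaweicloud.com",
    "CN South-Guangzhou": "lts-access.cn-south-1.myhuaweicloud.com",
    "AP-Singapore": "lts-kafka.ap-southeast-3.myhuaweicloud.com",
}
KAFKA_PORT = 9095


def bootstrap_server(region):
    """Return the host:port value for a supported region; raise KeyError otherwise."""
    return f"{LTS_KAFKA_HOSTS[region]}:{KAFKA_PORT}"
```

A KeyError indicates a region that is not listed in Table 5.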