Using Kafka to Report Logs to LTS

Updated on 2025-01-24 GMT+08:00

You can use various Kafka Producer SDKs or collection tools that support the Kafka protocol to report logs to LTS. This function applies to the following scenarios:

  • Scenario 1: If you have built a system based on open-source tools, such as Logstash, simply modify the configuration file to enable log reporting to LTS.
  • Scenario 2: If you want to use Kafka Producer SDKs to collect and report logs, you do not need to install ICAgent.
NOTE:

Currently, this function is available only in regions CN North-Beijing4, CN South-Guangzhou, CN East-Shanghai1, CN East-Shanghai2, and AP-Singapore. To use it in other regions, submit a service ticket.

Prerequisites

  • You have registered an account and enabled LTS.
  • You have obtained the ID of the region where LTS is deployed.
  • You have obtained the AK/SK of your account.
  • You have obtained the project ID of your Huawei Cloud account. For details, see API Credentials in My Credentials.
  • You have obtained the IDs of the target log group and log stream in LTS.
  • Currently, this function is available only for reporting ECS logs over intranets.

Constraints

  • Currently, this function is available only on intranets. The port number must be 9095, and the host address depends on the region. For details, see Obtaining Parameters.
  • The supported Kafka protocol versions are 1.0.X, 2.X.X, and 3.X.X.
  • The supported compression modes are gzip, snappy, and lz4.
  • The Kafka authentication mode is SASL_PLAINTEXT.
  • The acks parameter of the Kafka producer must be set to 0.

Configuration

  • To use the Kafka protocol to report logs, set the following common parameters:
    Table 1 Common parameters

    Parameter       Description                                 Type
    projectId       Project ID of the user account              String
    logGroupId      LTS log group ID                            String
    logStreamId     LTS log stream ID                           String
    regionName      Region where the LTS service is deployed   String
    accessKey       AK of the user account                      String
    accessSecret    SK of the user account                      String

  • To use the Kafka protocol to report logs, you also need to set the following connection parameters (a sketch assembling these values follows this section):
    Table 2 Configuration parameters

    Connection type
        Only SASL_PLAINTEXT is supported.

    hosts
        Address and port number of the Kafka access point. The format is lts-kafka.${regionName}.${external_global_domain_name}:9095 or lts-access.${regionName}.${external_global_domain_name}:9095. Set the host based on the site requirements and set the port number to 9095. For details, see Obtaining Parameters. For example, the value of hosts for the CN North-Beijing4 region is lts-kafka.cn-north-4.myhuaweicloud.com:9095.

    topic
        Kafka topic name. The format is ${logGroupId}_${logStreamId}: the LTS log group ID and log stream ID connected by an underscore (_).

    username
        Username for accessing Kafka. Set this parameter to the project ID of the user account.

    password
        Password for accessing Kafka. The format is ${accessKey}#${accessSecret}: the AK and SK of the user account connected by a number sign (#).

    headers
        Required only when you report standardized logs with custom label fields. Add a header whose key is LTS_LOG_TYPE and whose value is FORMAT; the reported logs must then be standardized logs that meet the format requirements in Table 3.

  • ${message} log format

    Logs must comply with the following format only when a header whose key is LTS_LOG_TYPE and whose value is FORMAT is added to headers.

    Table 3 Log parameters

    Parameter           Mandatory   Type              Description
    tenant_project_id   Yes         String            Project ID of the user account
    tenant_group_id     Yes         String            LTS log group ID
    tenant_stream_id    Yes         String            LTS log stream ID
    log_time_ns         Yes         Long              Log collection time (UTC time, in nanoseconds)
    contents            Yes         Array of String   Log content
    labels              Yes         Object            Custom labels

    NOTE:

    The interval between the log collection time (log_time_ns) and the current time must be less than the log retention duration; otherwise, the reported logs are cleared. For example, if the log retention duration is seven days, the log collection time must be within the last seven days.

    When setting labels, avoid using the name of a built-in reserved field as a field name, to prevent issues such as duplicate names and inaccurate queries.
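
The following is a minimal sketch, in Python, of how the connection values in Table 2 are assembled from the common parameters in Table 1. The placeholder values and variable names are illustrative only; the host comes from Table 5 in Obtaining Parameters.

    # Common parameters from Table 1 (placeholders, illustrative only).
    project_id = "<projectId>"        # project ID of the user account
    log_group_id = "<logGroupId>"     # LTS log group ID
    log_stream_id = "<logStreamId>"   # LTS log stream ID
    access_key = "<accessKey>"        # AK of the user account
    access_secret = "<accessSecret>"  # SK of the user account

    # hosts: region host from Table 5 plus the mandatory port 9095.
    hosts = "lts-kafka.cn-north-4.myhuaweicloud.com:9095"

    # topic: log group ID and log stream ID joined by an underscore.
    topic = f"{log_group_id}_{log_stream_id}"

    # username: the project ID; password: AK and SK joined by a number sign.
    username = project_id
    password = f"{access_key}#{access_secret}"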

Log Example

{
    "tenant_project_id": "${projectId}",
    "tenant_group_id": "${logGroupId}",
    "tenant_stream_id": "${logStreamId}",
    "log_time_ns": "XXXXXXXXXXXXXXXXXXX",
    "contents": [
        "This is a log 1",
        "This is a log 2"
    ],
    "labels": {
        "type": "kafka"
    }
}
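
The following is a minimal sketch of building ${message} in Python for standardized (FORMAT) reporting. It assumes time.time_ns() (available in Python 3.7 and later) as the source of the collection time, which Table 3 requires as a Long in nanoseconds within the retention period; the placeholder IDs are illustrative only.

    import json
    import time

    # Build the standardized ${message} payload described in Table 3.
    message = json.dumps({
        "tenant_project_id": "<projectId>",
        "tenant_group_id": "<logGroupId>",
        "tenant_stream_id": "<logStreamId>",
        # Nanoseconds since the epoch (UTC); must fall within the log
        # retention duration or the reported logs are cleared.
        "log_time_ns": time.time_ns(),
        "contents": [
            "This is a log 1",
            "This is a log 2"
        ],
        "labels": {"type": "kafka"}
    })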

Calling Example

  1. The following uses Filebeat as an example to describe how to configure parameters for the Beats family of software:
    output.kafka:
      hosts: ["${ip}:${port}"]
      partition.round_robin:
        reachable_only: false
      username: "${projectId}"
      password: "${accessKey}#${accessSecret}"
      topic: "${logGroupId}_${logStreamId}"
      sasl.mechanism: "PLAIN"
      security.protocol: "SASL_PLAINTEXT"
      acks: "0"
      compression: gzip
  2. When Logstash is used to report logs:
    input {
      stdin {}
    }
    output {
      kafka {
        # Configure an address.
        bootstrap_servers => "${ip}:${port}"
        # Configure a topic.
        topic_id => "${logGroupId}_${logStreamId}"
        # Configure a message acknowledgment mechanism.
        acks => "0"
        # Configure a compression mode.
        compression_type => "gzip"
        # Configure an authentication mode.
        security_protocol => "SASL_PLAINTEXT"
        sasl_mechanism => "PLAIN"
        # Username: projectId; Password: accessKey#accessSecret
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';"
      }
    }
  3. When Flume is used to report logs:
    #Name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #Source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.channels = c1
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /tmp/test.txt
    a1.sources.r1.fileHeader = true
    a1.sources.r1.maxBatchCount = 1000
    #Channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    #Bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

SDK Calling Examples

  1. Java SDK calling example:

    Maven dependency (using kafka-clients 2.7.1 as an example):

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>

    Sample code:

    package org.example;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class ProducerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Configure an address.
            props.put("bootstrap.servers", "${ip}:${port}");
            // Configure a message acknowledgment mechanism.
            props.put("acks", "0");
            // Configure an authentication mode.
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            // Username: projectId; Password: accessKey#accessSecret
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';");
            // Configure a compression mode.
            props.put("compression.type", "${compress_type}");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // 1. Create a producer object.
            Producer<String, String> producer = new KafkaProducer<>(props);
            // 2. Call the send method.
            for (int i = 0; i < 1; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("${logGroupId}_${logStreamId}", "${message}");
                // To report standardized logs, add the record header (requires
                // import org.apache.kafka.common.header.internals.RecordHeader):
                // record.headers().add(new RecordHeader("LTS_LOG_TYPE", "FORMAT".getBytes()));
                producer.send(record);
            }
            // 3. Close the producer.
            producer.close();
        }
    }
  2. Python SDK calling example:
    from kafka import KafkaProducer

    producer = KafkaProducer(
        # Configure an address.
        bootstrap_servers="${ip}:${port}",
        # Configure a message acknowledgment mechanism.
        acks=0,
        # Configure a compression mode.
        compression_type="${compression_type}",
        # Configure an authentication mode.
        sasl_mechanism="PLAIN",
        security_protocol="SASL_PLAINTEXT",
        # Username: projectId; Password: accessKey#accessSecret
        sasl_plain_username="${projectId}",
        sasl_plain_password="${accessKey}#${accessSecret}",
    )

    print('start producer')
    for i in range(0, 3):
        data = bytes("${message}", encoding="utf-8")
        future = producer.send("${logGroupId}_${logStreamId}", data)
        result = future.get(timeout=10)
        print(result)
    print('end producer')
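
To report standardized logs from the Python example, the LTS_LOG_TYPE header must also be attached to each record (the Java example shows this as a commented-out RecordHeader line). The following is a minimal sketch, assuming the kafka-python client, which accepts headers on send() as a list of (key, bytes) tuples; producer and message are assumed to be built as in the sketches above.

    # Send a standardized log: the LTS_LOG_TYPE=FORMAT header marks the
    # record so that LTS parses it as a standardized log.
    future = producer.send(
        "${logGroupId}_${logStreamId}",
        value=bytes(message, encoding="utf-8"),
        headers=[("LTS_LOG_TYPE", b"FORMAT")],
    )
    result = future.get(timeout=10)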

Error Description

If a parameter is invalid or mismatched, an error message is returned.

Table 4 Error description

TopicAuthorizationException
    The projectId (project ID), accessKey (AK), and accessSecret (SK) parameters are invalid or mismatched.

UnknownTopicOrPartitionException
    The logGroupId (log group ID) and logStreamId (log stream ID) parameters are invalid or mismatched.

InvalidRecordException
    The log format is incorrect, or the settings of projectId, logGroupId, and logStreamId in the log are inconsistent with those of the external parameters. This error occurs only when the headers parameter is configured to report standardized logs.

Obtaining Parameters

Table 5 Regions

Region                Host
CN North-Beijing4     lts-kafka.cn-north-4.myhuaweicloud.com
CN East-Shanghai1     lts-access.cn-east-3.myhuaweicloud.com
CN East-Shanghai2     lts-kafka.cn-east-2.myhuaweicloud.com
CN South-Guangzhou    lts-access.cn-south-1.myhuaweicloud.com
AP-Singapore          lts-kafka.ap-southeast-3.myhuaweicloud.com

Append port 9095 to the host to form the hosts value, for example, lts-kafka.cn-north-4.myhuaweicloud.com:9095.
