文档首页/ 云日志服务 LTS/ 用户指南/ 日志接入/ 使用KAFKA协议上报日志到LTS
更新时间:2025-06-19 GMT+08:00

使用KAFKA协议上报日志到LTS

您可以通过Kafka协议上报日志到日志服务,目前支持各类Kafka Producer SDK或采集工具,仅依赖于Kafka协议。支持以下场景:

  • 场景1:已有基于开源采集的自建系统,仅修改配置文件便可以将日志上报到 LTS,例如Logstash。
  • 场景2:希望通过 Kafka producer SDK 来采集日志并上报,不必再安装采集ICAgent。

目前此功能仅支持华北-北京四、华北-北京一、华南-广州、华东-上海一、华东-上海二、亚太-新加坡、中国-香港、华北-乌兰察布一,其他局点需要提交工单申请使用。

前提条件

  • 使用云日志SDK前,您需要注册用户账号,并开通云日志服务。
  • 确认云日志服务的区域,请用户根据所在区域,获取regionid。
  • 如何获取访问密钥AK/SK
  • 获取华为云账号的项目ID(project id),详情请参见“我的凭证 >API凭证”。
  • 获取需要上报到LTS的日志组ID、日志流ID。
  • 当前仅支持内网上报,需要在ECS主机上使用。

相关限制

  • 当前仅支持内网上报,端口固定为9095,IP根据所在局点进行配置。
  • 支持 Kafka 协议版本为:1.0.X,2.X.X,3.X.X。
  • 支持压缩方式:gzip,snappy,lz4。
  • KAFKA认证方式为 SASL_PLAINTEXT 认证。
  • KAFKA协议的ACKS参数支持设置为0、1、all,默认设置为1。

配置方式

  • 使用Kafka协议上报日志时,需要使用到的通用参数如下。
    表1 通用参数

    参数名称

    描述

    类型

    projectId

    用户账号的项目ID(project id)

    String

    logGroupId

    LTS的日志组ID

    String

    logStreamId

    LTS的日志流ID

    String

    regionName

    云日志服务的区域

    String

    accessKey

    用户账号的AK

    String

    accessSecret

    用户账号的SK

    String

  • 使用Kafka协议上报日志时,需要配置以下参数。
    表2 配置参数

    参数名称

    说明

    连接类型

    当前支持SASL_PLAINTEXT

    hosts

    Kafka的IP和PORT地址,格式为lts-kafka.${regionName}.${external_global_domain_name}:9095或lts-access.${regionName}.${external_global_domain_name}:9095

    其中IP根据局点进行配置,PORT固定为9095。详细请参考参数获取方式,例如北京四局点对应hosts为 lts-kafka.cn-north-4.myhuaweicloud.com:9095。

    topic

    Kafka的topic名称,格式为 ${日志组ID}_${日志流ID},即LTS的日志组ID和日志流ID通过下划线连接,作为topic的名称。

    • 若日志格式为JSON,可以将topic名称设置为${日志组ID}_${日志流ID}_json,即可实现JSON日志自动展开。
    • 若需要指定日志的采集时间,可以将topic名称设置为${日志组ID}_${日志流ID}_format,确保上报符合要求的规范化日志。

    username

    Kafka访问用户名,配置为用户账号的项目ID。

    password

    Kafka访问密码,格式为${accessKey}#${accessSecret},即用户账号的AK和SK通过#连接,作为Kafka的访问密码。

  • JSON日志自动展开

    若日志格式均为JSON,推荐使用JSON日志自动展开功能,将topic名称设置为${日志组ID}_${日志流ID}_json_${index},系统会自动解析JSON类型的日志。

    默认JSON解析层数为1层,topic名称中index的值代表解析层数,index取值范围为 [1 4],即最大解析层数为4层。

    若日志非JSON类型导致解析失败,会将日志内容上报至_content_parse_fail_字段。

  • 指定日志采集时间

    若需要指定日志采集时间,将topic名称设置为${日志组ID}_${日志流ID}_format,上报日志格式需要为JSON结构且包含以下字段,请参考表3

    若需要指定日志采集时间,同时需要开启JSON日志自动展开功能,Kafka的topic名称格式需要为${日志组ID}_${日志流ID}_format_json_${index}。

    表3 日志参数

    参数名称

    是否必选

    参数类型

    描述

    content

    String

    日志内容。

    log_time_ns

    Long

    日志数据采集时间,UTC时间(纳秒级时间戳)。

    采集时间需在实际当前时间的前后1天范围内,否则指定的采集时间失效,采用当前时间作为采集时间。

    采集时间需在日志存储时间范围之内,否则上报日志会被删除。比如日志组的日志存储时间是7天,则此参数不应早于当前时间的7天前。

    labels

    Object

    用户自定义label。

    请不要将字段名称设置为内置保留字段,否则可能会造成字段名称重复、查询不精确等问题。

日志示例

{
    "log_time_ns": "XXXXXXXXXXXXXXXXXXX",
    "content": "This is a log 1"
    "labels": {
        "type": "kafka"
    }
}

调用示例

  1. Beat系列软件调用(FileBeat等)。以FileBeat为例,配置参数如下:
    output.kafka:
    hosts: ["${ip}:${port}"]
    partition.round_robin:
    reachable_only: false
    username: "${projectId}"
    password: "${accessKey}#${accessSecret}"
    topic: "${logGroupId}_${logStreamId}"
    sasl.mechanism: "PLAIN"
    security.protocol: "SASL_PLAINTEXT"
    acks: "0"
    compression: gzip
  2. 通过Logstash软件上报日志。
    input {
    stdin {}
    }
    output {
    kafka {
    # 配置地址
    bootstrap_servers => "${ip}:${port}"
    # 配置topic
    topic_id => "${logGroupId}_${logStreamId}"
    # 配置消息确认机制
    acks => "0"
    # 配置压缩方式
    compression_type => "gzip"
    # 配置认证方式
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    # 用户名 projectId 密码 accessKey#accessSecret
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';"
    }
    }
  3. 通过Flume软件上报日志。
    #Name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #Source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.channels = c1
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /tmp/test.txt
    a1.sources.r1.fileHeader = true
    a1.sources.r1.maxBatchCount = 1000
    #Channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    #Bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

SDK 调用示例

  1. Java SDK调用示例。

    maven依赖(示例kafka协议版本为2.7.1):

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>

    代码示例:

    package org.example;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    public class ProducerDemo {
    public static void main(String[] args) {
    Properties props = new Properties();
    // 配置地址
    props.put("bootstrap.servers", "${ip}:${port}");
    // 配置消息确认机制
    props.put("acks", "0");
    // 配置认证方式
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    // 用户名 projectId 密码 accessKey#accessSecret
    props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';");
    // 配置压缩方式
    props.put("compression.type", "${compress_type}");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 1.创建一个生产者对象
    Producer<String, String> producer = new KafkaProducer<>(props);
    // 2.调用send方法
    for (int i = 0; i < 1; i++) {
    ProducerRecord record = new ProducerRecord<>("${logGroupId}_${logStreamId}", "${message}");
    // 配置recordHeader
    // record.headers().add(new RecordHeader("LTS_LOG_TYPE","FORMAT".getBytes()));
    producer.send(record);
    }
    // 3.关闭生产者
    producer.close();
    }
    }
  2. Python SDK调用示例。
    from kafka import KafkaProducer
    producer = KafkaProducer(
    # 配置地址
    bootstrap_servers="${ip}:${port}",
    # 配置消息确认机制
    acks="0",
    # 配置压缩方式
    compression_type ="${compression_type}"
    # 配置认证方式
    sasl_mechanism="PLAIN",
    security_protocol="SASL_PLAINTEXT",
    # 用户名 projectId 密码 accessKey#accessSecret
    sasl_plain_username="${projectId}",
    sasl_plain_password="${accessKey}#${accessSecret}"
    )
    print('start producer')
    for i in range(0, 3):
        data = bytes("${message}", encoding="utf-8")
        future = producer.send("${logGroupId}_{logStreamId}", data)
        result = future.get(timeout=10)
        print(result)
    print('end producer')

报错说明

当参数错误或不匹配时,会有相应的报错提示。

表4 报错说明

报错信息

报错原因

TopicAuthorizationException

projectId(项目ID)、accessKey(AK)、accessSecret(SK)参数错误或者不匹配。

UnknownTopicOrPartitionException

logGroupId(日志组ID)、logStreamId(日志流ID)参数错误或者不匹配。

InvalidRecordException

日志格式错误或者日志中的projectId(项目ID)、logGroupId(日志组ID)、logStreamId(日志流ID)与外部设置参数不一致。

参数获取方式

表5 区域表

区域名称

终端节点

华北-北京四

lts-kafka.cn-north-4.myhuaweicloud.com

华北-北京一

lts-access.cn-north-1.myhuaweicloud.com

华东-上海一

lts-access.cn-east-3.myhuaweicloud.com

华东-上海二

lts-kafka.cn-east-2.myhuaweicloud.com

华南-广州

lts-access.cn-south-1.myhuaweicloud.com

亚太-新加坡

lts-kafka.ap-southeast-3.myhuaweicloud.com

中国-香港

lts-access.ap-southeast-1.myhuaweicloud.com

华北-乌兰察布一

lts-access.cn-north-9.myhuaweicloud.com