更新时间:2024-05-22 GMT+08:00
分享

使用KAFKA协议上报日志

您可以通过Kafka协议上报日志到日志服务,目前支持各类Kafka Producer SDK或采集工具,仅依赖于Kafka协议。支持以下场景:

  • 场景1:您已有基于开源采集的自建系统,仅修改配置文件便可以将日志上报到 LTS,例如Logstash。
  • 场景2:您希望通过 Kafka producer SDK 来采集日志并上报,不必再安装采集ICAgent。

目前此功能仅支持华北-北京四,其他局点需要提交工单申请使用。

前提条件

  • 使用云日志SDK前,您需要注册用户账号,并开通云日志服务。
  • 确认云日志服务的区域,请用户根据所在区域,获取regionName。
  • 获取华为账号的AK/SK
  • 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 >API凭证”。
  • 获取需要上报到LTS的日志组ID、日志流ID。
  • 当前仅支持内网上报,需要在ECS主机上使用。

相关限制

  • 当前仅支持内网上报,端口固定为9095,IP根据所在局点进行配置。
  • 支持 Kafka 协议版本为:1.0.X,2.X.X,3.X.X。
  • 支持压缩方式:gzip,snappy,lz4。
  • KAFKA认证方式为 SASL_PLAINTEXT 认证。
  • KAFKA协议的ACKS参数必须设置为0。

配置方式

  • 使用Kafka协议上报日志时,需要使用到的通用参数如下。
    表1 通用参数

    参数名称

    描述

    类型

    projectId

    用户账号的项目ID(project id)

    String

    logGroupId

    LTS的日志组ID

    String

    logStreamId

    LTS的日志流ID

    String

    regionName

    云日志服务的区域

    String

    accessKey

    用户账号的AK

    String

    accessSecret

    用户账号的SK

    String

  • 使用Kafka协议上报日志时,需要配置以下参数。
    表2 配置参数

    参数名称

    说明

    连接类型

    当前支持SASL_PLAINTEXT

    hosts

    Kafka的IP和PORT地址,格式为 lts-kafka.${regionName}.myhuaweicloud.com:9095

    其中IP根据局点进行配置,PORT固定为9095。例如北京四局点对应hosts为 lts-kafka.cn-north-4.myhuaweicloud.com:9095。

    topic

    Kafka的topic名称,格式为 ${日志组ID}_${日志流ID},即LTS的日志组ID和日志流ID通过下划线连接,作为topic的名称。

    username

    Kafka访问用户名,配置为用户账号的项目ID。

    password

    Kafka访问密码,格式为${accessKey}#${accessSecret},即用户账号的AK和SK通过#连接,作为Kafka的访问密码。

    headers

    当您希望设置自定义label字段时,需要配置headers。headers的参数配置分为以下两种情况:

    • 不配置headers,对上报的日志格式没有要求。
    • headers中添加header,key为LTS_LOG_TYPE,value为FORMAT,用户需要上报符合要求的规范化日志。
  • ${message}日志格式

    仅当headers中添加了key为LTS_LOG_TYPE,value为FORMAT的header时,日志需要符合该格式规范。

    表3 日志参数

    参数名称

    是否必选

    参数类型

    描述

    tenant_project_id

    String

    用户账号的项目ID。

    tenant_group_id

    String

    LTS的日志组ID。

    tenant_stream_id

    String

    LTS的日志流ID。

    log_time_ns

    Long

    日志数据采集时间,UTC时间(纳秒)。

    说明:

    采集时间需在日志存储时间范围之内,否则上报日志会被删除。比如日志组的日志存储时间是7天,则此参数不应早于当前时间的7天前。

    contents

    Array of String

    日志内容

    labels

    Object

    用户自定义label。

    说明:

    请不要将字段名称设置为内置保留字段,否则可能会造成字段名称重复、查询不精确等问题。

日志示例

{
    "tenant_project_id": "${projectId}",
    "tenant_group_id": "${logGroupId}",
    "tenant_stream_id": "${logStreamId}",
    "log_time_ns": "XXXXXXXXXXXXXXXXXXX",
    "contents": [
        "This is a log 1",
        "This is a log 2"
    ],
    "labels": {
        "type": "kafka"
    }
}

调用示例

  1. Beat系列软件调用(FileBeat等)。以FileBeat为例,配置参数如下:
    output.kafka:
    hosts: ["${ip}:${port}"]
    partition.round_robin:
    reachable_only: false
    username: "${projectId}"
    password: "${accessKey}#${accessSecret}"
    topic: "${logGroupId}_${logStreamId}'"
    sasl.mechanism: "PLAIN"
    security.protocol: "SASL_PLAINTEXT"
    acks: "0"
    compression: gzip
  2. 通过Logstash软件上报日志。
    input {
    stdin {}
    }
    output {
    kafka {
    # 配置地址
    bootstrap_servers => "${ip}:${port}"
    # 配置topic
    topic_id => "${logGroupId}_${logStreamId}"
    # 配置消息确认机制
    acks => "0"
    # 配置压缩方式
    compression_type => "gzip"
    # 配置认证方式
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    # 用户名 projectId 密码 accessKey#accessSecret
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';"
    }
    }
  3. 通过Flume软件上报日志。
    #Name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #Source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.channels = c1
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /tmp/test.txt
    a1.sources.r1.fileHeader = true
    a1.sources.r1.maxBatchCount = 1000
    #Channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    #Bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

SDK 调用示例

  1. Java SDK调用示例。

    maven依赖(示例kafka协议版本为2.7.1):

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>

    代码示例:

    package org.example;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    public class ProducerDemo {
    public static void main(String[] args) {
    Properties props = new Properties();
    // 配置地址
    props.put("bootstrap.servers", "${ip}:${port}");
    // 配置消息确认机制
    props.put("acks", "0");
    // 配置认证方式
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    // 用户名 projectId 密码 accessKey#accessSecret
    props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';");
    // 配置压缩方式
    props.put("compression.type", "${compress_type}");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 1.创建一个生产者对象
    Producer<String, String> producer = new KafkaProducer<>(props);
    // 2.调用send方法
    for (int i = 0; i < 1; i++) {
    ProducerRecord record = new ProducerRecord<>("${logGroupId}_${logStreamId}", "${message}");
    // 配置recordHeader
    // record.headers().add(new RecordHeader("LTS_LOG_TYPE","FORMAT".getBytes()));
    producer.send(record);
    }
    // 3.关闭生产者
    producer.close();
    }
    }
  2. Python SDK调用示例。
    from kafka import KafkaProducer
    producer = KafkaProducer(
    # 配置地址
    bootstrap_servers="${ip}:${port}",
    # 配置消息确认机制
    acks="0",
    # 配置压缩方式
    compression_type ="${compression_type}"
    # 配置认证方式
    sasl_mechanism="PLAIN",
    security_protocol="SASL_PLAINTEXT",
    # 用户名 projectId 密码 accessKey#accessSecret
    sasl_plain_username="${projectId}",
    sasl_plain_password="${accessKey}#${accessSecret}"
    )
    print('start producer')
    for i in range(0, 3):
        data = bytes("${message}", encoding="utf-8")
        future = producer.send("${logGroupId}_{logStreamId}", data)
        result = future.get(timeout=10)
        print(result)
    print('end producer')

报错说明

当参数错误或不匹配时,会有相应的报错提示。

表4 报错说明

报错信息

报错原因

TopicAuthorizationException

projectId(项目ID)、accessKey(AK)、accessSecret(SK)参数错误或者不匹配。

UnknownTopicOrPartitionException

logGroupId(日志组ID)、logStreamId(日志流ID)参数错误或者不匹配。

InvalidRecordException

仅当配置headers,上报规范化日志时,会出现此类报错:

日志格式错误或者日志中的projectId(项目ID)、logGroupId(日志组ID)、logStreamId(日志流ID)与外部设置参数不一致。

分享:

    相关文档

    相关产品