使用KAFKA协议上报日志到LTS
您可以通过Kafka协议上报日志到日志服务,目前支持各类Kafka Producer SDK或采集工具,仅依赖于Kafka协议。支持以下场景:
- 场景1:已有基于开源采集的自建系统,仅修改配置文件便可以将日志上报到 LTS,例如Logstash。
- 场景2:希望通过 Kafka producer SDK 来采集日志并上报,不必再安装采集ICAgent。
目前此功能仅支持华北-北京四、华南-广州、华东-上海一、华东-上海二、亚太-新加坡,其他局点需要提交工单申请使用。
前提条件
- 使用云日志SDK前,您需要注册用户账号,并开通云日志服务。
- 确认云日志服务的区域,请用户根据所在区域,获取regionid。
- 获取华为账号的AK/SK。
- 获取华为云账号的项目ID(project id),详情请参见“我的凭证 >API凭证”。
- 获取需要上报到LTS的日志组ID、日志流ID。
- 当前仅支持内网上报,需要在ECS主机上使用。
相关限制
- 当前仅支持内网上报,端口固定为9095,IP根据所在局点进行配置。
- 支持 Kafka 协议版本为:1.0.X,2.X.X,3.X.X。
- 支持压缩方式:gzip,snappy,lz4。
- KAFKA认证方式为 SASL_PLAINTEXT 认证。
- KAFKA协议的ACKS参数必须设置为0。
配置方式
- 使用Kafka协议上报日志时,需要使用到的通用参数如下。
表1 通用参数 参数名称
描述
类型
projectId
用户账号的项目ID(project id)
String
logGroupId
LTS的日志组ID
String
logStreamId
LTS的日志流ID
String
regionName
云日志服务的区域
String
accessKey
用户账号的AK
String
accessSecret
用户账号的SK
String
- 使用Kafka协议上报日志时,需要配置以下参数。
表2 配置参数 参数名称
说明
连接类型
当前支持SASL_PLAINTEXT
hosts
Kafka的IP和PORT地址,格式为lts-kafka.${regionName}.${external_global_domain_name}:9095或lts-access.${regionName}.${external_global_domain_name}:9095
其中IP根据局点进行配置,PORT固定为9095。详细请参考参数获取方式,例如北京四局点对应hosts为 lts-kafka.cn-north-4.myhuaweicloud.com:9095。
topic
Kafka的topic名称,格式为 ${日志组ID}_${日志流ID},即LTS的日志组ID和日志流ID通过下划线连接,作为topic的名称。
username
Kafka访问用户名,配置为用户账号的项目ID。
password
Kafka访问密码,格式为${accessKey}#${accessSecret},即用户账号的AK和SK通过#连接,作为Kafka的访问密码。
headers
设置自定义label字段时,需要配置headers。
headers中添加header,key值为LTS_LOG_TYPE,value值为FORMAT,用户需要上报符合要求的规范化日志。
- ${message}日志格式
仅当headers中添加了key为LTS_LOG_TYPE,value为FORMAT的header时,日志需要符合该格式规范。
表3 日志参数 参数名称
是否必选
参数类型
描述
tenant_project_id
是
String
用户账号的项目ID。
tenant_group_id
是
String
LTS的日志组ID。
tenant_stream_id
是
String
LTS的日志流ID。
log_time_ns
是
Long
日志数据采集时间,UTC时间(纳秒)。
说明:采集时间需在日志存储时间范围之内,否则上报日志会被删除。比如日志组的日志存储时间是7天,则此参数不应早于当前时间的7天前。
contents
是
Array of String
日志内容。
labels
是
Object
用户自定义label。
说明:请不要将字段名称设置为内置保留字段,否则可能会造成字段名称重复、查询不精确等问题。
日志示例
{ "tenant_project_id": "${projectId}", "tenant_group_id": "${logGroupId}", "tenant_stream_id": "${logStreamId}", "log_time_ns": "XXXXXXXXXXXXXXXXXXX", "contents": [ "This is a log 1", "This is a log 2" ], "labels": { "type": "kafka" } }
调用示例
- Beat系列软件调用(FileBeat等)。以FileBeat为例,配置参数如下:
output.kafka: hosts: ["${ip}:${port}"] partition.round_robin: reachable_only: false username: "${projectId}" password: "${accessKey}#${accessSecret}" topic: "${logGroupId}_${logStreamId}'" sasl.mechanism: "PLAIN" security.protocol: "SASL_PLAINTEXT" acks: "0" compression: gzip
- 通过Logstash软件上报日志。
input { stdin {} } output { kafka { # 配置地址 bootstrap_servers => "${ip}:${port}" # 配置topic topic_id => "${logGroupId}_${logStreamId}" # 配置消息确认机制 acks => "0" # 配置压缩方式 compression_type => "gzip" # 配置认证方式 security_protocol => "SASL_PLAINTEXT" sasl_mechanism => "PLAIN" # 用户名 projectId 密码 accessKey#accessSecret sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';" } }
- 通过Flume软件上报日志。
#Name a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
SDK 调用示例
- Java SDK调用示例。
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.7.1</version> </dependency> </dependencies>
代码示例:
package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerDemo { public static void main(String[] args) { Properties props = new Properties(); // 配置地址 props.put("bootstrap.servers", "${ip}:${port}"); // 配置消息确认机制 props.put("acks", "0"); // 配置认证方式 props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); // 用户名 projectId 密码 accessKey#accessSecret props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';"); // 配置压缩方式 props.put("compression.type", "${compress_type}"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 1.创建一个生产者对象 Producer<String, String> producer = new KafkaProducer<>(props); // 2.调用send方法 for (int i = 0; i < 1; i++) { ProducerRecord record = new ProducerRecord<>("${logGroupId}_${logStreamId}", "${message}"); // 配置recordHeader // record.headers().add(new RecordHeader("LTS_LOG_TYPE","FORMAT".getBytes())); producer.send(record); } // 3.关闭生产者 producer.close(); } }
- Python SDK调用示例。
from kafka import KafkaProducer producer = KafkaProducer( # 配置地址 bootstrap_servers="${ip}:${port}", # 配置消息确认机制 acks="0", # 配置压缩方式 compression_type ="${compression_type}" # 配置认证方式 sasl_mechanism="PLAIN", security_protocol="SASL_PLAINTEXT", # 用户名 projectId 密码 accessKey#accessSecret sasl_plain_username="${projectId}", sasl_plain_password="${accessKey}#${accessSecret}" ) print('start producer') for i in range(0, 3): data = bytes("${message}", encoding="utf-8") future = producer.send("${logGroupId}_{logStreamId}", data) result = future.get(timeout=10) print(result) print('end producer')
报错说明
当参数错误或不匹配时,会有相应的报错提示。
报错信息 |
报错原因 |
---|---|
TopicAuthorizationException |
projectId(项目ID)、accessKey(AK)、accessSecret(SK)参数错误或者不匹配。 |
UnknownTopicOrPartitionException |
logGroupId(日志组ID)、logStreamId(日志流ID)参数错误或者不匹配。 |
InvalidRecordException |
仅当配置headers,上报规范化日志时,会出现此类报错: 日志格式错误或者日志中的projectId(项目ID)、logGroupId(日志组ID)、logStreamId(日志流ID)与外部设置参数不一致。 |