使用Flume采集器上报日志到LTS
Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方的能力。
用户使用Flume系统采集日志,并且通过LTS侧提供的KAFKA协议方式上报日志。以下是部分常用数据采集场景示例:
- 使用Flume采集文本日志上报到LTS
- 使用Flume采集数据库表数据并且上报至LTS
- 使用Flume采集syslog协议传输的日志上报到LTS
- 通过Flume采集TCP/UDP协议传输的日志上报到LTS
- 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS
- 使用默认拦截器处理日志
- 自定义拦截器处理日志
- 使用外部数据源丰富日志内容并上报到LTS
前提条件
- 用户机器已经安装了JDK。
- 用户已经安装Flume,并且需要在Flume中配置文件中配置JDK路径。
使用Flume采集文本日志上报到LTS
支持使用Flume采集文本日志内容上报至LTS,参考如下示例添加采集文本日志的conf文件。以下示例中的参数介绍请参考使用KAFKA协议上报日志。
#Named a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
使用Flume采集数据库表数据并且上报至LTS
使用Flume采集数据库表数据并且上报至LTS,实现对表数据变动监控。以下示例中的参数介绍请参考使用KAFKA协议上报日志。
- 在https://github.com/keedio/flume-ng-sql-source页面下载flume-ng-sql-source插件,转换为jar包并取名为flume-ng-sql-source.jar,打包前注意将pom文件中的flume-ng-core 版本与flume安装版本保持一致,并且将jar包放在安装Flume包路径的lib目录下面,例如FLUME_HOME/lib目录下(例子中的FLUME_HOME为Flume安装路径,仅供参考,请以实际安装路径为准)。
- 添加MySQL驱动到FLUME_HOME/lib目录下:
- 下载MySQL驱动。
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
- 将驱动包解压并打为jar包。
tar xzf mysql-connector-java-5.1.35.tar.gz
- 将jar包存放在FLUME_HOME/lib/路径。
cp mysql-connector-java-5.1.35-bin.jar FLUME_HOME/lib/
- 下载MySQL驱动。
- 添加采集MySQL的conf文件。
# a1表示agent的名称 # source是a1的输入源 # channels是缓冲区 # sinks是a1输出目的地,本例子sinks使用了kafka a1.channels = c1 a1.sources = r1 a1.sinks = k1 #source a1.sources.r1.type = org.keedio.flume.source.SQLSource # 连接mysql的一系列操作,{mysql_host}改为您虚拟机的ip地址,可以通过ifconfig或者ip addr查看,{database_name}改为数据库名称 # url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败 a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false # Hibernate Database connection properties # mysql账号,一般都是root a1.sources.r1.hibernate.connection.user = root # 填入您的mysql密码 a1.sources.r1.hibernate.connection.password = xxxxxxxx a1.sources.r1.hibernate.connection.autocommit = true # mysql驱动 a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver # 存放status文件 a1.sources.r1.status.file.path = FLUME_HOME/bin a1.sources.r1.status.file.name = sqlSource.status # Custom query # 填写需要采集的数据表名{table_name},也可以使用下面的方法: a1.sources.r1.custom.query = select * from {table_name} #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000
- 启动Flume后,即可开始采集数据库中的表数据到LTS。
使用Flume采集syslog协议传输的日志上报到LTS
Syslog协议是一种用于在IP网络中传输日志消息的协议,通过Flume将syslog协议传输的日志采集并上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。
- 接收UDP日志,参考如下示例添加采集Syslog协议的conf文件。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogudp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1
- 接收TCP日志,参考如下示例添加采集Syslog协议的conf文件。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogtcp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1
通过Flume采集TCP/UDP协议传输的日志上报到LTS
通过Flume采集TCP/UDP协议传输的日志上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。
- 采集TCP端口日志,参考如下示例添加采集端口的conf文件。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 采集UDP端口日志,参考如下示例添加采集端口的conf文件。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
通过Flume采集SNMP协议上报的设备管理数据并发送到LTS
通过Flume采集SNMP协议上报的设备管理数据并发送到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。
- 监听SNMP协议通信端口号161。参考如下示例添加SNMP协议接受日志的conf。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 161 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 监听SNMP协议陷阱(Trap)通信的端口号162,参考如下示例添加SNMP协议接受日志的conf。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 162 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
使用默认拦截器处理日志
使用Flume采集器时,拦截器是简单的插件式组件,设置在Source和Channel之间。Source接收到的事件Event,在写入Channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个Source接收到的事件。
- 时间戳拦截器
该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接收到的只有message。时间戳拦截器的配置, 参数默认值描述type,类型名称timestamp,也可以使用类名的全路径preserveExisting为false。如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值。source连接到时间戳拦截器的配置:
a1.sources.r1.interceptors = timestamp a1.sources.r1.interceptors.timestamp.type=timestamp a1.sources.r1.interceptors.timestamp.preserveExisting=false
- 正则过滤拦截器
在日志采集的时候,可能有一些数据是不需要的,添加过滤拦截器可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。参数默认值描述type,类型名称REGEX_FILTER。excludeEvents为false时默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的。source连接到正则过滤拦截器的配置:
a1.sources.r1.interceptors = regex a1.sources.r1.interceptors.regex.type=REGEX_FILTER a1.sources.r1.interceptors.regex.regex=(today)|(Monday) a1.sources.r1.interceptors.regex.excludeEvents=false
这样配置的拦截器就只会接收日志消息中带有today或者Monday的日志。
- 搜索并替换拦截器
拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。配置如下:
# 拦截器别名 a1.sources.r1.interceptors = search-replace # 拦截器类型,必须是search_replace a1.sources.r1.interceptors.search-replace.type = search_replace #删除事件正文中的字符,根据正则匹配event内容 a1.sources.r1.interceptors.search-replace.searchPattern = today # 替换匹配到的event内容 a1.sources.r1.interceptors.search-replace.replaceString = yesterday # 设置字符集,默认是utf8 a1.sources.r1.interceptors.search-replace.charset = utf8
自定义拦截器处理日志
在Flume中自定义拦截器的方式主要流程如下(以java语言为例),以下示例中的FLUME_HOME表示Flume的安装路径,例如/tools/flume(仅供参考),实际配置的时候,请以用户安装Flume的实际路径为准。
- 创建MAVEN工程项目,引入Flume依赖。
根据集群中的 Flume 版本,引入 Flume 依赖,如下所示:
<dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.10.1</version> <scope>provided</scope> </dependency> </dependencies>
无需将该依赖打包进最后的JAR包中,故将其作用域设置为provided。
- 创建类实现拦截器接口Interceptor,并且实现相关方法。
- initialize() 方法:初始化拦截器操作,读取配置信息、建立连接等。
- intercept(Event event) 方法:用于拦截单个事件,并对事件进行处理。接收一个事件对象作为输入,并返回一个修改后的事件对象。
- intercept(List<Event> list) 方法:事件批处理,拦截事件列表,并对事件列表进行处理。
- close() 方法:关闭拦截器,在这里释放资源、关闭连接等。
import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; public class TestInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 获取事件数据 String eventData = new String(event.getBody(), StandardCharsets.UTF_8); // 检查事件数据中是否包含指定字符串 if (eventData.contains("hello")) { // 如果包含指定字符串,则过滤掉该事件,返回 null return null; } return event; } @Override public List<Event> intercept(List<Event> events) { // 创建一个新的列表,存储处理过后的事件 List<Event> interceptedEvents = new ArrayList<>(); for (Event event : events) { Event interceptedEvent = intercept(event); if (interceptedEvent != null) { interceptedEvents.add(interceptedEvent); } } return interceptedEvents; } @Override public void close() { } }
- 构建拦截器,拦截器的创建和配置通常是通过 Builder 模式来完成的,完整的代码如下所示:
import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; public class TestInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 获取事件数据 String eventData = new String(event.getBody(), StandardCharsets.UTF_8); // 检查事件数据中是否包含指定字符串 if (eventData.contains("hello")) { // 如果包含指定字符串,则过滤掉该事件,返回 null return null; } return event; } @Override public List<Event> intercept(List<Event> events) { List<Event> interceptedEvents = new ArrayList<>(); for (Event event : events) { Event interceptedEvent = intercept(event); if (interceptedEvent != null) { interceptedEvents.add(interceptedEvent); } } return interceptedEvents; } @Override public void close() { } // 拦截器构建 public static class Builder implements Interceptor.Builder { @Override public void configure(Context context) { } @Override public Interceptor build() { return new TestInterceptor(); } } }
- 转换为jar包,并且将其上传至Flume安装路径下的lib文件夹下(请以用户安装Flume的实际路径为准)。
- 编写配置文件,需要将自定义的拦截器配置进去。
拦截器全类名配置时需要注意,格式为拦截器的全类名 + $Builder。
# 拦截器配置 # 拦截器定义 a1.sources.r1.interceptors = i1 # 拦截器全类名 a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder
- 运行Flume即可。
public class TestInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 获取事件数据 String eventData = new String(event.getBody(), StandardCharsets.UTF_8); Map<String, Object> splitMap = new HashMap<>(); String[] splitList = eventData.split(" "); for (int i = 0; i < splitList.length; i++) { splitMap.put("field" + i, splitList[i].trim()); } eventData.setBody(splitMap.toString().getBytes(StandardCharsets.UTF_8)); return event; } @Override public List<Event> intercept(List<Event> events) { List<Event> interceptedEvents = new ArrayList<>(); for (Event event : events) { Event interceptedEvent = intercept(event); if (interceptedEvent != null) { interceptedEvents.add(interceptedEvent); } } return interceptedEvents; } @Override public void close() { } }
使用外部数据源丰富日志内容并上报到LTS
Flume数据传输的基本单元,以Event的形式将数据从源头传输至目的地。Event由Header和Body两部分组成,Header用来存放该Event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。
有外部数据源时,如果您需要丰富日志内容,例如修改日志内容、添加字段、删除内容等操作,将修改内容添加至Event的body中,Flume才能将日志内容上报到LTS。例如使用Java自定义扩展日志内容。以下示例中的参数介绍请参考使用KAFKA协议上报日志。
import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; public class TestInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 获取事件数据,将原数据转换为json字符串并且添加额外字段 String eventData = new String(event.getBody(), StandardCharsets.UTF_8); JSONObject object = new JSONObject(); object.put("content", eventData); object.put("workLoadType", "RelipcaSet"); eventData = object.toJSONString(); eventData.setBody(eventData.getBytes(StandardCharsets.UTF_8)); return event; } @Override public List<Event> intercept(List<Event> events) { List<Event> interceptedEvents = new ArrayList<>(); for (Event event : events) { Event interceptedEvent = intercept(event); if (interceptedEvent != null) { interceptedEvents.add(interceptedEvent); } } return interceptedEvents; } @Override public void close() { } }