文档首页/ 云日志服务 LTS/ 用户指南/ 日志接入/ 使用Flume采集器上报日志到LTS
更新时间:2024-11-11 GMT+08:00

使用Flume采集器上报日志到LTS

Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方的能力。

用户使用Flume系统采集日志,并且通过LTS侧提供的KAFKA协议方式上报日志。以下是部分常用数据采集场景示例:

  1. 使用Flume采集文本日志上报到LTS
  2. 使用Flume采集数据库表数据并且上报至LTS
  3. 使用Flume采集syslog协议传输的日志上报到LTS
  4. 通过Flume采集TCP/UDP协议传输的日志上报到LTS
  5. 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS
  6. 使用默认拦截器处理日志
  7. 自定义拦截器处理日志
  8. 使用外部数据源丰富日志内容并上报到LTS

前提条件

  • 用户机器已经安装了JDK。
  • 用户已经安装Flume,并且需要在Flume中配置文件中配置JDK路径。

使用Flume采集文本日志上报到LTS

支持使用Flume采集文本日志内容上报至LTS,参考如下示例添加采集文本日志的conf文件。以下示例中的参数介绍请参考使用KAFKA协议上报日志

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/test.txt
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

使用Flume采集数据库表数据并且上报至LTS

使用Flume采集数据库表数据并且上报至LTS,实现对表数据变动监控。以下示例中的参数介绍请参考使用KAFKA协议上报日志

  1. https://github.com/keedio/flume-ng-sql-source页面下载flume-ng-sql-source插件,转换为jar包并取名为flume-ng-sql-source.jar,打包前注意将pom文件中的flume-ng-core 版本与flume安装版本保持一致,并且将jar包放在安装Flume包路径的lib目录下面,例如FLUME_HOME/lib目录下(例子中的FLUME_HOME为Flume安装路径,仅供参考,请以实际安装路径为准)。
  2. 添加MySQL驱动到FLUME_HOME/lib目录下:

    1. 下载MySQL驱动。
      wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
    2. 将驱动包解压并打为jar包。
      tar xzf mysql-connector-java-5.1.35.tar.gz
    3. 将jar包存放在FLUME_HOME/lib/路径。
      cp mysql-connector-java-5.1.35-bin.jar  FLUME_HOME/lib/

  3. 添加采集MySQL的conf文件。

    # a1表示agent的名称
    # source是a1的输入源
    # channels是缓冲区
    # sinks是a1输出目的地,本例子sinks使用了kafka
    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1
    
    #source
    a1.sources.r1.type = org.keedio.flume.source.SQLSource
    # 连接mysql的一系列操作,{mysql_host}改为您虚拟机的ip地址,可以通过ifconfig或者ip addr查看,{database_name}改为数据库名称
    # url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败
    a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false
    # Hibernate Database connection properties
    # mysql账号,一般都是root
    a1.sources.r1.hibernate.connection.user = root
    # 填入您的mysql密码
    a1.sources.r1.hibernate.connection.password = xxxxxxxx
    a1.sources.r1.hibernate.connection.autocommit = true
    # mysql驱动
    a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
    a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    # 存放status文件
    a1.sources.r1.status.file.path = FLUME_HOME/bin
    a1.sources.r1.status.file.name = sqlSource.status
    # Custom query
    # 填写需要采集的数据表名{table_name},也可以使用下面的方法:
    a1.sources.r1.custom.query = select * from {table_name}
    
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000

  4. 启动Flume后,即可开始采集数据库中的表数据到LTS。

使用Flume采集syslog协议传输的日志上报到LTS

Syslog协议是一种用于在IP网络中传输日志消息的协议,通过Flume将syslog协议传输的日志采集并上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志

  • 接收UDP日志,参考如下示例添加采集Syslog协议的conf文件。
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type=syslogudp
    #host_port为syslog服务器的端口
    a1.sources.r1.port = {host_port}
    #host_ip为syslog服务器的ip地址
    a1.sources.r1.host = {host_ip}
    a1.sources.r1.channels = c1
    
    a1.channels.c1.type = memory
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    a1.sinks.k1.channel = c1
  • 接收TCP日志,参考如下示例添加采集Syslog协议的conf文件。
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type=syslogtcp
    #host_port为syslog服务器的端口
    a1.sources.r1.port = {host_port}
    #host_ip为syslog服务器的ip地址
    a1.sources.r1.host = {host_ip}
    a1.sources.r1.channels = c1
    
    a1.channels.c1.type = memory
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    a1.sinks.k1.channel = c1

通过Flume采集TCP/UDP协议传输的日志上报到LTS

通过Flume采集TCP/UDP协议传输的日志上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志

  • 采集TCP端口日志,参考如下示例添加采集端口的conf文件。
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = {host_port}
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • 采集UDP端口日志,参考如下示例添加采集端口的conf文件。
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = {host_port}
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

通过Flume采集SNMP协议上报的设备管理数据并发送到LTS

通过Flume采集SNMP协议上报的设备管理数据并发送到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志

  • 监听SNMP协议通信端口号161。参考如下示例添加SNMP协议接受日志的conf。
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 161
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • 监听SNMP协议陷阱(Trap)通信的端口号162,参考如下示例添加SNMP协议接受日志的conf。
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 162
    
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

使用默认拦截器处理日志

使用Flume采集器时,拦截器是简单的插件式组件,设置在Source和Channel之间。Source接收到的事件Event,在写入Channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个Source接收到的事件。

  • 时间戳拦截器

    该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接收到的只有message。时间戳拦截器的配置, 参数默认值描述type,类型名称timestamp,也可以使用类名的全路径preserveExisting为false。如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值。source连接到时间戳拦截器的配置:

    a1.sources.r1.interceptors = timestamp 
    a1.sources.r1.interceptors.timestamp.type=timestamp 
    a1.sources.r1.interceptors.timestamp.preserveExisting=false
  • 正则过滤拦截器

    在日志采集的时候,可能有一些数据是不需要的,添加过滤拦截器可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。参数默认值描述type,类型名称REGEX_FILTER。excludeEvents为false时默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的。source连接到正则过滤拦截器的配置:

    a1.sources.r1.interceptors = regex 
    a1.sources.r1.interceptors.regex.type=REGEX_FILTER 
    a1.sources.r1.interceptors.regex.regex=(today)|(Monday) 
    a1.sources.r1.interceptors.regex.excludeEvents=false

    这样配置的拦截器就只会接收日志消息中带有today或者Monday的日志。

  • 搜索并替换拦截器

    拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。配置如下:

    # 拦截器别名
    a1.sources.r1.interceptors = search-replace
    # 拦截器类型,必须是search_replace
    a1.sources.r1.interceptors.search-replace.type = search_replace
    
    #删除事件正文中的字符,根据正则匹配event内容
    a1.sources.r1.interceptors.search-replace.searchPattern = today
    # 替换匹配到的event内容
    a1.sources.r1.interceptors.search-replace.replaceString = yesterday
    # 设置字符集,默认是utf8
    a1.sources.r1.interceptors.search-replace.charset = utf8

自定义拦截器处理日志

在Flume中自定义拦截器的方式主要流程如下(以java语言为例),以下示例中的FLUME_HOME表示Flume的安装路径,例如/tools/flume(仅供参考),实际配置的时候,请以用户安装Flume的实际路径为准。

  1. 创建MAVEN工程项目,引入Flume依赖。

    根据集群中的 Flume 版本,引入 Flume 依赖,如下所示:

    <dependencies>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.10.1</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>

    无需将该依赖打包进最后的JAR包中,故将其作用域设置为provided。

  2. 创建类实现拦截器接口Interceptor,并且实现相关方法。

    • initialize() 方法:初始化拦截器操作,读取配置信息、建立连接等。
    • intercept(Event event) 方法:用于拦截单个事件,并对事件进行处理。接收一个事件对象作为输入,并返回一个修改后的事件对象。
    • intercept(List<Event> list) 方法:事件批处理,拦截事件列表,并对事件列表进行处理。
    • close() 方法:关闭拦截器,在这里释放资源、关闭连接等。
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestInterceptor implements Interceptor {
        @Override
        public void initialize() {
    
        }
        @Override
        public Event intercept(Event event) {
    
            // 获取事件数据
            String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
            // 检查事件数据中是否包含指定字符串
            if (eventData.contains("hello")) {
                // 如果包含指定字符串,则过滤掉该事件,返回 null
                return null;
            }
    
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
            // 创建一个新的列表,存储处理过后的事件
            List<Event> interceptedEvents = new ArrayList<>();
            for (Event event : events) {
                Event interceptedEvent = intercept(event);
                if (interceptedEvent != null) {
                    interceptedEvents.add(interceptedEvent);
                }
            }
            return interceptedEvents;
        }
    
        @Override
        public void close() {
    
        }
    
    }

  3. 构建拦截器,拦截器的创建和配置通常是通过 Builder 模式来完成的,完整的代码如下所示:

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestInterceptor implements Interceptor {
        @Override
        public void initialize() {
        }
        @Override
        public Event intercept(Event event) {
            // 获取事件数据
            String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
            // 检查事件数据中是否包含指定字符串
            if (eventData.contains("hello")) {
                // 如果包含指定字符串,则过滤掉该事件,返回 null
                return null;
            }
            return event;
        }
        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> interceptedEvents = new ArrayList<>();
            for (Event event : events) {
                Event interceptedEvent = intercept(event);
                if (interceptedEvent != null) {
                    interceptedEvents.add(interceptedEvent);
                }
            }
            return interceptedEvents;
        }
    
        @Override
        public void close() {
    
        }
    
        // 拦截器构建
        public static class Builder implements Interceptor.Builder {
    
            @Override
            public void configure(Context context) {
    
            }
    
            @Override
            public Interceptor build() {
                return new TestInterceptor();
            }
    
        }
    
    }

  4. 转换为jar包,并且将其上传至Flume安装路径下的lib文件夹下(请以用户安装Flume的实际路径为准)。
  5. 编写配置文件,需要将自定义的拦截器配置进去。

    拦截器全类名配置时需要注意,格式为拦截器的全类名 + $Builder。

    # 拦截器配置
    # 拦截器定义
    a1.sources.r1.interceptors = i1
    # 拦截器全类名
    a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder

  6. 运行Flume即可。
KV解析日志:用空格分隔字符串并且转换为Map类型字符串。
public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 获取事件数据
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        Map<String, Object> splitMap = new HashMap<>();
        String[] splitList = eventData.split(" ");
        for (int i = 0; i < splitList.length; i++) {
            splitMap.put("field" + i, splitList[i].trim());
        }
        eventData.setBody(splitMap.toString().getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}

使用外部数据源丰富日志内容并上报到LTS

Flume数据传输的基本单元,以Event的形式将数据从源头传输至目的地。Event由Header和Body两部分组成,Header用来存放该Event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。

有外部数据源时,如果您需要丰富日志内容,例如修改日志内容、添加字段、删除内容等操作,将修改内容添加至Event的body中,Flume才能将日志内容上报到LTS。例如使用Java自定义扩展日志内容。以下示例中的参数介绍请参考使用KAFKA协议上报日志

import com.alibaba.fastjson.JSONObject;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 获取事件数据,将原数据转换为json字符串并且添加额外字段
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject object = new JSONObject();
        object.put("content", eventData);
        object.put("workLoadType", "RelipcaSet");
        eventData = object.toJSONString();
        eventData.setBody(eventData.getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}