更新时间:2022-12-14 GMT+08:00

使用简介

Flume是一个分布式、可靠和高可用的海量日志聚合的系统。它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。支持在系统中定制各类数据发送方,用于收集数据。同时,提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

Flume分为客户端和服务端,两者都是FlumeAgent。服务端对应着FlumeServer实例,直接部署在集群内部。而客户端部署更灵活,可以部署在集群内部,也可以部署在集群外。它们之间没有必然联系,都可以独立工作,并且提供的功能是一样的。

Flume客户端需要单独安装,支持将数据直接导到集群中的HDFS和Kafka等组件上,也可以结合Flume服务端一起使用。

使用流程

通过Flume采集日志的流程如下所示。

  1. 安装Flume客户端。
  2. 配置Flume服务端和客户端参数。
  3. 查看Flume客户端收集日志。
  4. 停止及卸载Flume客户端。
图1 Flume使用流程

Flume客户端介绍

Flume客户端由Source、Channel、Sink组成,数据先进入Source然后传递到Channel,最后由Sink发送到客户端外部。各模块说明见表1

表1 模块说明

名称

说明

Source

Source负责接收数据或产生数据,并将数据批量放到一个或多个Channel。Source有两种类型:数据驱动和轮询。

典型的Source样例如下:

  • 和系统集成并接收数据的Sources:Syslog、Netcat。
  • 自动生成事件数据的Sources:Exec、SEQ。
  • 用于Agent和Agent之间通信的IPC Sources:Avro。

Source必须至少和一个Channel关联。

Channel

Channel位于Source和Sink之间,用于缓存Source传递的数据,当Sink成功将数据发送到下一跳的Channel或最终数据处理端,缓存数据将自动从Channel移除。

不同类型的Channel提供的持久化水平也是不一样的:

  • Memory Channel:非持久化
  • File Channel:基于预写式日志(Write-Ahead Logging,简称WAL)的持久化实现
  • JDBC Channel:基于嵌入Database的持久化实现

Channel支持事务特性,可保证简易的顺序操作,同时可以配合任意数量的Source和Sink共同工作。

Sink

Sink负责将数据传输到下一跳或最终目的,成功完成后将数据从Channel移除。

典型的Sink样例如下:

  • 存储数据到最终目的终端Sink,比如:HDFS、Kafka
  • 自动消耗的Sinks,比如:Null Sink
  • 用于Agent和Agent之间通信的IPC sink:Avro

Sink必须关联到一个Channel。

Flume客户端可以配置成多个Source、Channel、Sink,即一个Source将数据发送给多个Channel,再由多个Sink发送到客户端外部。

Flume还支持多个Flume客户端配置级联,即Sink将数据再发送给Source。

补充说明

  1. Flume可靠性保障措施。
    • Source与Channel、Channel与Sink之间支持事务机制。
    • Sink Processor支持配置failover、load_balance机制。
      例如load_balance示例如下:
      server.sinkgroups=g1
      server.sinkgroups.g1.sinks=k1 k2
      server.sinkgroups.g1.processor.type=load_balance
      server.sinkgroups.g1.processor.backoff=true
      server.sinkgroups.g1.processor.selector=random
  2. Flume多客户端聚合级联时的注意事项。
    • 级联时需要走Avro或者Thrift协议进行级联。
    • 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。
  3. Flume客户端可以包含多个独立的数据流,即在一个配置文件properties.properties中配置多个Source、Channel、Sink。这些组件可以链接以形成多个流。

    例如在一个配置中配置两个数据流,示例如下:

    server.sources = source1 source2
    server.sinks = sink1 sink2
    server.channels = channel1 channel2
    
    #dataflow1 
    server.sources.source1.channels = channel1
    server.sinks.sink1.channel = channel1
    
    #dataflow2
    server.sources.source2.channels = channel2
    server.sinks.sink2.channel = channel2