更新时间:2024-07-24 GMT+08:00

Flume业务模型配置说明

业务模型配置指导

本任务旨在提供Flume常用模块的性能差异,用于指导用户进行合理的Flume业务配置,避免出现前端Source和后端Sink性能不匹配进而导致整体业务性能不达标的场景。

本任务只针对于单通道的场景进行比较说明。

Flume业务配置及模块选择过程中,一般要求Sink的极限吞吐量需要大于Source的极限吞吐量,否则在极限负载的场景下,Source往Channel的写入速度大于Sink从Channel取出的速度,从而导致Channel频繁被写满,进而影响性能表现。

Avro Source和Avro Sink一般都是成对出现,用于多个Flume Agent间进行数据中转,因此一般场景下Avro Source和Avro Sink都不会成为性能瓶颈。

模块间性能

根据模块间性能对比,可以看到对于前端是SpoolDir Source的场景下,Kafka Sink和HDFS Sink都能满足吞吐量要求,但是HBase Sink由于自身写入性能较低的原因,会成为性能瓶颈,会导致数据都积压在Channel中。但是如果有必须使用HBase Sink或者其他性能容易成为瓶颈的Sink的场景时,可以选择使用Channel Selector或者Sink Group来满足性能要求。

Channel Selector

Channel Selector可以允许一个Source对接多个Channel,通过选择不同的Selector类型来将Source的数据进行分流或者复制,目前Flume提供的Channel Selector有两种:Replicating和Multiplexing。

Replicating:表示Source的数据同步发送给所有Channel。

Multiplexing:表示根据Event中的Header的指定字段的值来进行判断,从而选择相应的Channel进行发送,从而起到根据业务类型进行分流的目的。

  • Replicating配置样例:
    client.sources = kafkasource
    client.channels = channel1 channel2
    client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource
    client.sources.kafkasource.kafka.topics = topic1,topic2
    client.sources.kafkasource.kafka.consumer.group.id = flume
    client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007
    client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT
    client.sources.kafkasource.batchDurationMillis = 1000
    client.sources.kafkasource.batchSize = 800
    client.sources.kafkasource.channels = channel1 c el2
    
    client.sources.kafkasource.selector.type = replicating
    client.sources.kafkasource.selector.optional = channel2
    表1 Replicating配置样例参数说明

    选项名称

    默认值

    描述

    Selector.type

    replicating

    Selector类型,应配置为replicating

    Selector.optional

    -

    可选Channel,可以配置为列表

  • Multiplexing配置样例:
    client.sources = kafkasource
    client.channels = channel1 channel2
    client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource
    client.sources.kafkasource.kafka.topics = topic1,topic2
    client.sources.kafkasource.kafka.consumer.group.id = flume
    client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007
    client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT
    client.sources.kafkasource.batchDurationMillis = 1000
    client.sources.kafkasource.batchSize = 800
    client.sources.kafkasource.channels = channel1 channel2
    
    client.sources.kafkasource.selector.type = multiplexing
    client.sources.kafkasource.selector.header = myheader
    client.sources.kafkasource.selector.mapping.topic1 = channel1
    client.sources.kafkasource.selector.mapping.topic2 = channel2
    client.sources.kafkasource.selector.default = channel1
    表2 Multiplexing配置样例参数说明

    选项名称

    默认值

    描述

    Selector.type

    replicating

    Selector类型,应配置为multiplexing

    Selector.header

    Flume.selector.header

    -

    Multiplexing类型的Selector的样例中,选择Event中Header名称为topic的字段来进行判断,当Header中topic字段的值为topic1时,向channel1发送该Event,当Header中topic字段的值为topic2时,向channel2发送该Event。

    这种Selector需要借助Source中Event的特定Header来进行Channel的选择,需要根据业务场景选择合理的Header来进行数据分流。

Sink Group

当后端单Sink性能不足、需要高可靠性保证或者异构输出时可以使用Sink Group来将指定的Channel和多个Sink对接,从而满足相应的使用场景。目前Flume提供了两种Sink Processor用于对Sink Group中的Sink进行管理:Load Balancing和Failover。

Failover:表示在Sink Group中同一时间只有一个Sink处于活跃状态,其他Sink作为备份处于非活跃状态,当活跃状态的Sink故障时,根据优先级从非活跃状态的Sink中选择一个来接管业务,保证数据不会丢失,多用于高可靠性场景。

Load Balancing:表示在Sink Group中所有Sink都处于活跃状态,每个Sink都会从Channel中去获取数据并进行处理,并且保证在运行过程中该Sink Group的所有Sink的负载是均衡的,多用于性能提升场景。

  • Load Balancing配置样例:
    client.sources = source1  
    client.sinks = sink1 sink2
    client.channels = channel1
    
    client.sinkgroups = g1
    client.sinkgroups.g1.sinks = sink1 sink2
    client.sinkgroups.g1.processor.type = load_balance
    client.sinkgroups.g1.processor.backoff = true
    client.sinkgroups.g1.processor.selector = random
    
    client.sinks.sink1.type = logger
    client.sinks.sink1.channel = channel1
    
    client.sinks.sink2.type = logger
    client.sinks.sink2.channel = channel1
    表3 Load Balancing配置样例参数说明

    选项名称

    默认值

    描述

    sinks

    -

    Sink Group的sink列表,多个以空格分隔

    processor.type

    default

    Processor的类型,应配置为load_balance

    processor.backoff

    false

    是否以指数的形式退避失败的Sinks

    processor.selector

    round_robin

    选择机制。必须是round_robin,random或者自定义的类,且该类继承了AbstractSinkSelector

    processor.selector.maxTimeOut

    30000

    屏蔽故障sink的时间,默认是30000毫秒

  • Failover配置样例:
    client.sources = source1       
    client.sinks = sink1 sink2
    client.channels = channel1
    
    client.sinkgroups = g1
    client.sinkgroups.g1.sinks = sink1 sink2
    client.sinkgroups.g1.processor.type = failover
    client.sinkgroups.g1.processor.priority.sink1 = 10
    client.sinkgroups.g1.processor.priority.sink2 = 5
    client.sinkgroups.g1.processor.maxpenalty = 10000
    
    client.sinks.sink1.type = logger
    client.sinks.sink1.channel = channel1
    
    client.sinks.sink2.type = logger
    client.sinks.sink2.channel = channel1
    表4 Failover配置样例参数说明

    选项名称

    默认值

    描述

    sinks

    -

    Sink Group的sink列表,多个以空格分隔

    processor.type

    default

    Processor的类型,应配置为failover

    processor.priority.<sinkName>

    -

    优先级值。<sinkName> 必须是sinks中有定义的。优先级值高Sink会更早被激活。值越大,优先级越高。:多个sinks的话,优先级的值不要相同,如果优先级相同的话,只会有一个生效。

    processor.maxpenalty

    30000

    失败的Sink最大的退避时间(单位:毫秒)

Interceptors

Flume的拦截器(Interceptor)支持在数据传输过程中修改或丢弃传输的基本单元Event。用户可以通过在配置中指定Flume内建拦截器的类名列表,也可以开发自定义的拦截器来实现Event的修改或丢弃。Flume内建支持的拦截器如下表所示,本章节会选取一个较为复杂的作为示例。其余的用户可以根据需要自行配置使用。

  • 拦截器用在Flume的Source、Channel之间,大部分的Source都带有Interceptor参数。用户可以依据需要配置。
  • Flume支持一个Source配置多个拦截器,各拦截器名称用空格分开。
  • 指定拦截器的顺序就是它们被调用的顺序。
  • 使用拦截器在Header中插入的内容,都可以在Sink中读取并使用。
表5 Flume内建支持的拦截器类型

拦截器类型

简要描述

Timestamp Interceptor

该拦截器会在Event的Header中插入一个时间戳。

Host Interceptor

该拦截器会在Event的Header中插入当前Agent所在节点的IP或主机名。

Remove Header Interceptor

该拦截器会依据Header中包含的符合正则匹配的字符串,丢弃掉对应的Event。

UUID Interceptor

该拦截器会为每个Event的Header生成一个UUID字符串。

Search and Replace Interceptor

该拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。与Java Matcher.replaceAll() 的规则相同。

Regex Filtering Interceptor

该拦截器通过将Event的Body体解释为文本文件,与配置的正则表达式进行匹配来选择性的过滤Event。提供的正则表达式可用于排除或包含事件。

Regex Extractor Interceptor

该拦截器使用正则表达式抽取原始events中的内容,并将该内容加入events的header中。

下面以Regex Filtering Interceptor 为例说明Interceptor使用(其余的可参考官网配置):
表6 Regex Filtering Interceptor配置参数说明

选项名称

默认值

描述

type

-

组件类型名称,必须写为regex_filter。

regex

-

用于匹配事件的正则表达式。

excludeEvents

false

默认收集匹配到的Event。设置为true,则会删除匹配的Event,保留不匹配的。

配置示例(为了方便观察,此模型使用了netcat tcp作为Source源,logger作为Sink)。配置好如下参数后,在Linux的配置的主机节点上执行Linux命令“telnet 主机名或IP 44444”,并任意敲入符合正则和不符合正则的字符串。会在日志中观察到,只有匹配到的字符串被传输了。
#define the source、channel、sink
server.sources = r1

server.channels = c1
server.sinks = k1

#config the source
server.sources.r1.type = netcat
server.sources.r1.bind = ${主机IP}
server.sources.r1.port = 44444
server.sources.r1.interceptors= i1
server.sources.r1.interceptors.i1.type= regex_filter
server.sources.r1.interceptors.i1.regex= (flume)|(myflume)
server.sources.r1.interceptors.i1.excludeEvents= false
server.sources.r1.channels = c1

#config the channel
server.channels.c1.type = memory
server.channels.c1.capacity = 1000
server.channels.c1.transactionCapacity = 100
#config the sink
server.sinks.k1.type = logger
server.sinks.k1.channel = c1
  • Sink的BatchSize参数必须小于Channel的transactionCapacity。
  • 集群Flume配置工具界面篇幅有限,Source、Channel、Sink只展示部分参数,详细请参考如下常用配置。
  • 集群Flume配置工具界面上所展示Customer Source、Customer Channel及Customer Sink需要用户根据自己开发的代码来进行配置,下述常用配置不再展示。

常用Source配置

  • Avro Source

    Avro Source监测Avro端口,接收外部Avro客户端数据并放入配置的Channel中。常用配置如下表所示:

    表7 Avro Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    avro

    avro source的类型,必须为avro。

    bind

    -

    监测主机名/IP。

    port

    -

    绑定监测端口,该端口需未被占用。

    threads

    -

    source工作的最大线程数。

    compression-type

    none

    消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。

    compression-level

    6

    数据压缩级别(1-9),数值越高,压缩率越高。

    ssl

    false

    是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    truststore-type

    JKS

    Java信任库类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    keystore-type

    JKS

    ssl启用后密钥存储类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥用不同的密码进行保护,而PKCS12的密钥库和私钥用相同密码进行保护。

    keystore

    -

    ssl启用后密钥存储文件路径,开启ssl后,该参数必填。

    keystore-password

    -

    ssl启用后密钥存储密码,开启ssl后,该参数必填。

    trust-all-certs

    false

    是否关闭SSL server证书检查。设置为“true”时将不会检查远端source的SSL server证书,不建议在生产中使用。

    exclude-protocols

    SSLv3

    排除的协议列表,用空格分开。默认排除SSLv3协议。

    ipFilter

    false

    是否开启ip过滤。

    ipFilter.rules

    -

    定义N网络的ipFilters,多个主机或IP地址用逗号分割。ipFilter设置为“true”时,配置规则有允许和禁止两种,配置格式如下:

    ipFilterRules=allow:ip:127.*, allow:name:localhost, deny:ip:*

  • SpoolDir Source

    Spool Dir Source监控并传输目录下新增的文件,可实现实时数据传输。常用配置如下表所示:

    表8 Spooling Directory Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    spooldir

    spooling source的类型,必须设置为spooldir。

    spoolDir

    -

    Spooldir source的监控目录,flume运行用户需要对该目录具有可读可写可执行权限。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。

    fileSuffix

    .COMPLETED

    文件传输完成后添加的后缀。

    deletePolicy

    never

    文件传输完成后源文件删除策略,never或immediate。“never”表示不删除已完成传输的源文件,“immediate”表示传输完成后立刻删除源文件。

    ignorePattern

    ^$

    忽略文件的正则表达式表示。默认为“^$”,表示忽略空格。

    includePattern

    ^.*$

    包含文件的正则表达式表示。可以与ignorePattern同时使用,如果一个文件既满足ignorePattern也满足includePattern,则该文件会被忽略。另外,以“.”开头的文件不会被过滤。

    trackerDir

    .flumespool

    传输过程中元数据存储路径。

    batchSize

    1000

    批次写入Channel的Event数量。

    decodeErrorPolicy

    FAIL

    编码错误策略。

    说明:

    如果文件中有编码错误,请配置“decodeErrorPolicy”为“REPLACE”或“IGNORE”,Flume遇到编码错误将跳过编码错误,继续采集后续日志。

    deserializer

    LINE

    文件解析器,值为“LINE”“BufferedLine”

    • 配置为“LINE”时,对从文件读取的字符逐个转码。
    • 配置为“BufferedLine”时,对文件读取的一行或多行的字符进行批量转码,性能更优。

    deserializer.maxLineLength

    2048

    按行解析最大长度。

    deserializer.maxBatchLine

    1

    按行解析最多行数,如果行数设置为多行,maxLineLength也应该设置为相应的倍数。

    说明:

    用户设置Interceptor时,需要考虑多行合并后的场景,否则会造成数据丢失。如果Interceptor无法处理多行合并场景,请将该配置设置为1。

    selector.type

    replicating

    选择器类型,“replicating”或“multiplexing”。“replicating”表示将数据复制多份,分别传递给每一个channel,每个channel接收到的数据都是相同的,而“multiplexing”表示根据event中header的value来选择特定的channel,每个channel中的数据是不同的。

    interceptors

    -

    拦截器。多个拦截器用空格分开。

    inputCharset

    UTF-8

    读取文件的编码格式。须与读取数据源文件编码格式相同,否则字符解析可能会出错。

    fileHeader

    false

    是否把文件名(包含路径)添加到event的header中。

    fileHeaderKey

    -

    设置header中数据存储结构为<key,value>模式,需要fileHeaderKey与fileHeader配合使用。如果fileHeader设置为true,可参考如下示例。

    示例:将fileHeaderKey定义为file,当读取到文件名为/root/a.txt的内容时,header中以file=/root/a.txt的形式存在。

    basenameHeader

    false

    是否把文件名(不包含路径)添加到event的header中。

    basenameHeaderKey

    -

    设置header中数据存储结构为<key,value>模式,需要basenameHeaderKey与basenameHeader配合使用。如果basenameHeader设置为true,可参考如下示例。

    示例:将basenameHeaderKey定义为file,当读取到文件名为a.txt的内容时,header中以file=a.txt的形式存在。

    pollDelay

    500

    轮询监控目录下新文件时的时延。单位:毫秒。

    recursiveDirectorySearch

    false

    是否监控配置的目录下子目录中的新文件。

    consumeOrder

    oldest

    监控目录下文件的消耗次序。如果配置为oldest或者youngest,会根据监控目录下文件的最后修改时间来决定,当目录下有大量文件时,会消耗较长时间去寻找oldest或者youngest的文件。需要注意的是,如果配置为random,创建比较早的文件有可能长时间未被读取。如果配置为oldest或者youngest,那么进程会需要较多时间来查找最新的或最旧的文件。可选值:random,youngest,oldest。

    maxBackoff

    4000

    当Channel满了以后,尝试再次去写Channel所等待的最大时间。超过这个时间,则会发生异常。对应的Source会以一个较小的时间开始,然后每尝试一次,该时间数字指数增长直到达到当前指定的值,如果还不能成功写入,则认为失败。时间单位:秒。

    emptyFileEvent

    true

    是否采集空文件信息发送到Sink端,默认值为true,表示将空文件信息发送到Sink端。该参数只对HDFS Sink有效,其他Sink该参数无效。以HDFS Sink为例,当参数为true时,如果spoolDir路径下存在空文件,那么HDFS的hdfs.path路径下就会创建一个同名的空文件。

    SpoolDir Source在按行读取过程中会忽略掉每一个event的最后一个换行符,该换行符所占用的数据量指标不会被Flume统计。

  • Kafka Source

    Kafka Source从Kafka的topic中消费数据,可以设置多个Source消费同一个topic的数据,每个Source会消费topic的不同partitions。常用配置如下表所示:

    表9 Kafka Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    org.apache.flume.source.kafka.KafkaSource

    kafka source的类型,必须设置为org.apache.flume.source.kafka.KafkaSource。

    kafka.bootstrap.servers

    -

    Kafka的bootstrap地址端口列表。如果集群已安装Kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    kafka.topics

    -

    订阅的Kafka topic列表,用逗号分隔。

    kafka.topics.regex

    -

    符合正则表达式的topic会被订阅,优先级高于“kafka.topics”,如果存在将覆盖“kafka.topics”。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。

    nodatatime

    0(不开启)

    告警阈值,从Kafka中订阅不到数据的时长超过阈值时发送告警,单位:秒。该参数可在配置文件properties.properties进行设置。

    batchSize

    1000

    批次写入Channel的Event数量。

    batchDurationMillis

    1000

    批次消费topic数据的最大时长,单位:ms。

    keepTopicInHeader

    false

    是否在Event Header中保存topic。设置为true,则Kafka Sink配置的topic将无效。

    setTopicHeader

    true

    当设置为true时,会将“topicHeader”中定义的topic名称存储到Header中。

    topicHeader

    topic

    当setTopicHeader属性设置为true,此参数用于定义存储接收的topic名称。如果与Kafka Sink的topicHeader属性结合使用,应该注意,避免将消息循环发送到同一主题。

    useFlumeEventFormat

    false

    默认情况下,event会以字节的形式从kafka topic传递到event的body体中。设置为true,则会以Flume的Avro二进制格式来读取Event。与KafkaSink或KakfaChannel 中同名的parseAsFlumeEvent参数一起使用时,会保留从数据源产生的任何设定的Header。

    keepPartitionInHeader

    false

    是否在Event Header中保存partitionID。设置为true,则Kafka Sink将写入对应的Partition。

    kafka.consumer.group.id

    flume

    Kafka消费组ID。多个源或代理中设置相同的ID表示它们是同一个consumer group。

    kafka.security.protocol

    SASL_PLAINTEXT

    Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    Other Kafka Consumer Properties

    -

    其他Kafka配置,可以接受任意Kafka支持的消费配置,配置需要加前缀“kafka.”。

  • Taildir Source

    Taildir Source监控目录下文件的变化并自动读取文件内容,可实现实时数据传输,常用配置如下表所示:

    表10 Taildir Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    TAILDIR

    taildir source的类型,必须为TAILDIR。

    filegroups

    -

    设置采集文件目录分组名字,分组名字中间使用空格间隔。

    filegroups.<filegroupName>

    -

    文件路径,需要配置为绝对路径。

    filegroups.<filegroupName>.parentDir

    -

    父目录,需要配置为绝对路径。

    filegroups.<filegroupName>.filePattern

    -

    相对父目录的文件路径,可以包含目录,支持正则表达式,须与父目录联合使用。

    positionFile

    -

    传输过程中元数据存储路径。

    headers.<filegroupName>.<headerKey>

    -

    设置某一个分组采集数据时event中的key-value值。

    byteOffsetHeader

    false

    是否在每一个event头中携带该event在源文件中的位置信息。设置为true,则该信息保存在byteoffset变量中。

    maxBatchCount

    Long.MAX_VALUE

    控制从一个文件中连续读取的最大批次。如果监控目录会一直读取多个文件,且其中一个文件以非常快的速率在写入,那么其他文件可能会无法处理。因为高速写入的这个文件会陷入无限读取的循环中。这种情况下,应该降低此值。

    skipToEnd

    false

    Flume在重启后是否直接定位到文件最新的位置处读取最新的数据。设置为true,则重启后直接定位到文件最新位置读取最新数据。

    idleTimeout

    120000

    设置读取文件的空闲时间,单位:毫秒,如果在该时间内文件内容没有变更,关闭掉该文件,关闭后如果该文件有数据写入,重新打开并读取数据。

    writePosInterval

    3000

    设置将元数据写入到文件的周期,单位:毫秒。

    batchSize

    1000

    批次写入Channel的Event数量。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒

    fileHeader

    false

    是否把文件名(包含路径)添加到event的header中。

    fileHeaderKey

    file

    设置header中数据存储结构为<key,value>模式,需要fileHeaderKey与fileHeader配合使用。如果fileHeader设置为true,可参考如下示例。

    示例:将fileHeaderKey定义为file,当读取到文件名为/root/a.txt的内容时,header中以file=/root/a.txt的形式存在。

  • Http Source

    Http Source接收外部HTTP客户端发送过来的数据,并放入配置的Channel中,常用配置如下表所示:

    表11 Http Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    http

    http source的类型,必须为http。

    bind

    -

    监测主机名/IP。

    port

    -

    绑定监测端口,该端口需未被占用。

    handler

    org.apache.flume.source.http.JSONHandler

    http请求的消息解析方式,支持Json格式解析(org.apache.flume.source.http.JSONHandler)和二进制Blob块解析(org.apache.flume.sink.solr.morphline.BlobHandler)。

    handler.*

    -

    设置handler的参数。

    exclude-protocols

    SSLv3

    排除的协议列表,用空格分开。默认排除SSLv3协议。

    include-cipher-suites

    -

    包含的协议列表,用空格分开。如果设置为空,则默认支持所有协议。

    enableSSL

    false

    http协议是否启用SSL。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    keystore-type

    JKS

    Keystore类型,可以为JKS或者PKCS12。

    keystore

    -

    http启用SSL后设置keystore的路径。

    keystorePassword

    -

    http启用SSL后设置keystore的密码。

  • Thrift Source

    Thrift Source监测thrift端口,接收外部Thrift客户端数据并放入配置的Channel中。常用配置如下表所示:

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    thrift

    thrift source的类型,必须设置为thrift。

    bind

    -

    监测主机名/IP。

    port

    -

    绑定监测端口,该端口需未被占用。

    threads

    -

    允许运行的最大的worker线程数目。

    kerberos

    false

    是否启用Kerberos认证。

    agent-keytab

    -

    服务端使用的keytab文件地址,必须使用机机账号。建议使用Flume服务安装目录下flume/conf/flume_server.keytab。

    agent-principal

    -

    服务端使用的安全用户的Principal,必须使用机机账户。建议使用Flume服务默认用户flume_server/hadoop.<系统域名>@<系统域名>

    说明:

    “flume_server/hadoop.<系统域名>为用户名,用户的用户名所包含的系统域名所有字母为小写。例如“本端域”参数为“9427068F-6EFA-4833-B43E-60CB641E5B6C.COM”,用户名为“flume_server/hadoop.9427068f-6efa-4833-b43e-60cb641e5b6c.com”。

    compression-type

    none

    消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。

    ssl

    false

    是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    keystore-type

    JKS

    SSL启用后密钥存储类型。

    keystore

    -

    SSL启用后密钥存储文件路径,开启SSL后,该参数必填。

    keystore-password

    -

    SSL启用后密钥存储密码,开启ssl后,该参数必填。

    truststore-type

    JKS

    Java信任库类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

常用Channel配置

  • Memory Channel

    Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如下表所示:

    表12 Memory Channel常用配置

    参数

    默认值

    描述

    type

    -

    memory channel的类型,必须设置为memory。

    capacity

    10000

    缓存在channel中的最大Event数。

    transactionCapacity

    1000

    每次存取的最大Event数。

    说明:
    • 此参数值需要大于source和sink的batchSize。
    • 事务缓存容量必须小于或等于Channel缓存容量。

    channelfullcount

    10

    channel full次数,达到该次数后发送告警。

    keep-alive

    3

    当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。

    byteCapacity

    JVM最大内存的80%

    channel中最多能容纳所有event body的总字节数,默认是 JVM最大可用内存(-Xmx )的80%,单位:bytes。

    byteCapacityBufferPercentage

    20

    channel中字节容量百分比(%)。

  • File Channel

    File Channel使用本地磁盘作为缓存区,Events存放在设置的dataDirs配置项文件夹中。常用配置如下表所示:

    表13 File Channel常用配置

    参数

    默认值

    描述

    type

    -

    file channel的类型,必须设置为file。

    checkpointDir

    ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/checkpoint

    说明:

    此路径随自定义数据路径变更。

    检查点存放路径。

    dataDirs

    ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/data

    说明:

    此路径随自定义数据路径变更。

    数据缓存路径,设置多个路径可提升性能,中间用逗号分开。

    maxFileSize

    2146435071

    单个缓存文件的最大值,单位:bytes。

    minimumRequiredSpace

    524288000

    缓冲区空闲空间最小值,单位:bytes。

    capacity

    1000000

    缓存在channel中的最大Event数。

    transactionCapacity

    10000

    每次存取的最大Event数。

    说明:
    • 此参数值需要大于source和sink的batchSize。
    • 事务缓存容量必须小于或等于Channel缓存容量。

    channelfullcount

    10

    channel full次数,达到该次数后发送告警。

    useDualCheckpoints

    false

    是否备份检查点。设置为“true”时,必须设置backupCheckpointDir的参数值。

    backupCheckpointDir

    -

    备份检查点路径。

    checkpointInterval

    30000

    检查点间隔时间,单位:秒。

    keep-alive

    3

    当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。

    use-log-replay-v1

    false

    是否启用旧的回复逻辑。

    use-fast-replay

    false

    是否使用队列回复。

    checkpointOnClose

    true

    channel关闭时是否创建检查点。

  • Memory File Channel

    Memory File Channel同时使用内存和本地磁盘作为缓存区,消息可持久化,性能优于File Channel,接近Memory Channel的性能。此Channel目前处于试验阶段,可靠性不够高,不建议在生产环境使用。常用配置如下表所示:

    表14 Memory File Channel常用配置

    参数

    默认值

    描述

    type

    org.apache.flume.channel.MemoryFileChannel

    memory file channel的类型,必须设置为“org.apache.flume.channel.MemoryFileChannel”

    capacity

    50000

    Channel缓存容量:缓存在Channel中的最大Event数。

    transactionCapacity

    5000

    事务缓存容量:一次事务能处理的最大Event数。

    说明:
    • 此参数值需要大于source和sink的batchSize。
    • 事务缓存容量必须小于或等于Channel缓存容量。

    subqueueByteCapacity

    20971520

    每个subqueue最多保存多少byte的Event,单位:byte。

    Memory File Channel采用queue和subqueue两级缓存,event保存在subqueue,subqueue保存在queue。

    subqueue能保存多少event,由“subqueueCapacity”“subqueueInterval”两个参数决定,“subqueueCapacity”限制subqueue内的Event总容量,“subqueueInterval”限制subqueue保存Event的时长,只有subqueue达到“subqueueCapacity”“subqueueInterval”上限时,subqueue内的Event才会发往目的地。

    说明:

    “subqueueByteCapacity”必须大于一个batchsize内的Event总容量。

    subqueueInterval

    2000

    每个subqueue最多保存一段多长时间的Event,单位:毫秒。

    keep-alive

    3

    当事务缓存或Channel缓存满时,Put、Take线程等待时间。

    单位:秒。

    dataDir

    -

    缓存本地文件存储目录。

    byteCapacity

    JVM最大内存的80%

    Channel缓存容量。

    单位:bytes。

    compression-type

    None

    消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。

    channelfullcount

    10

    channel full次数,达到该次数后发送告警。

    Memory File Channel配置样例:

    server.channels.c1.type = org.apache.flume.channel.MemoryFileChannel
    server.channels.c1.dataDir = /opt/flume/mfdata
    server.channels.c1.subqueueByteCapacity = 20971520
    server.channels.c1.subqueueInterval=2000
    server.channels.c1.capacity = 500000
    server.channels.c1.transactionCapacity = 40000
  • Kafka Channel
    Kafka Channel使用Kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。
    表15 Kafka channel 常用配置

    Parameter

    Default Value

    Description

    type

    -

    kafka channel的类型,必须设置为 “org.apache.flume.channel.kafka.KafkaChannel”

    kafka.bootstrap.servers

    -

    Kafka的bootstrap地址端口列表。

    如果集群已安装Kafka并且配置已经同步,则服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    kafka.topic

    flume-channel

    channel用来缓存数据的topic。

    kafka.consumer.group.id

    flume

    从kafka中获取数据的组标识,此参数不能为空。

    parseAsFlumeEvent

    true

    是否解析为Flume event。

    migrateZookeeperOffsets

    true

    当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。

    kafka.consumer.auto.offset.reset

    latest

    当没有offset记录时从什么位置消费,可选为“earliest”、“latest”或“none”。“earliest”表示将offset重置为初始点,“latest”表示将offset置为最新位置点,“none”表示如果没有offset则发生异常。

    kafka.producer.security.protocol

    SASL_PLAINTEXT

    Kafka生产安全协议。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    说明:

    如果该参数没有显示,请单击弹窗左下角的"+"显示全部参数。

    kafka.consumer.security.protocol

    SASL_PLAINTEXT

    同上,但用于消费。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    pollTimeout

    500

    consumer调用poll()函数能接受的最大超时时间,单位:毫秒。

    ignoreLongMessage

    false

    是否丢弃超大消息。

    messageMaxLength

    1000012

    Flume写入Kafka的消息的最大长度。

常用Sink配置

  • HDFS Sink

    HDFS Sink将数据写入Hadoop分布式文件系统(HDFS)。常用配置如下表所示:

    表16 HDFS Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    hdfs

    hdfs sink的类型,必须设置为hdfs。

    hdfs.path

    -

    HDFS上数据存储路径,必须以“hdfs://hacluster/”开头。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    hdfs.inUseSuffix

    .tmp

    正在写入的hdfs文件后缀。

    hdfs.rollInterval

    30

    按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    hdfs.rollSize

    1024

    按大小滚动文件,单位:bytes,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    hdfs.rollCount

    10

    按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    说明:

    参数“rollInterval”、“rollSize”和“rollCount”可同时配置,三个参数采取优先原则,哪个参数值先满足,优先按照哪个参数进行压缩。

    hdfs.idleTimeout

    0

    自动关闭空闲文件超时时间,单位:秒。

    hdfs.batchSize

    1000

    批次写入HDFS的Event个数。

    hdfs.kerberosPrincipal

    -

    认证HDFS的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。

    hdfs.kerberosKeytab

    -

    认证HDFS的Kerberos keytab,普通模式集群不配置,安全模式集群中,用户必须对jaas.cof文件中的keyTab路径有访问权限。

    hdfs.fileCloseByEndEvent

    true

    收到源文件的最后一个Event时是否关闭hdfs文件。

    hdfs.batchCallTimeout

    -

    批次写入HDFS超时控制时间,单位:毫秒。

    当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。

    说明:

    “hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致写HDFS失败。

    serializer.appendNewline

    true

    将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。

    hdfs.filePrefix

    over_%{basename}

    数据写入hdfs后文件名的前缀。

    hdfs.fileSuffix

    -

    数据写入hdfs后文件名的后缀。

    hdfs.inUsePrefix

    -

    正在写入的hdfs文件前缀。

    hdfs.fileType

    DataStream

    hdfs文件格式,包括“SequenceFile”、“DataStream”以及“CompressedStream”。

    说明:

    “SequenceFile”和“DataStream”不压缩输出文件,不能设置参数“codeC”,“CompressedStream”压缩输出文件,必须设置“codeC”参数值配合使用。

    hdfs.codeC

    -

    文件压缩格式,包括gzip、bzip2、lzo、lzop、snappy。

    hdfs.maxOpenFiles

    5000

    最大允许打开的hdfs文件数,当打开的文件数达到该值时,最早打开的文件将会被关闭。

    hdfs.writeFormat

    Writable

    文件写入格式,“Writable”或者“Text”。

    hdfs.callTimeout

    10000

    写入HDFS超时控制时间,单位:毫秒。

    hdfs.threadsPoolSize

    -

    每个HDFS sink用于HDFS io操作的线程数。

    hdfs.rollTimerPoolSize

    -

    每个HDFS sink用于调度定时文件滚动的线程数。

    hdfs.round

    false

    时间戳是否四舍五入。如果设置为true,则会影响所有基于时间的转义序列(%t除外)。

    hdfs.roundUnit

    second

    时间戳四舍五入单位,可选为“second”、“minute”或“hour”,分别对应为秒、分钟和小时。

    hdfs.useLocalTimeStamp

    true

    是否启用本地时间戳,建议设置为“true”。

    hdfs.closeTries

    0

    hdfs sink尝试关闭重命名文件的最大次数。默认为0表示sink会一直尝试重命名,直至重命名成功。

    hdfs.retryInterval

    180

    尝试关闭hdfs文件的时间间隔,单位:秒。

    说明:

    每个关闭请求都会有多个RPC往返Namenode,因此设置的太低可能导致Namenode超负荷。如果设置0,如果第一次尝试失败的话,该Sink将不会尝试关闭文件,并且把文件打开,或者用“.tmp”作为扩展名。

    hdfs.failcount

    10

    数据写入hdfs失败的次数。该参数作为sink写入hdfs失败次数的阈值,当超过该阈值后上报数据传输异常告警。

  • Avro Sink

    Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如下表所示:

    表17 Avro Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    avro sink的类型,必须设置为avro。

    hostname

    -

    绑定的主机名/IP。

    port

    -

    监测端口,该端口需未被占用。

    batch-size

    1000

    批次发送的Event个数。

    client.type

    DEFAULT

    客户端实例类型,根据所配置的模型实际使用到的通信协议设置。该值可选值包括:

    • DEFAULT,返回AvroRPC类型的客户端实例。
    • OTHER,返回NULL。
    • THRIFT,返回Thrift RPC类型的客户端实例。
    • DEFAULT_LOADBALANCING, 返回LoadBalancing RPC 客户端实例。
    • DEFAULT_FAILOVER, 返回Failover RPC 客户端实例。

    ssl

    false

    是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    truststore-type

    JKS

    Java信任库类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    keystore-type

    JKS

    ssl启用后密钥存储类型。

    keystore

    -

    ssl启用后密钥存储文件路径,开启ssl后,该参数必填。

    keystore-password

    -

    ssl启用后密钥存储密码,开启ssl后,该参数必填。

    connect-timeout

    20000

    第一次连接的超时时间,单位:毫秒。

    request-timeout

    20000

    第一次请求后一次请求的最大超时时间,单位:毫秒。

    reset-connection-interval

    0

    一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。

    compression-type

    none

    批数据压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。该值必须与AvroSource的compression-type匹配。

    compression-level

    6

    批数据压缩级别(1-9),数值越高,压缩率越高。

    exclude-protocols

    SSLv3

    排除的协议列表,用空格分开。默认排除SSLv3协议。

  • HBase Sink

    HBase Sink将数据写入到HBase中。常用配置如下表所示:

    表18 HBase Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    hbase sink的类型,必须设置为hbase。

    table

    -

    HBase表名称。

    columnFamily

    -

    HBase列族。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    batchSize

    1000

    批次写入HBase的Event个数。

    kerberosPrincipal

    -

    认证HBase的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。

    kerberosKeytab

    -

    认证HBase的Kerberos keytab,普通模式集群不配置,安全模式集群中,flume运行用户必须对jaas.cof文件中的keyTab路径有访问权限。

    coalesceIncrements

    true

    是否在同一个处理批次中,合并对同一个hbase cell多个操作。设置为true有利于提高性能。

  • Kafka Sink

    Kafka Sink将数据写入到Kafka中。常用配置如下表所示:

    表19 Kafka Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    kafka sink的类型,必须设置为org.apache.flume.sink.kafka.KafkaSink。

    kafka.bootstrap.servers

    -

    Kafka 的bootstrap 地址端口列表。如果集群安装有kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表,客户端必须配置该项,多个用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    kafka.producer.acks

    1

    必须收到多少个replicas的确认信息才认为写入成功。0表示不需要接收确认信息,1表示只等待leader的确认信息。-1表示等待所有的relicas的确认信息。设置为-1,在某些leader失败的场景中可以避免数据丢失。

    kafka.topic

    -

    数据写入的topic,必须填写。

    allowTopicOverride

    false

    是否将Event Header中保存的topic替换kafka.topic中配置的topic。

    flumeBatchSize

    1000

    批次写入Kafka的Event个数。

    kafka.security.protocol

    SASL_PLAINTEXT

    Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    ignoreLongMessage

    false

    是否丢弃超大消息的开关。

    messageMaxLength

    1000012

    Flume写入Kafka的消息的最大长度。

    defaultPartitionId

    -

    用于指定channel中的events被传输到哪一个Kafka partition ID ,此值会被partitionIdHeader覆盖。默认情况下,如果此参数不设置,会由Kafka Producer's partitioner 进行events分发(可以通过指定key或者kafka.partitioner.class自定义的partitioner)。

    partitionIdHeader

    -

    设置时,对应的Sink 将从Event 的Header中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。 如果该值无对应的有效分区,则会发生EventDeliveryException。 如果Header 值已经存在,则此设置将覆盖参数defaultPartitionId。

    Other Kafka Producer Properties

    -

    其他Kafka配置,可以接受任意Kafka支持的生产配置,配置需要加前缀 .kafka。

  • Thrift Sink

    Thrift Sink把events转化为Thrift events并发送到配置的主机的监测端口。常用配置如下表所示:

    表20 Thrift Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    thrift

    thrift sink的类型,必须设置为thrift。

    hostname

    -

    绑定的主机名/IP。

    port

    -

    监测端口,该端口需未被占用。

    batch-size

    1000

    批次发送的Event个数。

    connect-timeout

    20000

    第一次连接的超时时间,单位:毫秒。

    request-timeout

    20000

    第一次请求后一次请求的最大超时时间,单位:毫秒。

    kerberos

    false

    是否启用Kerberos认证。

    client-keytab

    -

    客户端使用的keytab文件地址,flume运行用户必须对认证文件具有访问权限。

    client-principal

    -

    客户端使用的安全用户的Principal。

    server-principal

    -

    服务端使用的安全用户的Principal。

    compression-type

    none

    Flume发送数据的压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。

    maxConnections

    5

    Flume发送数据时的最大连接池大小。

    ssl

    false

    是否使用SSL加密。

    truststore-type

    JKS

    Java信任库类型。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    reset-connection-interval

    0

    一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。

注意事项

  • Flume可靠性保障措施有哪些?
    • Source&Channel、Channel&Sink之间的事务机制。
    • Sink Processor支持配置failover、load_blance机制,例如负载均衡示例如下。
      server.sinkgroups=g1
      server.sinkgroups.g1.sinks=k1 k2
      server.sinkgroups.g1.processor.type=load_balance
      server.sinkgroups.g1.processor.backoff=true
      server.sinkgroups.g1.processor.selector=random
  • Flume多agent聚合级联时的注意事项?
    • 级联时需要使用Avro或者Thrift协议进行级联。
    • 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。