Flume业务模型配置说明
业务模型配置指导
本任务旨在提供Flume常用模块的性能差异,用于指导用户进行合理的Flume业务配置,避免出现前端Source和后端Sink性能不匹配进而导致整体业务性能不达标的场景。
本任务只针对于单通道的场景进行比较说明。
Flume业务配置及模块选择过程中,一般要求Sink的极限吞吐量需要大于Source的极限吞吐量,否则在极限负载的场景下,Source往Channel的写入速度大于Sink从Channel取出的速度,从而导致Channel频繁被写满,进而影响性能表现。
Avro Source和Avro Sink一般都是成对出现,用于多个Flume Agent间进行数据中转,因此一般场景下Avro Source和Avro Sink都不会成为性能瓶颈。
模块间性能
根据模块间性能对比,可以看到对于前端是SpoolDir Source的场景下,Kafka Sink和HDFS Sink都能满足吞吐量要求,但是HBase Sink由于自身写入性能较低的原因,会成为性能瓶颈,会导致数据都积压在Channel中。但是如果有必须使用HBase Sink或者其他性能容易成为瓶颈的Sink的场景时,可以选择使用Channel Selector或者Sink Group来满足性能要求。
Channel Selector
Channel Selector可以允许一个Source对接多个Channel,通过选择不同的Selector类型来将Source的数据进行分流或者复制,目前Flume提供的Channel Selector有两种:Replicating和Multiplexing。
Replicating:表示Source的数据同步发送给所有Channel。
Multiplexing:表示根据Event中的Header的指定字段的值来进行判断,从而选择相应的Channel进行发送,从而起到根据业务类型进行分流的目的。
- Replicating配置样例:
client.sources = kafkasource client.channels = channel1 channel2 client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource client.sources.kafkasource.kafka.topics = topic1,topic2 client.sources.kafkasource.kafka.consumer.group.id = flume client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007 client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT client.sources.kafkasource.batchDurationMillis = 1000 client.sources.kafkasource.batchSize = 800 client.sources.kafkasource.channels = channel1 channel2 client.sources.kafkasource.selector.type = replicating client.sources.kafkasource.selector.optional = channel2
表1 Replicating配置样例参数说明 选项名称
默认值
描述
Selector.type
replicating
Selector类型,应配置为replicating
Selector.optional
-
可选Channel,可以配置为列表
- Multiplexing配置样例:
client.sources = kafkasource client.channels = channel1 channel2 client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource client.sources.kafkasource.kafka.topics = topic1,topic2 client.sources.kafkasource.kafka.consumer.group.id = flume client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007 client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT client.sources.kafkasource.batchDurationMillis = 1000 client.sources.kafkasource.batchSize = 800 client.sources.kafkasource.channels = channel1 channel2 client.sources.kafkasource.selector.type = multiplexing client.sources.kafkasource.selector.header = myheader client.sources.kafkasource.selector.mapping.topic1 = channel1 client.sources.kafkasource.selector.mapping.topic2 = channel2 client.sources.kafkasource.selector.default = channel1
表2 Multiplexing配置样例参数说明 选项名称
默认值
描述
Selector.type
replicating
Selector类型,应配置为multiplexing
Selector.header
Flume.selector.header
-
Multiplexing类型的Selector的样例中,选择Event中Header名称为topic的字段来进行判断,当Header中topic字段的值为topic1时,向channel1发送该Event,当Header中topic字段的值为topic2时,向channel2发送该Event。
这种Selector需要借助Source中Event的特定Header来进行Channel的选择,需要根据业务场景选择合理的Header来进行数据分流。
Sink Group
当后端单Sink性能不足、需要高可靠性保证或者异构输出时可以使用Sink Group来将指定的Channel和多个Sink对接,从而满足相应的使用场景。目前Flume提供了两种Sink Processor用于对Sink Group中的Sink进行管理:Load Balancing和Failover。
Failover:表示在Sink Group中同一时间只有一个Sink处于活跃状态,其他Sink作为备份处于非活跃状态,当活跃状态的Sink故障时,根据优先级从非活跃状态的Sink中选择一个来接管业务,保证数据不会丢失,多用于高可靠性场景。
Load Balancing:表示在Sink Group中所有Sink都处于活跃状态,每个Sink都会从Channel中去获取数据并进行处理,并且保证在运行过程中该Sink Group的所有Sink的负载是均衡的,多用于性能提升场景。
- Load Balancing配置样例:
client.sources = source1 client.sinks = sink1 sink2 client.channels = channel1 client.sinkgroups = g1 client.sinkgroups.g1.sinks = sink1 sink2 client.sinkgroups.g1.processor.type = load_balance client.sinkgroups.g1.processor.backoff = true client.sinkgroups.g1.processor.selector = random client.sinks.sink1.type = logger client.sinks.sink1.channel = channel1 client.sinks.sink2.type = logger client.sinks.sink2.channel = channel1
表3 Load Balancing配置样例参数说明 选项名称
默认值
描述
sinks
-
Sink Group的sink列表,多个以空格分隔
processor.type
default
Processor的类型,应配置为load_balance
processor.backoff
false
是否以指数的形式退避失败的Sinks
processor.selector
round_robin
选择机制。必须是round_robin,random或者自定义的类,且该类继承了AbstractSinkSelector
processor.selector.maxTimeOut
30000
屏蔽故障sink的时间,默认是30000毫秒
- Failover配置样例:
client.sources = source1 client.sinks = sink1 sink2 client.channels = channel1 client.sinkgroups = g1 client.sinkgroups.g1.sinks = sink1 sink2 client.sinkgroups.g1.processor.type = failover client.sinkgroups.g1.processor.priority.sink1 = 10 client.sinkgroups.g1.processor.priority.sink2 = 5 client.sinkgroups.g1.processor.maxpenalty = 10000 client.sinks.sink1.type = logger client.sinks.sink1.channel = channel1 client.sinks.sink2.type = logger client.sinks.sink2.channel = channel1
表4 Failover配置样例参数说明 选项名称
默认值
描述
sinks
-
Sink Group的sink列表,多个以空格分隔
processor.type
default
Processor的类型,应配置为failover
processor.priority.<sinkName>
-
优先级值。<sinkName> 必须是sinks中有定义的。优先级值高Sink会更早被激活。值越大,优先级越高。注:多个sinks的话,优先级的值不要相同,如果优先级相同的话,只会有一个生效。
processor.maxpenalty
30000
失败的Sink最大的退避时间(单位:毫秒)
Interceptors
Flume的拦截器(Interceptor)支持在数据传输过程中修改或丢弃传输的基本单元Event。用户可以通过在配置中指定Flume内建拦截器的类名列表,也可以开发自定义的拦截器来实现Event的修改或丢弃。Flume内建支持的拦截器如下表所示,本章节会选取一个较为复杂的作为示例。其余的用户可以根据需要自行配置使用。
- 拦截器用在Flume的Source、Channel之间,大部分的Source都带有Interceptor参数。用户可以依据需要配置。
- Flume支持一个Source配置多个拦截器,各拦截器名称用空格分开。
- 指定拦截器的顺序就是它们被调用的顺序。
- 使用拦截器在Header中插入的内容,都可以在Sink中读取并使用。
拦截器类型 |
简要描述 |
---|---|
Timestamp Interceptor |
该拦截器会在Event的Header中插入一个时间戳。 |
Host Interceptor |
该拦截器会在Event的Header中插入当前Agent所在节点的IP或主机名。 |
Remove Header Interceptor |
该拦截器会依据Header中包含的符合正则匹配的字符串,丢弃掉对应的Event。 |
UUID Interceptor |
该拦截器会为每个Event的Header生成一个UUID字符串。 |
Search and Replace Interceptor |
该拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。与Java Matcher.replaceAll() 的规则相同。 |
Regex Filtering Interceptor |
该拦截器通过将Event的Body体解释为文本文件,与配置的正则表达式进行匹配来选择性的过滤Event。提供的正则表达式可用于排除或包含事件。 |
Regex Extractor Interceptor |
该拦截器使用正则表达式抽取原始events中的内容,并将该内容加入events的header中。 |
选项名称 |
默认值 |
描述 |
---|---|---|
type |
- |
组件类型名称,必须写为regex_filter。 |
regex |
- |
用于匹配事件的正则表达式。 |
excludeEvents |
false |
默认收集匹配到的Event。设置为true,则会删除匹配的Event,保留不匹配的。 |
#define the source、channel、sink server.sources = r1 server.channels = c1 server.sinks = k1 #config the source server.sources.r1.type = netcat server.sources.r1.bind = ${主机IP} server.sources.r1.port = 44444 server.sources.r1.interceptors= i1 server.sources.r1.interceptors.i1.type= regex_filter server.sources.r1.interceptors.i1.regex= (flume)|(myflume) server.sources.r1.interceptors.i1.excludeEvents= false server.sources.r1.channels = c1 #config the channel server.channels.c1.type = memory server.channels.c1.capacity = 1000 server.channels.c1.transactionCapacity = 100 #config the sink server.sinks.k1.type = logger server.sinks.k1.channel = c1
- Sink的BatchSize参数必须小于Channel的transactionCapacity。
- 集群Flume配置工具界面篇幅有限,Source、Channel、Sink只展示部分参数,详细请参考如下常用配置。
- 集群Flume配置工具界面上所展示Customer Source、Customer Channel及Customer Sink需要用户根据自己开发的代码来进行配置,下述常用配置不再展示。
常用Source配置
- Avro Source
Avro Source监测Avro端口,接收外部Avro客户端数据并放入配置的Channel中。常用配置如下表所示:
表7 Avro Source常用配置 参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
avro
avro source的类型,必须为avro。
bind
-
监测主机名/IP。
port
-
绑定监测端口,该端口需未被占用。
threads
-
source工作的最大线程数。
compression-type
none
消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。
compression-level
6
数据压缩级别(1-9),数值越高,压缩率越高。
ssl
false
是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。
truststore-type
JKS
Java信任库类型,“JKS”或“PKCS12”。
说明:JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
keystore-type
JKS
ssl启用后密钥存储类型,“JKS”或“PKCS12”。
说明:JKS的密钥库和私钥用不同的密码进行保护,而PKCS12的密钥库和私钥用相同密码进行保护。
keystore
-
ssl启用后密钥存储文件路径,开启ssl后,该参数必填。
keystore-password
-
ssl启用后密钥存储密码,开启ssl后,该参数必填。
trust-all-certs
false
是否关闭SSL server证书检查。设置为“true”时将不会检查远端source的SSL server证书,不建议在生产中使用。
exclude-protocols
SSLv3
排除的协议列表,用空格分开。默认排除SSLv3协议。
ipFilter
false
是否开启ip过滤。
ipFilter.rules
-
定义N网络的ipFilters,多个主机或IP地址用逗号分隔。ipFilter设置为“true”时,配置规则有允许和禁止两种,配置格式如下:
ipFilterRules=allow:ip:127.*, allow:name:localhost, deny:ip:*
- SpoolDir Source
Spool Dir Source监控并传输目录下新增的文件,可实现实时数据传输。常用配置如下表所示:
表8 Spooling Directory Source常用配置 参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
spooldir
spooling source的类型,必须设置为spooldir。
spoolDir
-
Spooldir source的监控目录,flume运行用户需要对该目录具有可读可写可执行权限。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。
fileSuffix
.COMPLETED
文件传输完成后添加的后缀。
deletePolicy
never
文件传输完成后源文件删除策略,never或immediate。“never”表示不删除已完成传输的源文件,“immediate”表示传输完成后立刻删除源文件。
ignorePattern
^$
忽略文件的正则表达式表示。默认为“^$”,表示忽略空格。
includePattern
^.*$
包含文件的正则表达式表示。可以与ignorePattern同时使用,如果一个文件既满足ignorePattern也满足includePattern,则该文件会被忽略。另外,以“.”开头的文件不会被过滤。
trackerDir
.flumespool
传输过程中元数据存储路径。
batchSize
1000
批次写入Channel的Event数量。
decodeErrorPolicy
FAIL
编码错误策略。
说明:如果文件中有编码错误,请配置“decodeErrorPolicy”为“REPLACE”或“IGNORE”,Flume遇到编码错误将跳过编码错误,继续采集后续日志。
deserializer
LINE
文件解析器,值为“LINE”或 “BufferedLine”。
- 配置为“LINE”时,对从文件读取的字符逐个转码。
- 配置为“BufferedLine”时,对文件读取的一行或多行的字符进行批量转码,性能更优。
deserializer.maxLineLength
2048
按行解析最大长度。
deserializer.maxBatchLine
1
按行解析最多行数,如果行数设置为多行,maxLineLength也应该设置为相应的倍数。
说明:用户设置Interceptor时,需要考虑多行合并后的场景,否则会造成数据丢失。如果Interceptor无法处理多行合并场景,请将该配置设置为1。
selector.type
replicating
选择器类型,“replicating”或“multiplexing”。“replicating”表示将数据复制多份,分别传递给每一个channel,每个channel接收到的数据都是相同的,而“multiplexing”表示根据event中header的value来选择特定的channel,每个channel中的数据是不同的。
interceptors
-
拦截器。多个拦截器用空格分开。
inputCharset
UTF-8
读取文件的编码格式。须与读取数据源文件编码格式相同,否则字符解析可能会出错。
fileHeader
false
是否把文件名(包含路径)添加到event的header中。
fileHeaderKey
-
设置header中数据存储结构为<key,value>模式,需要fileHeaderKey与fileHeader配合使用。如果fileHeader设置为true,可参考如下示例。
示例:将fileHeaderKey定义为file,当读取到文件名为/root/a.txt的内容时,header中以file=/root/a.txt的形式存在。
basenameHeader
false
是否把文件名(不包含路径)添加到event的header中。
basenameHeaderKey
-
设置header中数据存储结构为<key,value>模式,需要basenameHeaderKey与basenameHeader配合使用。如果basenameHeader设置为true,可参考如下示例。
示例:将basenameHeaderKey定义为file,当读取到文件名为a.txt的内容时,header中以file=a.txt的形式存在。
pollDelay
500
轮询监控目录下新文件时的时延。单位:毫秒。
recursiveDirectorySearch
false
是否监控配置的目录下子目录中的新文件。
consumeOrder
oldest
监控目录下文件的消耗次序。如果配置为oldest或者youngest,会根据监控目录下文件的最后修改时间来决定,当目录下有大量文件时,会消耗较长时间去寻找oldest或者youngest的文件。需要注意的是,如果配置为random,创建比较早的文件有可能长时间未被读取。如果配置为oldest或者youngest,那么进程会需要较多时间来查找最新的或最旧的文件。可选值:random,youngest,oldest。
maxBackoff
4000
当Channel满了以后,尝试再次去写Channel所等待的最大时间。超过这个时间,则会发生异常。对应的Source会以一个较小的时间开始,然后每尝试一次,该时间数字指数增长直到达到当前指定的值,如果还不能成功写入,则认为失败。时间单位:秒。
emptyFileEvent
true
是否采集空文件信息发送到Sink端,默认值为true,表示将空文件信息发送到Sink端。该参数只对HDFS Sink有效,其他Sink该参数无效。以HDFS Sink为例,当参数为true时,如果spoolDir路径下存在空文件,那么HDFS的hdfs.path路径下就会创建一个同名的空文件。
SpoolDir Source在按行读取过程中会忽略掉每一个event的最后一个换行符,该换行符所占用的数据量指标不会被Flume统计。
- Kafka Source
Kafka Source从Kafka的topic中消费数据,可以设置多个Source消费同一个topic的数据,每个Source会消费topic的不同partitions。常用配置如下表所示:
表9 Kafka Source常用配置 参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
org.apache.flume.source.kafka.KafkaSource
kafka source的类型,必须设置为org.apache.flume.source.kafka.KafkaSource。
kafka.bootstrap.servers
-
Kafka的bootstrap地址端口列表。如果集群已安装Kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
kafka.topics
-
订阅的Kafka topic列表,用逗号分隔。
kafka.topics.regex
-
符合正则表达式的topic会被订阅,优先级高于“kafka.topics”,如果存在将覆盖“kafka.topics”。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。
nodatatime
0(不开启)
告警阈值,从Kafka中订阅不到数据的时长超过阈值时发送告警,单位:秒。该参数可在配置文件properties.properties进行设置。
batchSize
1000
批次写入Channel的Event数量。
batchDurationMillis
1000
批次消费topic数据的最大时长,单位:ms。
keepTopicInHeader
false
是否在Event Header中保存topic。设置为true,则Kafka Sink配置的topic将无效。
setTopicHeader
true
当设置为true时,会将“topicHeader”中定义的topic名称存储到Header中。
topicHeader
topic
当setTopicHeader属性设置为true,此参数用于定义存储接收的topic名称。如果与Kafka Sink的topicHeader属性结合使用,应该注意,避免将消息循环发送到同一主题。
useFlumeEventFormat
false
默认情况下,event会以字节的形式从kafka topic传递到event的body体中。设置为true,则会以Flume的Avro二进制格式来读取Event。与KafkaSink或KakfaChannel 中同名的parseAsFlumeEvent参数一起使用时,会保留从数据源产生的任何设定的Header。
keepPartitionInHeader
false
是否在Event Header中保存partitionID。设置为true,则Kafka Sink将写入对应的Partition。
kafka.consumer.group.id
flume
Kafka消费组ID。多个源或代理中设置相同的ID表示它们是同一个consumer group。
kafka.security.protocol
SASL_PLAINTEXT
Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
Other Kafka Consumer Properties
-
其他Kafka配置,可以接受任意Kafka支持的消费配置,配置需要加前缀“kafka.”。
- Taildir Source
Taildir Source监控目录下文件的变化并自动读取文件内容,可实现实时数据传输,常用配置如下表所示:
表10 Taildir Source常用配置 参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
TAILDIR
taildir source的类型,必须为TAILDIR。
filegroups
-
设置采集文件目录分组名字,分组名字中间使用空格间隔。
filegroups.<filegroupName>
-
文件路径,需要配置为绝对路径。
filegroups.<filegroupName>.parentDir
-
父目录,需要配置为绝对路径。
filegroups.<filegroupName>.filePattern
-
相对父目录的文件路径,可以包含目录,支持正则表达式,须与父目录联合使用。
positionFile
-
传输过程中元数据存储路径。
headers.<filegroupName>.<headerKey>
-
设置某一个分组采集数据时event中的key-value值。
byteOffsetHeader
false
是否在每一个event头中携带该event在源文件中的位置信息。设置为true,则该信息保存在byteoffset变量中。
maxBatchCount
Long.MAX_VALUE
控制从一个文件中连续读取的最大批次。如果监控目录会一直读取多个文件,且其中一个文件以非常快的速率在写入,那么其他文件可能会无法处理。因为高速写入的这个文件会陷入无限读取的循环中。这种情况下,应该降低此值。
skipToEnd
false
Flume在重启后是否直接定位到文件最新的位置处读取最新的数据。设置为true,则重启后直接定位到文件最新位置读取最新数据。
idleTimeout
120000
设置读取文件的空闲时间,单位:毫秒,如果在该时间内文件内容没有变更,关闭掉该文件,关闭后如果该文件有数据写入,重新打开并读取数据。
writePosInterval
3000
设置将元数据写入到文件的周期,单位:毫秒。
batchSize
1000
批次写入Channel的Event数量。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。
fileHeader
false
是否把文件名(包含路径)添加到event的header中。
fileHeaderKey
file
设置header中数据存储结构为<key,value>模式,需要fileHeaderKey与fileHeader配合使用。如果fileHeader设置为true,可参考如下示例。
示例:将fileHeaderKey定义为file,当读取到文件名为/root/a.txt的内容时,header中以file=/root/a.txt的形式存在。
- Http Source
Http Source接收外部HTTP客户端发送过来的数据,并放入配置的Channel中,常用配置如下表所示:
表11 Http Source常用配置 参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
http
http source的类型,必须为http。
bind
-
监测主机名/IP。
port
-
绑定监测端口,该端口需未被占用。
handler
org.apache.flume.source.http.JSONHandler
http请求的消息解析方式,支持Json格式解析(org.apache.flume.source.http.JSONHandler)和二进制Blob块解析(org.apache.flume.sink.solr.morphline.BlobHandler)。
handler.*
-
设置handler的参数。
exclude-protocols
SSLv3
排除的协议列表,用空格分开。默认排除SSLv3协议。
include-cipher-suites
-
包含的协议列表,用空格分开。如果设置为空,则默认支持所有协议。
enableSSL
false
http协议是否启用SSL。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。
keystore-type
JKS
Keystore类型,可以为JKS或者PKCS12。
keystore
-
http启用SSL后设置keystore的路径。
keystorePassword
-
http启用SSL后设置keystore的密码。
- Thrift Source
Thrift Source监测thrift端口,接收外部Thrift客户端数据并放入配置的Channel中。常用配置如下表所示:
参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
thrift
thrift source的类型,必须设置为thrift。
bind
-
监测主机名/IP。
port
-
绑定监测端口,该端口需未被占用。
threads
-
允许运行的最大的worker线程数目。
kerberos
false
是否启用Kerberos认证。
agent-keytab
-
服务端使用的keytab文件地址,必须使用机机账号。建议使用Flume服务安装目录下flume/conf/flume_server.keytab。
agent-principal
-
服务端使用的安全用户的Principal,必须使用机机账户。建议使用Flume服务默认用户flume_server/hadoop.<系统域名>@<系统域名>
说明:“flume_server/hadoop.<系统域名>”为用户名,用户的用户名所包含的系统域名所有字母为小写。例如“本端域”参数为“9427068F-6EFA-4833-B43E-60CB641E5B6C.COM”,用户名为“flume_server/hadoop.9427068f-6efa-4833-b43e-60cb641e5b6c.com”。
compression-type
none
消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。
ssl
false
是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。
keystore-type
JKS
SSL启用后密钥存储类型。
keystore
-
SSL启用后密钥存储文件路径,开启SSL后,该参数必填。
keystore-password
-
SSL启用后密钥存储密码,开启ssl后,该参数必填。
truststore-type
JKS
Java信任库类型,“JKS”或“PKCS12”。
说明:JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
常用Channel配置
- Memory Channel
Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如下表所示:
表12 Memory Channel常用配置 参数
默认值
描述
type
-
memory channel的类型,必须设置为memory。
capacity
10000
缓存在channel中的最大Event数。
transactionCapacity
1000
每次存取的最大Event数。
说明:- 此参数值需要大于source和sink的batchSize。
- 事务缓存容量必须小于或等于Channel缓存容量。
channelfullcount
10
channel full次数,达到该次数后发送告警。
keep-alive
3
当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。
byteCapacity
JVM最大内存的80%
channel中最多能容纳所有event body的总字节数,默认是 JVM最大可用内存(-Xmx )的80%,单位:bytes。
byteCapacityBufferPercentage
20
channel中字节容量百分比(%)。
- File Channel
File Channel使用本地磁盘作为缓存区,Events存放在设置的dataDirs配置项文件夹中。常用配置如下表所示:
表13 File Channel常用配置 参数
默认值
描述
type
-
file channel的类型,必须设置为file。
checkpointDir
${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/checkpoint
说明:此路径随自定义数据路径变更。
检查点存放路径。
dataDirs
${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/data
说明:此路径随自定义数据路径变更。
数据缓存路径,设置多个路径可提升性能,中间用逗号分开。
maxFileSize
2146435071
单个缓存文件的最大值,单位:bytes。
minimumRequiredSpace
524288000
缓冲区空闲空间最小值,单位:bytes。
capacity
1000000
缓存在channel中的最大Event数。
transactionCapacity
10000
每次存取的最大Event数。
说明:- 此参数值需要大于source和sink的batchSize。
- 事务缓存容量必须小于或等于Channel缓存容量。
channelfullcount
10
channel full次数,达到该次数后发送告警。
useDualCheckpoints
false
是否备份检查点。设置为“true”时,必须设置backupCheckpointDir的参数值。
backupCheckpointDir
-
备份检查点路径。
checkpointInterval
30000
检查点间隔时间,单位:秒。
keep-alive
3
当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。
use-log-replay-v1
false
是否启用旧的回复逻辑。
use-fast-replay
false
是否使用队列回复。
checkpointOnClose
true
channel关闭时是否创建检查点。
- Memory File Channel
Memory File Channel同时使用内存和本地磁盘作为缓存区,消息可持久化,性能优于File Channel,接近Memory Channel的性能。此Channel目前处于试验阶段,可靠性不够高,不建议在生产环境使用。常用配置如下表所示:
表14 Memory File Channel常用配置 参数
默认值
描述
type
org.apache.flume.channel.MemoryFileChannel
memory file channel的类型,必须设置为“org.apache.flume.channel.MemoryFileChannel”。
capacity
50000
Channel缓存容量:缓存在Channel中的最大Event数。
transactionCapacity
5000
事务缓存容量:一次事务能处理的最大Event数。
说明:- 此参数值需要大于source和sink的batchSize。
- 事务缓存容量必须小于或等于Channel缓存容量。
subqueueByteCapacity
20971520
每个subqueue最多保存多少byte的Event,单位:byte。
Memory File Channel采用queue和subqueue两级缓存,event保存在subqueue,subqueue保存在queue。
subqueue能保存多少event,由“subqueueCapacity”和“subqueueInterval”两个参数决定,“subqueueCapacity”限制subqueue内的Event总容量,“subqueueInterval”限制subqueue保存Event的时长,只有subqueue达到“subqueueCapacity”或“subqueueInterval”上限时,subqueue内的Event才会发往目的地。
说明:“subqueueByteCapacity”必须大于一个batchsize内的Event总容量。
subqueueInterval
2000
每个subqueue最多保存一段多长时间的Event,单位:毫秒。
keep-alive
3
当事务缓存或Channel缓存满时,Put、Take线程等待时间。
单位:秒。
dataDir
-
缓存本地文件存储目录。
byteCapacity
JVM最大内存的80%
Channel缓存容量。
单位:bytes。
compression-type
None
消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。
channelfullcount
10
channel full次数,达到该次数后发送告警。
Memory File Channel配置样例:
server.channels.c1.type = org.apache.flume.channel.MemoryFileChannel server.channels.c1.dataDir = /opt/flume/mfdata server.channels.c1.subqueueByteCapacity = 20971520 server.channels.c1.subqueueInterval=2000 server.channels.c1.capacity = 500000 server.channels.c1.transactionCapacity = 40000
- Kafka Channel
Kafka Channel使用Kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。
表15 Kafka channel 常用配置 Parameter
Default Value
Description
type
-
kafka channel的类型,必须设置为 “org.apache.flume.channel.kafka.KafkaChannel”。
kafka.bootstrap.servers
-
Kafka的bootstrap地址端口列表。
如果集群已安装Kafka并且配置已经同步,则服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
kafka.topic
flume-channel
channel用来缓存数据的topic。
kafka.consumer.group.id
flume
从kafka中获取数据的组标识,此参数不能为空。
parseAsFlumeEvent
true
是否解析为Flume event。
migrateZookeeperOffsets
true
当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。
kafka.consumer.auto.offset.reset
latest
当没有offset记录时从什么位置消费,可选为“earliest”、“latest”或“none”。“earliest”表示将offset重置为初始点,“latest”表示将offset置为最新位置点,“none”表示如果没有offset则发生异常。
kafka.producer.security.protocol
SASL_PLAINTEXT
Kafka生产安全协议。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
说明:如果该参数没有显示,请单击弹窗左下角的"+"显示全部参数。
kafka.consumer.security.protocol
SASL_PLAINTEXT
同上,但用于消费。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
pollTimeout
500
consumer调用poll()函数能接受的最大超时时间,单位:毫秒。
ignoreLongMessage
false
是否丢弃超大消息。
messageMaxLength
1000012
Flume写入Kafka的消息的最大长度。
常用Sink配置
- HDFS Sink
HDFS Sink将数据写入Hadoop分布式文件系统(HDFS)。常用配置如下表所示:
表16 HDFS Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
hdfs
hdfs sink的类型,必须设置为hdfs。
hdfs.path
-
HDFS上数据存储路径,必须以“hdfs://hacluster/”开头。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。
hdfs.inUseSuffix
.tmp
正在写入的hdfs文件后缀。
hdfs.rollInterval
30
按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
hdfs.rollSize
1024
按大小滚动文件,单位:bytes,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
hdfs.rollCount
10
按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
说明:参数“rollInterval”、“rollSize”和“rollCount”可同时配置,三个参数采取优先原则,哪个参数值先满足,优先按照哪个参数进行压缩。
hdfs.idleTimeout
0
自动关闭空闲文件超时时间,单位:秒。
hdfs.batchSize
1000
批次写入HDFS的Event个数。
hdfs.kerberosPrincipal
-
认证HDFS的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。
hdfs.kerberosKeytab
-
认证HDFS的Kerberos keytab,普通模式集群不配置,安全模式集群中,用户必须对jaas.cof文件中的keyTab路径有访问权限。
hdfs.fileCloseByEndEvent
true
收到源文件的最后一个Event时是否关闭hdfs文件。
hdfs.batchCallTimeout
-
批次写入HDFS超时控制时间,单位:毫秒。
当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。
说明:“hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致写HDFS失败。
serializer.appendNewline
true
将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。
hdfs.filePrefix
over_%{basename}
数据写入hdfs后文件名的前缀。
hdfs.fileSuffix
-
数据写入hdfs后文件名的后缀。
hdfs.inUsePrefix
-
正在写入的hdfs文件前缀。
hdfs.fileType
DataStream
hdfs文件格式,包括“SequenceFile”、“DataStream”以及“CompressedStream”。
说明:“SequenceFile”和“DataStream”不压缩输出文件,不能设置参数“codeC”,“CompressedStream”压缩输出文件,必须设置“codeC”参数值配合使用。
hdfs.codeC
-
文件压缩格式,包括gzip、bzip2、lzo、lzop、snappy。
hdfs.maxOpenFiles
5000
最大允许打开的hdfs文件数,当打开的文件数达到该值时,最早打开的文件将会被关闭。
hdfs.writeFormat
Writable
文件写入格式,“Writable”或者“Text”。
hdfs.callTimeout
10000
写入HDFS超时控制时间,单位:毫秒。
hdfs.threadsPoolSize
-
每个HDFS sink用于HDFS io操作的线程数。
hdfs.rollTimerPoolSize
-
每个HDFS sink用于调度定时文件滚动的线程数。
hdfs.round
false
时间戳是否四舍五入。如果设置为true,则会影响所有基于时间的转义序列(%t除外)。
hdfs.roundUnit
second
时间戳四舍五入单位,可选为“second”、“minute”或“hour”,分别对应为秒、分钟和小时。
hdfs.useLocalTimeStamp
true
是否启用本地时间戳,建议设置为“true”。
hdfs.closeTries
0
hdfs sink尝试关闭重命名文件的最大次数。默认为0表示sink会一直尝试重命名,直至重命名成功。
hdfs.retryInterval
180
尝试关闭hdfs文件的时间间隔,单位:秒。
说明:每个关闭请求都会有多个RPC往返Namenode,因此设置的太低可能导致Namenode超负荷。如果设置0,如果第一次尝试失败的话,该Sink将不会尝试关闭文件,并且把文件打开,或者用“.tmp”作为扩展名。
hdfs.failcount
10
数据写入hdfs失败的次数。该参数作为sink写入hdfs失败次数的阈值,当超过该阈值后上报数据传输异常告警。
- Avro Sink
Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如下表所示:
表17 Avro Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
-
avro sink的类型,必须设置为avro。
hostname
-
绑定的主机名/IP。
port
-
监测端口,该端口需未被占用。
batch-size
1000
批次发送的Event个数。
client.type
DEFAULT
客户端实例类型,根据所配置的模型实际使用到的通信协议设置。该值可选值包括:
- DEFAULT,返回AvroRPC类型的客户端实例。
- OTHER,返回NULL。
- THRIFT,返回Thrift RPC类型的客户端实例。
- DEFAULT_LOADBALANCING, 返回LoadBalancing RPC 客户端实例。
- DEFAULT_FAILOVER, 返回Failover RPC 客户端实例。
ssl
false
是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。
truststore-type
JKS
Java信任库类型,“JKS”或“PKCS12”。
说明:JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
keystore-type
JKS
ssl启用后密钥存储类型。
keystore
-
ssl启用后密钥存储文件路径,开启ssl后,该参数必填。
keystore-password
-
ssl启用后密钥存储密码,开启ssl后,该参数必填。
connect-timeout
20000
第一次连接的超时时间,单位:毫秒。
request-timeout
20000
第一次请求后一次请求的最大超时时间,单位:毫秒。
reset-connection-interval
0
一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。
compression-type
none
批数据压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。该值必须与AvroSource的compression-type匹配。
compression-level
6
批数据压缩级别(1-9),数值越高,压缩率越高。
exclude-protocols
SSLv3
排除的协议列表,用空格分开。默认排除SSLv3协议。
- HBase Sink
HBase Sink将数据写入到HBase中。常用配置如下表所示:
表18 HBase Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
-
hbase sink的类型,必须设置为hbase。
table
-
HBase表名称。
columnFamily
-
HBase列族。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。
batchSize
1000
批次写入HBase的Event个数。
kerberosPrincipal
-
认证HBase的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。
kerberosKeytab
-
认证HBase的Kerberos keytab,普通模式集群不配置,安全模式集群中,flume运行用户必须对jaas.cof文件中的keyTab路径有访问权限。
coalesceIncrements
true
是否在同一个处理批次中,合并对同一个hbase cell多个操作。设置为true有利于提高性能。
- Kafka Sink
Kafka Sink将数据写入到Kafka中。常用配置如下表所示:
表19 Kafka Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
-
kafka sink的类型,必须设置为org.apache.flume.sink.kafka.KafkaSink。
kafka.bootstrap.servers
-
Kafka 的bootstrap 地址端口列表。如果集群安装有kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表,客户端必须配置该项,多个用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。
kafka.producer.acks
1
必须收到多少个replicas的确认信息才认为写入成功。0表示不需要接收确认信息,1表示只等待leader的确认信息。-1表示等待所有的relicas的确认信息。设置为-1,在某些leader失败的场景中可以避免数据丢失。
kafka.topic
-
数据写入的topic,必须填写。
allowTopicOverride
false
是否将Event Header中保存的topic替换kafka.topic中配置的topic。
flumeBatchSize
1000
批次写入Kafka的Event个数。
kafka.security.protocol
SASL_PLAINTEXT
Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
ignoreLongMessage
false
是否丢弃超大消息的开关。
messageMaxLength
1000012
Flume写入Kafka的消息的最大长度。
defaultPartitionId
-
用于指定channel中的events被传输到哪一个Kafka partition ID ,此值会被partitionIdHeader覆盖。默认情况下,如果此参数不设置,会由Kafka Producer's partitioner 进行events分发(可以通过指定key或者kafka.partitioner.class自定义的partitioner)。
partitionIdHeader
-
设置时,对应的Sink 将从Event 的Header中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。 如果该值无对应的有效分区,则会发生EventDeliveryException。 如果Header 值已经存在,则此设置将覆盖参数defaultPartitionId。
Other Kafka Producer Properties
-
其他Kafka配置,可以接受任意Kafka支持的生产配置,配置需要加前缀 .kafka。
- Thrift Sink
Thrift Sink把events转化为Thrift events并发送到配置的主机的监测端口。常用配置如下表所示:
表20 Thrift Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
thrift
thrift sink的类型,必须设置为thrift。
hostname
-
绑定的主机名/IP。
port
-
监测端口,该端口需未被占用。
batch-size
1000
批次发送的Event个数。
connect-timeout
20000
第一次连接的超时时间,单位:毫秒。
request-timeout
20000
第一次请求后一次请求的最大超时时间,单位:毫秒。
kerberos
false
是否启用Kerberos认证。
client-keytab
-
客户端使用的keytab文件地址,flume运行用户必须对认证文件具有访问权限。
client-principal
-
客户端使用的安全用户的Principal。
server-principal
-
服务端使用的安全用户的Principal。
compression-type
none
Flume发送数据的压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。
maxConnections
5
Flume发送数据时的最大连接池大小。
ssl
false
是否使用SSL加密。
truststore-type
JKS
Java信任库类型。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
reset-connection-interval
0
一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。
注意事项
- Flume可靠性保障措施有哪些?
- Source&Channel、Channel&Sink之间的事务机制。
- Sink Processor支持配置failover、load_blance机制,例如负载均衡示例如下。
server.sinkgroups=g1 server.sinkgroups.g1.sinks=k1 k2 server.sinkgroups.g1.processor.type=load_balance server.sinkgroups.g1.processor.backoff=true server.sinkgroups.g1.processor.selector=random
- Flume多agent聚合级联时的注意事项?
- 级联时需要使用Avro或者Thrift协议进行级联。
- 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。