Flume业务配置指南
该操作指导用户完成Flume常用业务的配置。
- 各个表格中所示参数,黑体加粗的参数为必选参数。
- Sink的BatchSize参数必须小于Channel的transactionCapacity。
- 集群Flume配置工具界面篇幅有限,Source、Channel、Sink只展示部分参数,详细请参考如下常用配置。
- 集群Flume配置工具界面上所展示Customer Source、Customer Channel及Customer Sink需要用户根据自己开发的代码来进行配置,下述常用配置不再展示。
常用Source配置
- Avro Source
Avro Source监测Avro端口,接收外部Avro客户端数据并放入配置的Channel中。常用配置如下表所示:
表1 Avro Source常用配置 参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
avro
avro source的类型,必须为avro。
bind
-
监测主机名/IP。
port
-
绑定监测端口,该端口需未被占用。
threads
-
source工作的最大线程数。
compression-type
none
消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。
compression-level
6
数据压缩级别(1-9),数值越高,压缩率越高。
ssl
false
是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。
truststore-type
JKS
Java信任库类型,“JKS”或“PKCS12”。
说明:JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
keystore-type
JKS
ssl启用后密钥存储类型,“JKS”或“PKCS12”。
说明:JKS的密钥库和私钥用不同的密码进行保护,而PKCS12的密钥库和私钥用相同密码进行保护。
keystore
-
ssl启用后密钥存储文件路径,开启ssl后,该参数必填。
keystore-password
-
ssl启用后密钥存储密码,开启ssl后,该参数必填。
trust-all-certs
false
是否关闭SSL server证书检查。设置为“true”时将不会检查远端source的SSL server证书,不建议在生产中使用。
exclude-protocols
SSLv3
排除的协议列表,用空格分开。默认排除SSLv3协议。
ipFilter
false
是否开启ip过滤。
ipFilter.rules
-
定义N网络的ipFilters,多个主机或IP地址用逗号分割。ipFilter设置为“true”时,配置规则有允许和禁止两种,配置格式如下:
ipFilterRules=allow:ip:127.*, allow:name:localhost, deny:ip:*
- SpoolDir Source
Spool Dir Source监控并传输目录下新增的文件,可实现实时数据传输。常用配置如下表所示:
表2 Spooling Directory Source常用配置 参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
spooldir
spooling source的类型,必须设置为spooldir。
spoolDir
-
Spooldir source的监控目录,flume运行用户需要对该目录具有可读可写可执行权限。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。
fileSuffix
.COMPLETED
文件传输完成后添加的后缀。
deletePolicy
never
文件传输完成后源文件删除策略,never或immediate。“never”表示不删除已完成传输的源文件,“immediate”表示传输完成后立刻删除源文件。
ignorePattern
^$
忽略文件的正则表达式表示。默认为“^$”,表示忽略空格。
includePattern
^.*$
包含文件的正则表达式表示。可以与ignorePattern同时使用,如果一个文件既满足ignorePattern也满足includePattern,则该文件会被忽略。另外,以“.”开头的文件不会被过滤。
trackerDir
.flumespool
传输过程中元数据存储路径。
batchSize
1000
批次写入Channel的Event数量。
decodeErrorPolicy
FAIL
编码错误策略。
说明:如果文件中有编码错误,请配置“decodeErrorPolicy”为“REPLACE”或“IGNORE”,Flume遇到编码错误将跳过编码错误,继续采集后续日志。
deserializer
LINE
文件解析器,值为“LINE”或 “BufferedLine”。
- 配置为“LINE”时,对从文件读取的字符逐个转码。
- 配置为“BufferedLine”时,对文件读取的一行或多行的字符进行批量转码,性能更优。
deserializer.maxLineLength
2048
按行解析最大长度。
deserializer.maxBatchLine
1
按行解析最多行数,如果行数设置为多行,maxLineLength也应该设置为相应的倍数。
说明:用户设置Interceptor时,需要考虑多行合并后的场景,否则会造成数据丢失。如果Interceptor无法处理多行合并场景,请将该配置设置为1。
selector.type
replicating
选择器类型,“replicating”或“multiplexing”。“replicating”表示将数据复制多份,分别传递给每一个channel,每个channel接收到的数据都是相同的,而“multiplexing”表示根据event中header的value来选择特定的channel,每个channel中的数据是不同的。
interceptors
-
拦截器。多个拦截器用空格分开。
inputCharset
UTF-8
读取文件的编码格式。须与读取数据源文件编码格式相同,否则字符解析可能会出错。
fileHeader
false
是否把文件名(包含路径)添加到event的header中。
fileHeaderKey
-
设置header中数据存储结构为<key,value>模式,需要fileHeaderKey与fileHeader配合使用。若fileHeader设置为true,可参考如下示例。
示例:将fileHeaderKey定义为file,当读取到文件名为/root/a.txt的内容时,header中以file=/root/a.txt的形式存在。
basenameHeader
false
是否把文件名(不包含路径)添加到event的header中。
basenameHeaderKey
-
设置header中数据存储结构为<key,value>模式,需要basenameHeaderKey与basenameHeader配合使用。若basenameHeader设置为true,可参考如下示例。
示例:将basenameHeaderKey定义为file,当读取到文件名为a.txt的内容时,header中以file=a.txt的形式存在。
pollDelay
500
轮询监控目录下新文件时的时延。单位:毫秒。
recursiveDirectorySearch
false
是否监控配置的目录下子目录中的新文件。
consumeOrder
oldest
监控目录下文件的消耗次序。如果配置为oldest或者youngest,会根据监控目录下文件的最后修改时间来决定,当目录下有大量文件时,会消耗较长时间去寻找oldest或者youngest的文件。需要注意的是,如果配置为random,创建比较早的文件有可能长时间未被读取。如果配置为oldest或者youngest,那么进程会需要较多时间来查找最新的或最旧的文件。可选值:random,youngest,oldest。
maxBackoff
4000
当Channel满了以后,尝试再次去写Channel所等待的最大时间。超过这个时间,则会发生异常。对应的Source会以一个较小的时间开始,然后每尝试一次,该时间数字指数增长直到达到当前指定的值,如果还不能成功写入,则认为失败。时间单位:秒。
emptyFileEvent
true
是否采集空文件信息发送到Sink端,默认值为true,表示将空文件信息发送到Sink端。该参数只对HDFS Sink有效,其他Sink该参数无效。以HDFS Sink为例,当参数为true时,如果spoolDir路径下存在空文件,那么HDFS的hdfs.path路径下就会创建一个同名的空文件。
SpoolDir Source在按行读取过程中会忽略掉每一个event的最后一个换行符,该换行符所占用的数据量指标不会被Flume统计。
- Kafka Source
Kafka Source从Kafka的topic中消费数据,可以设置多个Source消费同一个topic的数据,每个Source会消费topic的不同partitions。常用配置如下表所示:
表3 Kafka Source常用配置 参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
org.apache.flume.source.kafka.KafkaSource
kafka source的类型,必须设置为org.apache.flume.source.kafka.KafkaSource。
kafka.bootstrap.servers
-
Kafka的bootstrap地址端口列表。如果集群已安装Kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
kafka.topics
-
订阅的Kafka topic列表,用逗号分隔。
kafka.topics.regex
-
符合正则表达式的topic会被订阅,优先级高于“kafka.topics”,如果存在将覆盖“kafka.topics”。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。
nodatatime
0(不开启)
告警阈值,从Kafka中订阅不到数据的时长超过阈值时发送告警,单位:秒。该参数可在配置文件properties.properties进行设置。
batchSize
1000
批次写入Channel的Event数量。
batchDurationMillis
1000
批次消费topic数据的最大时长,单位:ms。
keepTopicInHeader
false
是否在Event Header中保存topic。设置为true,则Kafka Sink配置的topic将无效。
setTopicHeader
true
当设置为true时,会将“topicHeader”中定义的topic名称存储到Header中。
topicHeader
topic
当setTopicHeader属性设置为true,此参数用于定义存储接收的topic名称。如果与Kafka Sink的topicHeader属性结合使用,应该注意,避免将消息循环发送到同一主题。
useFlumeEventFormat
false
默认情况下,event会以字节的形式从kafka topic传递到event的body体中。设置为true,则会以Flume的Avro二进制格式来读取Event。与KafkaSink或KakfaChannel 中同名的parseAsFlumeEvent参数一起使用时,会保留从数据源产生的任何设定的Header。
keepPartitionInHeader
false
是否在Event Header中保存partitionID。设置为true,则Kafka Sink将写入对应的Partition。
kafka.consumer.group.id
flume
Kafka消费组ID。多个源或代理中设置相同的ID表示它们是同一个consumer group。
kafka.security.protocol
SASL_PLAINTEXT
Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
Other Kafka Consumer Properties
-
其他Kafka配置,可以接受任意Kafka支持的消费配置,配置需要加前缀“kafka.”。
- Taildir Source
Taildir Source监控目录下文件的变化并自动读取文件内容,可实现实时数据传输,常用配置如下表所示:
表4 Taildir Source常用配置 参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
TAILDIR
taildir source的类型,必须为TAILDIR。
filegroups
-
设置采集文件目录分组名字,分组名字中间使用空格间隔。
filegroups.<filegroupName>
-
文件路径,需要配置为绝对路径。
filegroups.<filegroupName>.parentDir
-
父目录,需要配置为绝对路径。
filegroups.<filegroupName>.filePattern
-
相对父目录的文件路径,可以包含目录,支持正则表达式,须与父目录联合使用。
positionFile
-
传输过程中元数据存储路径。
headers.<filegroupName>.<headerKey>
-
设置某一个分组采集数据时event中的key-value值。
byteOffsetHeader
false
是否在每一个event头中携带该event在源文件中的位置信息。设置为true,则该信息保存在byteoffset变量中。
maxBatchCount
Long.MAX_VALUE
控制从一个文件中连续读取的最大批次。如果监控目录会一直读取多个文件,且其中一个文件以非常快的速率在写入,那么其他文件可能会无法处理。因为高速写入的这个文件会陷入无限读取的循环中。这种情况下,应该降低此值。
skipToEnd
false
Flume在重启后是否直接定位到文件最新的位置处读取最新的数据。设置为true,则重启后直接定位到文件最新位置读取最新数据。
idleTimeout
120000
设置读取文件的空闲时间,单位:毫秒,如果在该时间内文件内容没有变更,关闭掉该文件,关闭后如果该文件有数据写入,重新打开并读取数据。
writePosInterval
3000
设置将元数据写入到文件的周期,单位:毫秒。
batchSize
1000
批次写入Channel的Event数量。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。
fileHeader
false
是否把文件名(包含路径)添加到event的header中。
fileHeaderKey
file
设置header中数据存储结构为<key,value>模式,需要fileHeaderKey与fileHeader配合使用。若fileHeader设置为true,可参考如下示例。
示例:将fileHeaderKey定义为file,当读取到文件名为/root/a.txt的内容时,header中以file=/root/a.txt的形式存在。
- Http Source
Http Source接收外部HTTP客户端发送过来的数据,并放入配置的Channel中,常用配置如下表所示:
表5 Http Source常用配置 参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
http
http source的类型,必须为http。
bind
-
监测主机名/IP。
port
-
绑定监测端口,该端口需未被占用。
handler
org.apache.flume.source.http.JSONHandler
http请求的消息解析方式,支持Json格式解析(org.apache.flume.source.http.JSONHandler)和二进制Blob块解析(org.apache.flume.sink.solr.morphline.BlobHandler)。
handler.*
-
设置handler的参数。
exclude-protocols
SSLv3
排除的协议列表,用空格分开。默认排除SSLv3协议。
include-cipher-suites
-
包含的协议列表,用空格分开。如果设置为空,则默认支持所有协议。
enableSSL
false
http协议是否启用SSL。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。
keystore-type
JKS
Keystore类型,可以为JKS或者PKCS12。
keystore
-
http启用SSL后设置keystore的路径。
keystorePassword
-
http启用SSL后设置keystore的密码。
- Thrift Source
Thrift Source监测thrift端口,接收外部Thrift客户端数据并放入配置的Channel中。常用配置如下表所示:
参数
默认值
描述
channels
-
与之相连的channel,可以配置多个。
type
thrift
thrift source的类型,必须设置为thrift。
bind
-
监测主机名/IP。
port
-
绑定监测端口,该端口需未被占用。
threads
-
允许运行的最大的worker线程数目。
kerberos
false
是否启用Kerberos认证。
agent-keytab
-
服务端使用的keytab文件地址,必须使用机机账号。建议使用Flume服务安装目录下flume/conf/flume_server.keytab。
agent-principal
-
服务端使用的安全用户的Principal,必须使用机机账户。建议使用Flume服务默认用户flume_server/hadoop.<系统域名>@<系统域名>
说明:“flume_server/hadoop.<系统域名>”为用户名,用户的用户名所包含的系统域名所有字母为小写。例如“本端域”参数为“9427068F-6EFA-4833-B43E-60CB641E5B6C.COM”,用户名为“flume_server/hadoop.9427068f-6efa-4833-b43e-60cb641e5b6c.com”。
compression-type
none
消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。
ssl
false
是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。
keystore-type
JKS
SSL启用后密钥存储类型。
keystore
-
SSL启用后密钥存储文件路径,开启SSL后,该参数必填。
keystore-password
-
SSL启用后密钥存储密码,开启ssl后,该参数必填。
truststore-type
JKS
Java信任库类型,“JKS”或“PKCS12”。
说明:JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
常用Channel配置
- Memory Channel
Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如下表所示:
表6 Memory Channel常用配置 参数
默认值
描述
type
-
memory channel的类型,必须设置为memory。
capacity
10000
缓存在channel中的最大Event数。
transactionCapacity
1000
每次存取的最大Event数。
说明:- 此参数值需要大于source和sink的batchSize。
- 事务缓存容量必须小于或等于Channel缓存容量。
channelfullcount
10
channel full次数,达到该次数后发送告警。
keep-alive
3
当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。
byteCapacity
JVM最大内存的80%
channel中最多能容纳所有event body的总字节数,默认是 JVM最大可用内存(-Xmx )的80%,单位:bytes。
byteCapacityBufferPercentage
20
channel中字节容量百分比(%)。
- File Channel
File Channel使用本地磁盘作为缓存区,Events存放在设置的dataDirs配置项文件夹中。常用配置如下表所示:
表7 File Channel常用配置 参数
默认值
描述
type
-
file channel的类型,必须设置为file。
checkpointDir
${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/checkpoint
说明:此路径随自定义数据路径变更。
检查点存放路径。
dataDirs
${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/data
说明:此路径随自定义数据路径变更。
数据缓存路径,设置多个路径可提升性能,中间用逗号分开。
maxFileSize
2146435071
单个缓存文件的最大值,单位:bytes。
minimumRequiredSpace
524288000
缓冲区空闲空间最小值,单位:bytes。
capacity
1000000
缓存在channel中的最大Event数。
transactionCapacity
10000
每次存取的最大Event数。
说明:- 此参数值需要大于source和sink的batchSize。
- 事务缓存容量必须小于或等于Channel缓存容量。
channelfullcount
10
channel full次数,达到该次数后发送告警。
useDualCheckpoints
false
是否备份检查点。设置为“true”时,必须设置backupCheckpointDir的参数值。
backupCheckpointDir
-
备份检查点路径。
checkpointInterval
30000
检查点间隔时间,单位:秒。
keep-alive
3
当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。
use-log-replay-v1
false
是否启用旧的回复逻辑。
use-fast-replay
false
是否使用队列回复。
checkpointOnClose
true
channel关闭时是否创建检查点。
- Memory File Channel
Memory File Channel同时使用内存和本地磁盘作为缓存区,消息可持久化,性能优于File Channel,接近Memory Channel的性能。此Channel目前处于试验阶段,可靠性不够高,不建议在生产环境使用。常用配置如下表所示:
表8 Memory File Channel常用配置 参数
默认值
描述
type
org.apache.flume.channel.MemoryFileChannel
memory file channel的类型,必须设置为“org.apache.flume.channel.MemoryFileChannel”。
capacity
50000
Channel缓存容量:缓存在Channel中的最大Event数。
transactionCapacity
5000
事务缓存容量:一次事务能处理的最大Event数。
说明:- 此参数值需要大于source和sink的batchSize。
- 事务缓存容量必须小于或等于Channel缓存容量。
subqueueByteCapacity
20971520
每个subqueue最多保存多少byte的Event,单位:byte。
Memory File Channel采用queue和subqueue两级缓存,event保存在subqueue,subqueue保存在queue。
subqueue能保存多少event,由“subqueueCapacity”和“subqueueInterval”两个参数决定,“subqueueCapacity”限制subqueue内的Event总容量,“subqueueInterval”限制subqueue保存Event的时长,只有subqueue达到“subqueueCapacity”或“subqueueInterval”上限时,subqueue内的Event才会发往目的地。
说明:“subqueueByteCapacity”必须大于一个batchsize内的Event总容量。
subqueueInterval
2000
每个subqueue最多保存一段多长时间的Event,单位:毫秒。
keep-alive
3
当事务缓存或Channel缓存满时,Put、Take线程等待时间。
单位:秒。
dataDir
-
缓存本地文件存储目录。
byteCapacity
JVM最大内存的80%
Channel缓存容量。
单位:bytes。
compression-type
None
消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。
channelfullcount
10
channel full次数,达到该次数后发送告警。
Memory File Channel配置样例:
server.channels.c1.type = org.apache.flume.channel.MemoryFileChannel server.channels.c1.dataDir = /opt/flume/mfdata server.channels.c1.subqueueByteCapacity = 20971520 server.channels.c1.subqueueInterval=2000 server.channels.c1.capacity = 500000 server.channels.c1.transactionCapacity = 40000
- Kafka Channel
Kafka Channel使用Kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。
表9 Kafka channel 常用配置 Parameter
Default Value
Description
type
-
kafka channel的类型,必须设置为 “org.apache.flume.channel.kafka.KafkaChannel”。
kafka.bootstrap.servers
-
Kafka的bootstrap地址端口列表。
如果集群已安装Kafka并且配置已经同步,则服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
kafka.topic
flume-channel
channel用来缓存数据的topic。
kafka.consumer.group.id
flume
从kafka中获取数据的组标识,此参数不能为空。
parseAsFlumeEvent
true
是否解析为Flume event。
migrateZookeeperOffsets
true
当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。
kafka.consumer.auto.offset.reset
latest
当没有offset记录时从什么位置消费,可选为“earliest”、“latest”或“none”。“earliest”表示将offset重置为初始点,“latest”表示将offset置为最新位置点,“none”表示若没有offset则发生异常。
kafka.producer.security.protocol
SASL_PLAINTEXT
Kafka生产安全协议。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
说明:若该参数没有显示,请单击弹窗左下角的"+"显示全部参数。
kafka.consumer.security.protocol
SASL_PLAINTEXT
同上,但用于消费。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
pollTimeout
500
consumer调用poll()函数能接受的最大超时时间,单位:毫秒。
ignoreLongMessage
false
是否丢弃超大消息。
messageMaxLength
1000012
Flume写入Kafka的消息的最大长度。
常用Sink配置
- HDFS Sink
HDFS Sink将数据写入Hadoop分布式文件系统(HDFS)。常用配置如下表所示:
表10 HDFS Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
hdfs
HDFS sink的类型,必须设置为HDFS。
hdfs.path
-
HDFS上数据存储路径,必须以“hdfs://hacluster/”开头。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。
hdfs.inUseSuffix
.tmp
正在写入的HDFS文件后缀。
hdfs.rollInterval
30
按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
hdfs.rollSize
1024
按大小滚动文件,单位:bytes,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
hdfs.rollCount
10
按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
说明:参数“rollInterval”、“rollSize”和“rollCount”可同时配置,三个参数采取优先原则,哪个参数值先满足,优先按照哪个参数进行压缩。
hdfs.idleTimeout
0
自动关闭空闲文件超时时间,单位:秒。
hdfs.batchSize
1000
批次写入HDFS的Event个数。
hdfs.kerberosPrincipal
-
认证HDFS的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。
hdfs.kerberosKeytab
-
认证HDFS的Kerberos keytab,普通模式集群不配置,安全模式集群中,用户必须对jaas.cof文件中的keyTab路径有访问权限。
hdfs.fileCloseByEndEvent
true
收到源文件的最后一个Event时是否关闭HDFS文件。
hdfs.batchCallTimeout
-
批次写入HDFS超时控制时间,单位:毫秒。
当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。
说明:“hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致写HDFS失败。
serializer.appendNewline
true
将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。
hdfs.filePrefix
over_%{basename}
数据写入HDFS后文件名的前缀。默认值为“over_%{basename}”,其中“basename”可自定义,为存储在event的header中的文件名。
当SpoolDir Source中的“basenameHeader”为“true”,且“basenameHeaderKey”为用户自定义的basename的值时生效。
hdfs.fileSuffix
-
数据写入HDFS后文件名的后缀。
hdfs.inUsePrefix
-
正在写入的HDFS文件前缀。
hdfs.fileType
DataStream
HDFS文件格式,包括“SequenceFile”、“DataStream”以及“CompressedStream”。
说明:“SequenceFile”和“DataStream”不压缩输出文件,不能设置参数“codeC”,“CompressedStream”压缩输出文件,必须设置“codeC”参数值配合使用。
hdfs.codeC
-
文件压缩格式,包括gzip、bzip2、lzo、lzop、snappy。
hdfs.maxOpenFiles
5000
最大允许打开的HDFS文件数,当打开的文件数达到该值时,最早打开的文件将会被关闭。
hdfs.writeFormat
Writable
文件写入格式,“Writable”或者“Text”。
hdfs.callTimeout
10000
写入HDFS超时控制时间,单位:毫秒。
hdfs.threadsPoolSize
-
每个HDFS sink用于HDFS io操作的线程数。
hdfs.rollTimerPoolSize
-
每个HDFS sink用于调度定时文件滚动的线程数。
hdfs.round
false
时间戳是否四舍五入。若设置为true,则会影响所有基于时间的转义序列(%t除外)。
hdfs.roundUnit
second
时间戳四舍五入单位,可选为“second”、“minute”或“hour”,分别对应为秒、分钟和小时。
hdfs.useLocalTimeStamp
true
是否启用本地时间戳,建议设置为“true”。
hdfs.closeTries
0
HDFS sink尝试关闭重命名文件的最大次数。默认为0表示sink会一直尝试重命名,直至重命名成功。
hdfs.retryInterval
180
尝试关闭HDFS文件的时间间隔,单位:秒。
说明:每个关闭请求都会有多个RPC往返Namenode,因此设置的太低可能导致Namenode超负荷。如果设置0,如果第一次尝试失败的话,该Sink将不会尝试关闭文件,并且把文件打开,或者用“.tmp”作为扩展名。
hdfs.failcount
10
数据写入HDFS失败的次数。该参数作为sink写入HDFS失败次数的阈值,当超过该阈值后上报数据传输异常告警。
- Avro Sink
Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如下表所示:
表11 Avro Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
-
avro sink的类型,必须设置为avro。
hostname
-
绑定的主机名/IP。
port
-
监测端口,该端口需未被占用。
batch-size
1000
批次发送的Event个数。
client.type
DEFAULT
客户端实例类型,根据所配置的模型实际使用到的通信协议设置。该值可选值包括:
- DEFAULT,返回AvroRPC类型的客户端实例。
- OTHER,返回NULL。
- THRIFT,返回Thrift RPC类型的客户端实例。
- DEFAULT_LOADBALANCING, 返回LoadBalancing RPC 客户端实例。
- DEFAULT_FAILOVER, 返回Failover RPC 客户端实例。
ssl
false
是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。
truststore-type
JKS
Java信任库类型,“JKS”或“PKCS12”。
说明:JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
keystore-type
JKS
ssl启用后密钥存储类型。
keystore
-
ssl启用后密钥存储文件路径,开启ssl后,该参数必填。
keystore-password
-
ssl启用后密钥存储密码,开启ssl后,该参数必填。
connect-timeout
20000
第一次连接的超时时间,单位:毫秒。
request-timeout
20000
第一次请求后一次请求的最大超时时间,单位:毫秒。
reset-connection-interval
0
一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。
compression-type
none
批数据压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。该值必须与AvroSource的compression-type匹配。
compression-level
6
批数据压缩级别(1-9),数值越高,压缩率越高。
exclude-protocols
SSLv3
排除的协议列表,用空格分开。默认排除SSLv3协议。
- HBase Sink
HBase Sink将数据写入到HBase中。常用配置如下表所示:
表12 HBase Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
-
hbase sink的类型,必须设置为hbase。
table
-
HBase表名称。
columnFamily
-
HBase列族。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。
batchSize
1000
批次写入HBase的Event个数。
kerberosPrincipal
-
认证HBase的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。
kerberosKeytab
-
认证HBase的Kerberos keytab,普通模式集群不配置,安全模式集群中,flume运行用户必须对jaas.cof文件中的keyTab路径有访问权限。
coalesceIncrements
true
是否在同一个处理批次中,合并对同一个hbase cell多个操作。设置为true有利于提高性能。
- Kafka Sink
Kafka Sink将数据写入到Kafka中。常用配置如下表所示:
表13 Kafka Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
-
kafka sink的类型,必须设置为org.apache.flume.sink.kafka.KafkaSink。
kafka.bootstrap.servers
-
Kafka 的bootstrap 地址端口列表。如果集群安装有kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表,客户端必须配置该项,多个用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。
kafka.producer.acks
1
必须收到多少个replicas的确认信息才认为写入成功。0表示不需要接收确认信息,1表示只等待leader的确认信息。-1表示等待所有的relicas的确认信息。设置为-1,在某些leader失败的场景中可以避免数据丢失。
kafka.topic
-
数据写入的topic,必须填写。
allowTopicOverride
false
是否将Event Header中保存的topic替换kafka.topic中配置的topic。
flumeBatchSize
1000
批次写入Kafka的Event个数。
kafka.security.protocol
SASL_PLAINTEXT
Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
ignoreLongMessage
false
是否丢弃超大消息的开关。
messageMaxLength
1000012
Flume写入Kafka的消息的最大长度。
defaultPartitionId
-
用于指定channel中的events被传输到哪一个Kafka partition ID ,此值会被partitionIdHeader覆盖。默认情况下,如果此参数不设置,会由Kafka Producer's partitioner 进行events分发(可以通过指定key或者kafka.partitioner.class自定义的partitioner)。
partitionIdHeader
-
设置时,对应的Sink 将从Event 的Header中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。 如果该值无对应的有效分区,则会发生EventDeliveryException。 如果Header 值已经存在,则此设置将覆盖参数defaultPartitionId。
Other Kafka Producer Properties
-
其他Kafka配置,可以接受任意Kafka支持的生产配置,配置需要加前缀 .kafka。
- Solr Sink
Solr Sink将数据写入到Apache Solr servers中。常用配置如下表所示:
表14 Solr Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
-
solr sink的类型,必须设置为org.apache.flume.sink.solr.morphline.MorphlineSolrSink。
morphlineFile
-
morphline 配置文件。
batchSize
1000
批次写入Solr的Event个数。
handlerClass
org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl
操作类,需要实现org.apache.flume.sink.solr.morphline.MorphlineHandler。
kerberosPrincipal
-
认证Solr的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。
kerberosKeytab
-
认证Solr的Kerberos keytab,普通模式集群不配置,安全模式集群中,flume运行用户必须对jaas.cof文件中的keyTab路径有访问权限。
batchDurationMillis
1000
channel中数据存储多长时间后开始传输,单位:毫秒。
说明:batchSize与batchDurationMillis采取优先原则,哪个参数值先满足,优先按照哪个参数作为标准进行传输。
Solr Sink 可支持将数据写入Solr中,使用集群中的Solr时,需要注意:
使用的时候需要导入一些Solr相关的jar包。且请使用集群上Solr组件下的相关jar包。请勿使用maven官网上原始的同名jar包。替换到需要测试用的Flume实例的lib包下后,需要重启Flume实例使jar包生效。需要导入jar包(lucene-* 以及solr-*这些jar包需要从集群中solr服务的带有solr-8.11.2的lib路径下拷贝)列表如下:
lucene-analyzers-common-8.11.2.jar
lucene-analyzers-kuromoji-8.11.2.jar
lucene-analyzers-phonetic-8.11.2.jar
lucene-backward-codecs-8.11.2.jar
lucene-classification-8.11.2.jar
lucene-codecs-8.11.2.jar
lucene-core-8.11.2.jar
lucene-expressions-8.11.2.jar
lucene-grouping-8.11.2.jar
lucene-highlighter-8.11.2.jar
lucene-join-8.11.2.jar
lucene-memory-8.11.2.jar
lucene-misc-8.11.2.jar
lucene-queries-8.11.2.jar
lucene-queryparser-8.11.2.jar
lucene-sandbox-8.11.2.jar
lucene-spatial-extras-8.11.2.jar
lucene-suggest-8.11.2.jar
httpmime-4.3.1.jar
noggit-0.5.jar
solr-cell-8.11.2.jar
solr-core-8.11.2.jar
solr-morphlines-core-8.11.2.jar
solr-solrj-8.11.2.jar
- Thrift Sink
Thrift Sink把events转化为Thrift events并发送到配置的主机的监测端口。常用配置如下表所示:
表15 Thrift Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
thrift
thrift sink的类型,必须设置为thrift。
hostname
-
绑定的主机名/IP。
port
-
监测端口,该端口需未被占用。
batch-size
1000
批次发送的Event个数。
connect-timeout
20000
第一次连接的超时时间,单位:毫秒。
request-timeout
20000
第一次请求后一次请求的最大超时时间,单位:毫秒。
kerberos
false
是否启用Kerberos认证。
client-keytab
-
客户端使用的keytab文件地址,flume运行用户必须对认证文件具有访问权限。
client-principal
-
客户端使用的安全用户的Principal。
server-principal
-
服务端使用的安全用户的Principal。
compression-type
none
Flume发送数据的压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。
maxConnections
5
Flume发送数据时的最大连接池大小。
ssl
false
是否使用SSL加密。
truststore-type
JKS
Java信任库类型。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
reset-connection-interval
0
一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。
注意事项
- Flume可靠性保障措施有哪些?
- Source&Channel、Channel&Sink之间的事务机制。
- Sink Processor支持配置failover、load_blance机制,例如负载均衡示例如下。
server.sinkgroups=g1 server.sinkgroups.g1.sinks=k1 k2 server.sinkgroups.g1.processor.type=load_balance server.sinkgroups.g1.processor.backoff=true server.sinkgroups.g1.processor.selector=random
- Flume多agent聚合级联时的注意事项?
- 级联时需要使用Avro或者Thrift协议进行级联。
- 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。