Flume常用配置参数
部分参数可在Manager界面配置。
使用Flume需要配置Source、Channel和Sink,各模块配置参数说明可通过本节内容了解。
MRS 3.x及之后版本部分参数可通过Manager界面配置,选择“集群 > 服务 > Flume > 配置工具”,选择要使用的Source、Channel以及Sink,将其拖到右侧的操作界面中,双击对应的Source、Channel以及Sink,根据实际环境可配置Source、Channel和Sink参数。“channels”、“type”等参数仅在客户端配置文件“properties.properties”中进行配置,配置文件路径为“Flume客户端安装目录/fusioninsight-flume-Flume组件版本号/conf/properties.properties”。
部分配置可能需要填写加密后的信息,请参见使用Flume客户端加密工具。
常用Source配置
- Avro Source
Avro Source监测Avro端口,接收外部Avro客户端数据并放入配置的Channel中。常用配置如表1所示:
图1 Avro Source
表1 Avro Source常用配置 参数
默认值
描述
channels
-
与之相连的Channel,可以配置多个。用空格隔开。
在单个代理流程中,是通过channel连接sources和sinks。一个source实例对应多个channels,但一个sink实例只能对应一个channel。
格式如下:
<Agent >.sources.<Source>.channels = <channel1> <channel2> <channel3>...
<Agent >.sinks.<Sink>.channels = <channel1>
仅可在“properties.properties”文件中配置。
type
avro
类型,需设置为“avro”。每一种source的类型都为相应的固定值。
仅可在“properties.properties”文件中配置。
bind
-
绑定和source关联的主机名或IP地址。
port
-
绑定端口号。
ssl
false
是否使用SSL加密。
- true
- false
truststore-type
JKS
Java信任库类型。填写JKS或其他java支持的truststore类型。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
keystore-type
JKS
密钥存储类型。填写JKS或其他java支持的truststore类型。
keystore
-
密钥存储文件。
keystore-password
-
密钥存储密码。
- SpoolDir Source
SpoolDir Source监控并传输目录下新增的文件,可实现准实时数据传输。常用配置如表 2 Spooling Source常用配置所示:
图2 SpoolDir Source
表2 SpoolDir Source常用配置 参数
默认值
描述
channels
-
与之相连的Channel,可以配置多个。
仅可在“properties.properties”文件中配置。
type
spooldir
类型,需设置为“spooldir”。
仅可在“properties.properties”文件中配置。
monTime
0(不开启)
线程监控阈值,更新时间大于阈值时会重新启动该Source,单位:秒。
spoolDir
-
监控目录。
fileSuffix
.COMPLETED
文件传输完成后添加的后缀。
deletePolicy
never
文件传输完成后源文件删除策略,支持“never”或“immediate”。分别是从不删除和立即删除。
ignorePattern
^$
忽略文件的正则表达式表示。
trackerDir
.flumespool
传输过程中元数据存储路径。
batchSize
1000
Source传输粒度。
decodeErrorPolicy
FAIL
编码错误策略。仅可在“properties.properties”文件中配置。
可选FAIL、REPLACE、IGNORE。
FAIL:发生异常并让解析失败。
REPLACE:将不能识别的字符用其他字符代替,通常是字符U+FFFD。
IGNORE:直接丢弃不能解析的字符串。
说明:如果文件中有编码错误,请配置“decodeErrorPolicy”为“REPLACE”或“IGNORE”,Flume遇到编码错误将跳过编码错误,继续采集后续日志。
deserializer
LINE
文件解析器,值为“LINE”或“BufferedLine”。
- 配置为“LINE”时,对从文件读取的字符逐个转码。
- 配置为“BufferedLine”时,对文件读取的一行或多行的字符进行批量转码,性能更优。
deserializer.maxLineLength
2048
按行解析最大长度。0到 2,147,483,647。
deserializer.maxBatchLine
1
按行解析最多行数,如果行数设置为多行,“maxLineLength”也应该设置为相应的倍数。例如maxBatchLine设置为2,“maxLineLength”相应的设置为2048*2为4096。
selector.type
replicating
选择器类型,支持“replicating”或“multiplexing”。
- “replicating”表示同样的内容会发给每一个channel。
- “multiplexing”表示根据分发规则,有选择地发给某些channel。
interceptors
-
拦截器配置。
仅可在“properties.properties”文件中配置。
Spooling Source在按行读取过程中,会忽略掉每一个Event的最后一个换行符,该换行符所占用的数据量指标不会被Flume统计。
- Kafka Source
Kafka Source从Kafka的topic中消费数据,可以设置多个Source消费同一个topic的数据,每个Source会消费topic的不同partitions。常用配置如表 3 Kafka Source常用配置所示:
图3 Kafka Source
表3 Kafka Source常用配置 参数
默认值
描述
channels
-
与之相连的Channel,可以配置多个。
仅可在“properties.properties”文件中配置。
type
org.apache.flume.source.kafka.KafkaSource
类型,需设置为“org.apache.flume.source.kafka.KafkaSource”。
仅可在“properties.properties”文件中配置。
monTime
0(不开启)
线程监控阈值,更新时间大于阈值时重新启动该Source,单位:秒。
nodatatime
0(不开启)
告警阈值,从Kafka中订阅不到数据的时长大于阈值时发送告警,单位:秒。
batchSize
1000
每次写入Channel的Event数量。
batchDurationMillis
1000
每次消费topic数据的最大时长,单位:毫秒。
keepTopicInHeader
false
是否在Event Header中保存topic,如果保存,Kafka Sink配置的topic将无效。
- true
- false
仅可在“properties.properties”文件中配置。
keepPartitionInHeader
false
是否在Event Header中保存partitionID,如果保存,Kafka Sink将写入对应的Partition。
- true
- false
仅可在“properties.properties”文件中配置。
kafka.bootstrap.servers
-
brokers地址列表,多个地址用英文逗号分隔。
kafka.consumer.group.id
-
Kafka消费者组ID。
kafka.topics
-
订阅的kafka topic列表,用英文逗号分隔。
kafka.topics.regex
-
符合正则表达式的topic会被订阅,优先级高于“kafka.topics”,如果配置将覆盖“kafka.topics”。
kafka.security.protocol
SASL_PLAINTEXT
Kafka安全协议,未启用Kerberos集群中须配置为“PLAINTEXT”。
kafka.kerberos.domain.name
-
此参数的值为Kafka集群中kerberos的“default_realm”,仅安全集群需要配置。
仅可在“properties.properties”文件中配置。
Other Kafka Consumer Properties
-
其他Kafka配置,可以接受任意Kafka支持的消费参数配置,配置需要加前缀“.kafka”。
仅可在“properties.properties”文件中配置。
- Taildir Source
Taildir Source监控目录下文件的变化并自动读取文件内容,可实现实时数据传输,常用配置如表4所示:
图4 Taildir Source
表4 Taildir Source常用配置 参数
默认值
描述
channels
-
与之相连的Channel,可以配置多个。
仅可在“properties.properties”文件中配置。
type
taildir
类型,需配置为“taildir”。
仅可在“properties.properties”文件中配置。
filegroups
-
设置采集文件目录分组名字,分组名字中间使用空格间隔。
filegroups.<filegroupName>.parentDir
-
父目录,需要配置为绝对路径。
仅可在“properties.properties”文件中配置。
filegroups.<filegroupName>.filePattern
-
相对父目录的文件路径,可以包含目录,支持正则表达式,须与父目录联合使用。
仅可在“properties.properties”文件中配置。
positionFile
-
传输过程中元数据存储路径。
headers.<filegroupName>.<headerKey>
-
设置某一个分组采集数据时Event中的key-value值。
仅可在“properties.properties”文件中配置。
byteOffsetHeader
false
是否在每一个Event头中携带该Event在源文件中的位置信息,该信息保存在“byteoffset”变量中。
skipToEnd
false
Flume在重启后是否直接定位到文件最新的位置处,以读取最新的数据。
idleTimeout
120000
设置读取文件的空闲时间,单位:毫秒。如果在该时间内文件内容没有变更,关闭掉该文件,关闭后如果该文件有数据写入,重新打开并读取数据。
writePosInterval
3000
设置将元数据写入到文件的周期,单位:毫秒。
batchSize
1000
批次写入Channel的Event数量。
monTime
0(不开启)
线程监控阈值,更新时间大于阈值时重新启动该Source,单位:秒。
- Http Source
Http Source接收外部HTTP客户端发送过来的数据,并放入配置的Channel中,常用配置如表5所示:
图5 Http Source
表5 Http Source常用配置 参数
默认值
描述
channels
-
与之相连的Channel,可以配置多个。仅可在“properties.properties”文件中配置。
type
http
类型,需配置为“http”。仅可在“properties.properties”文件中配置。
bind
-
绑定关联的主机名或IP地址。
port
-
绑定端口。
handler
org.apache.flume.source.http.JSONHandler
http请求的消息解析方式,支持以下两种:
- “org.apache.flume.source.http.JSONHandler”:表示Json格式解析。
- “org.apache.flume.sink.solr.morphline.BlobHandler”:表示二进制Blob块解析。
handler.*
-
设置handler的参数。
enableSSL
false
http协议是否启用SSL。
keystore
-
http启用SSL后设置keystore的路径。
keystorePassword
-
http启用SSL后设置keystore的密码。
常用Channel配置
- Memory Channel
Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如表6所示:
图6 Memory Channel
- File Channel
File Channel使用本地磁盘作为缓存区,Events存放在设置的“dataDirs”配置项文件夹中。常用配置如表7所示:
图7 File Channel
表7 File Channel常用配置 参数
默认值
描述
type
-
类型,需配置为“file”。仅可在“properties.properties”文件中配置。
checkpointDir
${BIGDATA_DATA_HOME}/flume/checkpoint
检查点存放路径。
dataDirs
${BIGDATA_DATA_HOME}/flume/data
数据缓存路径,设置多个路径可提升性能,中间用逗号分开。
maxFileSize
2146435071
单个缓存文件的最大值,单位:字节。
minimumRequiredSpace
524288000
缓冲区空闲空间最小值,单位:字节。
capacity
1000000
缓存在Channel中的最大Event数。
transactionCapacity
10000
每次存取的最大Event数。
channelfullcount
10
Channel full次数,达到该次数后发送告警。
- Kafka Channel
Kafka Channel使用kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。常用配置如表 10 Kafka Channel 常用配置所示:图8 Kafka Channel
表8 Kafka Channel常用配置 参数
默认值
描述
type
-
类型,需配置为 “org.apache.flume.channel.kafka.KafkaChannel”.。
仅可在“properties.properties”文件中配置。
kafka.bootstrap.servers
-
kafka broker列表。
kafka.topic
flume-channel
Channel用来缓存数据的topic。
kafka.consumer.group.id
flume
Kafka消费者组ID。
parseAsFlumeEvent
true
是否解析为Flume event。
migrateZookeeperOffsets
true
当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。
kafka.consumer.auto.offset.reset
latest
当没有offset记录时,从指定的位置消费数据。
kafka.producer.security.protocol
SASL_PLAINTEXT
Kafka生产者安全协议。
kafka.consumer.security.protocol
SASL_PLAINTEXT
Kafka消费者安全协议。
常用Sink配置
- HDFS Sink
HDFS Sink将数据写入HDFS。常用配置如表9所示:
图9 HDFS Sink
表9 HDFS Sink常用配置 参数
默认值
描述
channel
-
与之相连的Channel。仅可在“properties.properties”文件中配置。
type
hdfs
类型,需配置为“hdfs”。仅可在“properties.properties”文件中配置。
monTime
0(不开启)
线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。
hdfs.path
-
HDFS路径。
hdfs.inUseSuffix
.tmp
正在写入的HDFS文件后缀。
hdfs.rollInterval
30
按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
hdfs.rollSize
1024
按大小滚动文件,单位:字节,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
hdfs.rollCount
10
按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
hdfs.idleTimeout
0
自动关闭空闲文件超时时间,单位:秒。
hdfs.batchSize
1000
每次写入HDFS的Event个数。
hdfs.kerberosPrincipal
-
认证HDFS的Kerberos用户名,未启用Kerberos认证集群不配置。
hdfs.kerberosKeytab
-
认证HDFS的Kerberos keytab路径,未启用Kerberos认证集群不配置
hdfs.fileCloseByEndEvent
true
收到最后一个Event时是否关闭文件。
hdfs.batchCallTimeout
-
每次写入HDFS超时控制时间,单位:毫秒。
当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。
说明:“hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致数据写入HDFS失败。
serializer.appendNewline
true
将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。
- Avro Sink
Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如表10所示:
图10 Avro Sink
表10 Avro Sink常用配置 参数
默认值
描述
channel
-
与之相连的Channel。仅可在“properties.properties”文件中配置。
type
-
类型,需配置为“avro”。仅可在“properties.properties”文件中配置。
hostname
-
绑定关联的主机名或IP地址。
port
-
监测端口。
batch-size
1000
批次发送的Event个数。
ssl
false
是否使用SSL加密。
truststore-type
JKS
Java信任库类型。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
keystore-type
JKS
密钥存储类型。
keystore
-
密钥存储文件。
keystore-password
-
密钥存储密码
- HBase Sink
HBase Sink将数据写入到HBase中。常用配置如表11所示:
图11 HBase Sink
表11 HBase Sink常用配置 参数
默认值
描述
channel
-
与之相连的Channel。仅可在“properties.properties”文件中配置。
type
-
类型,需配置为“hbase”。仅可在“properties.properties”文件中配置。
table
-
HBase表名称。
monTime
0(不开启)
线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。
columnFamily
-
HBase列族名称。
batchSize
1000
每次写入HBase的Event个数。
kerberosPrincipal
-
认证HBase的Kerberos用户名,未启用Kerberos认证集群不配置。
kerberosKeytab
-
认证HBase的Kerberos keytab路径,未启用Kerberos认证集群不配置。
- Kafka Sink
Kafka Sink将数据写入到Kafka中。常用配置如表12所示:
图12 Kafka Sink
表12 Kafka Sink常用配置 参数
默认值
描述
channel
-
与之相连的Channel。仅可在“properties.properties”文件中配置。
type
-
类型,需配置为“org.apache.flume.sink.kafka.KafkaSink”。
仅可在“properties.properties”文件中配置。
kafka.bootstrap.servers
-
Kafkabrokers列表,多个用英文逗号分隔。
monTime
0(不开启)
线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。
kafka.topic
default-flume-topic
数据写入的topic。
flumeBatchSize
1000
每次写入Kafka的Event个数。
kafka.security.protocol
SASL_PLAINTEXT
Kafka安全协议,未启用Kerberos认证集群下须配置为“PLAINTEXT”。
kafka.kerberos.domain.name
-
Kafka Domain名称。安全集群必填。仅可在“properties.properties”文件中配置。
Other Kafka Producer Properties
-
其他Kafka配置,可以接受任意Kafka支持的生产参数配置,配置需要加前缀“.kafka”。
仅可在“properties.properties”文件中配置。