更新时间:2024-05-11 GMT+08:00

Flume业务配置指南

本章节适用于MRS 3.x及之后版本。

该操作指导用户完成Flume常用业务的配置。

  • 各个表格中所示参数,黑体加粗的参数为必选参数。
  • Sink的BatchSize参数必须小于Channel的transactionCapacity。
  • 集群Flume配置工具界面篇幅有限,Source、Channel、Sink只展示部分参数,详细请参考如下常用配置。
  • 集群Flume配置工具界面上所展示Customer Source、Customer Channel及Customer Sink需要用户根据自己开发的代码来进行配置,下述常用配置不再展示。

常用Source配置

  • Avro Source

    Avro Source监听Avro端口,接收外部Avro客户端数据并放入配置的Channel中。常用配置如下表所示:

    表1 Avro Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    avro

    avro source的类型,必须为avro。

    bind

    -

    监听主机名/IP。

    port

    -

    绑定监听端口,该端口需未被占用。

    threads

    -

    source工作的最大线程数。

    compression-type

    none

    消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。

    compression-level

    6

    数据压缩级别(1-9),数值越高,压缩率越高。

    ssl

    false

    是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    truststore-type

    JKS

    Java信任库类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    keystore-type

    JKS

    ssl启用后密钥存储类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥用不同的密码进行保护,而PKCS12的密钥库和私钥用相同密码进行保护。

    keystore

    -

    ssl启用后密钥存储文件路径,开启ssl后,该参数必填。

    keystore-password

    -

    ssl启用后密钥存储密码,开启ssl后,该参数必填。

    trust-all-certs

    false

    是否关闭SSL server证书检查。设置为“true”时将不会检查远端source的SSL server证书,不建议在生产中使用。

    exclude-protocols

    SSLv3

    排除的协议列表,用空格分开。默认排除SSLv3协议。

    ipFilter

    false

    是否开启ip过滤。

    ipFilter.rules

    -

    定义N网络的ipFilters,多个主机或IP地址用逗号分割。ipFilter设置为“true”时,配置规则有允许和禁止两种,配置格式如下:

    ipFilterRules=allow:ip:127.*, allow:name:localhost, deny:ip:*

  • SpoolDir Source

    Spool Dir Source监控并传输目录下新增的文件,可实现实时数据传输。常用配置如下表所示:

    表2 Spooling Directory Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    spooldir

    spooling source的类型,必须设置为spooldir。

    spoolDir

    -

    Spooldir source的监控目录,flume运行用户需要对该目录具有可读可写可执行权限。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。

    fileSuffix

    .COMPLETED

    文件传输完成后添加的后缀。

    deletePolicy

    never

    文件传输完成后源文件删除策略,never或immediate。“never”表示不删除已完成传输的源文件,“immediate”表示传输完成后立刻删除源文件。

    ignorePattern

    ^$

    忽略文件的正则表达式表示。默认为“^$”,表示忽略空格。

    includePattern

    ^.*$

    包含文件的正则表达式表示。可以与ignorePattern同时使用,如果一个文件既满足ignorePattern也满足includePattern,则该文件会被忽略。另外,以“.”开头的文件不会被过滤。

    trackerDir

    .flumespool

    传输过程中元数据存储路径。

    batchSize

    1000

    批次写入Channel的Event数量。

    decodeErrorPolicy

    FAIL

    编码错误策略。

    说明:

    如果文件中有编码错误,请配置“decodeErrorPolicy”为“REPLACE”或“IGNORE”,Flume遇到编码错误将跳过编码错误,继续采集后续日志。

    deserializer

    LINE

    文件解析器,值为“LINE”“BufferedLine”

    • 配置为“LINE”时,对从文件读取的字符逐个转码。
    • 配置为“BufferedLine”时,对文件读取的一行或多行的字符进行批量转码,性能更优。

    deserializer.maxLineLength

    2048

    按行解析最大长度。

    deserializer.maxBatchLine

    1

    按行解析最多行数,如果行数设置为多行,maxLineLength也应该设置为相应的倍数。

    说明:

    用户设置Interceptor时,需要考虑多行合并后的场景,否则会造成数据丢失。如果Interceptor无法处理多行合并场景,请将该配置设置为1。

    selector.type

    replicating

    选择器类型,“replicating”或“multiplexing”。“replicating”表示将数据复制多份,分别传递给每一个channel,每个channel接收到的数据都是相同的,而“multiplexing”表示根据event中header的value来选择特定的channel,每个channel中的数据是不同的。

    interceptors

    -

    拦截器。多个拦截器用空格分开。

    inputCharset

    UTF-8

    读取文件的编码格式。须与读取数据源文件编码格式相同,否则字符解析可能会出错。

    fileHeader

    false

    是否把文件名(包含路径)添加到event的header中。

    fileHeaderKey

    -

    设置header中数据存储结构为<key,value>模式,需要fileHeaderKey与fileHeader配合使用。若fileHeader设置为true,可参考如下示例。

    示例:将fileHeaderKey定义为file,当读取到文件名为/root/a.txt的内容时,header中以file=/root/a.txt的形式存在。

    basenameHeader

    false

    是否把文件名(不包含路径)添加到event的header中。

    basenameHeaderKey

    -

    设置header中数据存储结构为<key,value>模式,需要basenameHeaderKey与basenameHeader配合使用。若basenameHeader设置为true,可参考如下示例。

    示例:将basenameHeaderKey定义为file,当读取到文件名为a.txt的内容时,header中以file=a.txt的形式存在。

    pollDelay

    500

    轮询监控目录下新文件时的时延。单位:毫秒。

    recursiveDirectorySearch

    false

    是否监控配置的目录下子目录中的新文件。

    consumeOrder

    oldest

    监控目录下文件的消耗次序。如果配置为oldest或者youngest,会根据监控目录下文件的最后修改时间来决定,当目录下有大量文件时,会消耗较长时间去寻找oldest或者youngest的文件。需要注意的是,如果配置为random,创建比较早的文件有可能长时间未被读取。如果配置为oldest或者youngest,那么进程会需要较多时间来查找最新的或最旧的文件。可选值:random,youngest,oldest。

    maxBackoff

    4000

    当Channel满了以后,尝试再次去写Channel所等待的最大时间。超过这个时间,则会抛出异常。对应的Source会以一个较小的时间开始,然后每尝试一次,该时间数字指数增长直到达到当前指定的值,如果还不能成功写入,则认为失败。时间单位:秒。

    emptyFileEvent

    true

    是否采集空文件信息发送到Sink端,默认值为true,表示将空文件信息发送到Sink端。该参数只对HDFS Sink有效,其他Sink该参数无效。以HDFS Sink为例,当参数为true时,如果spoolDir路径下存在空文件,那么HDFS的hdfs.path路径下就会创建一个同名的空文件。

    SpoolDir Source在按行读取过程中会忽略掉每一个event的最后一个换行符,该换行符所占用的数据量指标不会被Flume统计。

  • Kafka Source

    Kafka Source从Kafka的topic中消费数据,可以设置多个Source消费同一个topic的数据,每个Source会消费topic的不同partitions。常用配置如下表所示:

    表3 Kafka Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    org.apache.flume.source.kafka.KafkaSource

    kafka source的类型,必须设置为org.apache.flume.source.kafka.KafkaSource。

    kafka.bootstrap.servers

    -

    Kafka的bootstrap地址端口列表。如果集群已安装Kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    kafka.topics

    -

    订阅的Kafka topic列表,用逗号分隔。

    kafka.topics.regex

    -

    符合正则表达式的topic会被订阅,优先级高于“kafka.topics”,如果存在将覆盖“kafka.topics”。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。

    nodatatime

    0(不开启)

    告警阈值,从Kafka中订阅不到数据的时长超过阈值时发送告警,单位:秒。该参数可在配置文件properties.properties进行设置。

    batchSize

    1000

    批次写入Channel的Event数量。

    batchDurationMillis

    1000

    批次消费topic数据的最大时长,单位:ms。

    keepTopicInHeader

    false

    是否在Event Header中保存topic。设置为true,则Kafka Sink配置的topic将无效。

    setTopicHeader

    true

    当设置为true时,会将“topicHeader”中定义的topic名称存储到Header中。

    topicHeader

    topic

    当setTopicHeader属性设置为true,此参数用于定义存储接收的topic名称。如果与Kafka Sink的topicHeader属性结合使用,应该注意,避免将消息循环发送到同一主题。

    useFlumeEventFormat

    false

    默认情况下,event会以字节的形式从kafka topic传递到event的body体中。设置为true,则会以Flume的Avro二进制格式来读取Event。与KafkaSink或KakfaChannel 中同名的parseAsFlumeEvent参数一起使用时,会保留从数据源产生的任何设定的Header。

    keepPartitionInHeader

    false

    是否在Event Header中保存partitionID。设置为true,则Kafka Sink将写入对应的Partition。

    kafka.consumer.group.id

    flume

    Kafka消费组ID。多个源或代理中设置相同的ID表示它们是同一个consumer group。

    kafka.security.protocol

    SASL_PLAINTEXT

    Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    Other Kafka Consumer Properties

    -

    其他Kafka配置,可以接受任意Kafka支持的消费配置,配置需要加前缀“kafka.”。

  • Taildir Source

    Taildir Source监控目录下文件的变化并自动读取文件内容,可实现实时数据传输,常用配置如下表所示:

    表4 Taildir Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    TAILDIR

    taildir source的类型,必须为TAILDIR。

    filegroups

    -

    设置采集文件目录分组名字,分组名字中间使用空格间隔。

    filegroups.<filegroupName>.parentDir

    -

    父目录,需要配置为绝对路径。

    filegroups.<filegroupName>.filePattern

    -

    相对父目录的文件路径,可以包含目录,支持正则表达式,须与父目录联合使用。

    positionFile

    -

    传输过程中元数据存储路径。

    headers.<filegroupName>.<headerKey>

    -

    设置某一个分组采集数据时event中的key-value值。

    byteOffsetHeader

    false

    是否在每一个event头中携带该event在源文件中的位置信息。设置为true,则该信息保存在byteoffset变量中。

    maxBatchCount

    Long.MAX_VALUE

    控制从一个文件中连续读取的最大批次。如果监控目录会一直读取多个文件,且其中一个文件以非常快的速率在写入,那么其他文件可能会无法处理。因为高速写入的这个文件会陷入无限读取的循环中。这种情况下,应该降低此值。

    skipToEnd

    false

    Flume在重启后是否直接定位到文件最新的位置处读取最新的数据。设置为true,则重启后直接定位到文件最新位置读取最新数据。

    idleTimeout

    120000

    设置读取文件的空闲时间,单位:毫秒,如果在该时间内文件内容没有变更,关闭掉该文件,关闭后如果该文件有数据写入,重新打开并读取数据。

    writePosInterval

    3000

    设置将元数据写入到文件的周期,单位:毫秒。

    batchSize

    1000

    批次写入Channel的Event数量。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒

    fileHeader

    false

    是否把文件名(包含路径)添加到event的header中。

    fileHeaderKey

    file

    设置header中数据存储结构为<key,value>模式,需要fileHeaderKey与fileHeader配合使用。若fileHeader设置为true,可参考如下示例。

    示例:将fileHeaderKey定义为file,当读取到文件名为/root/a.txt的内容时,header中以file=/root/a.txt的形式存在。

  • Http Source

    Http Source接收外部HTTP客户端发送过来的数据,并放入配置的Channel中,常用配置如下表所示:

    表5 Http Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    http

    http source的类型,必须为http。

    bind

    -

    监听主机名/IP。

    port

    -

    绑定监听端口,该端口需未被占用。

    handler

    org.apache.flume.source.http.JSONHandler

    http请求的消息解析方式,支持Json格式解析(org.apache.flume.source.http.JSONHandler)和二进制Blob块解析(org.apache.flume.sink.solr.morphline.BlobHandler)。

    handler.*

    -

    设置handler的参数。

    exclude-protocols

    SSLv3

    排除的协议列表,用空格分开。默认排除SSLv3协议。

    include-cipher-suites

    -

    包含的协议列表,用空格分开。如果设置为空,则默认支持所有协议。

    enableSSL

    false

    http协议是否启用SSL。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    keystore-type

    JKS

    Keystore类型,可以为JKS或者PKCS12。

    keystore

    -

    http启用SSL后设置keystore的路径。

    keystorePassword

    -

    http启用SSL后设置keystore的密码。

  • Thrift Source

    Thrift Source监听thrift端口,接收外部Thrift客户端数据并放入配置的Channel中。常用配置如下表所示:

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    thrift

    thrift source的类型,必须设置为thrift。

    bind

    -

    监听主机名/IP。

    port

    -

    绑定监听端口,该端口需未被占用。

    threads

    -

    允许运行的最大的worker线程数目。

    kerberos

    false

    是否启用Kerberos认证。

    agent-keytab

    -

    服务端使用的keytab文件地址,必须使用机机帐号。建议使用Flume服务安装目录下flume/conf/flume_server.keytab。

    agent-principal

    -

    服务端使用的安全用户的Principal,必须使用机机帐户。建议使用Flume服务默认用户flume_server/hadoop.<系统域名>@<系统域名>

    说明:

    “flume_server/hadoop.<系统域名>为用户名,用户的用户名所包含的系统域名所有字母为小写。例如“本端域”参数为“9427068F-6EFA-4833-B43E-60CB641E5B6C.COM”,用户名为“flume_server/hadoop.9427068f-6efa-4833-b43e-60cb641e5b6c.com”。

    compression-type

    none

    消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。

    ssl

    false

    是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    keystore-type

    JKS

    SSL启用后密钥存储类型。

    keystore

    -

    SSL启用后密钥存储文件路径,开启SSL后,该参数必填。

    keystore-password

    -

    SSL启用后密钥存储密码,开启ssl后,该参数必填。

常用Channel配置

  • Memory Channel

    Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如下表所示:

    表6 Memory Channel常用配置

    参数

    默认值

    描述

    type

    -

    memory channel的类型,必须设置为memory。

    capacity

    10000

    缓存在channel中的最大Event数。

    transactionCapacity

    1000

    每次存取的最大Event数。

    说明:
    • 此参数值需要大于source和sink的batchSize。
    • 事务缓存容量必须小于或等于Channel缓存容量。

    channelfullcount

    10

    channel full次数,达到该次数后发送告警。

    keep-alive

    3

    当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。

    byteCapacity

    JVM最大内存的80%

    channel中最多能容纳所有event body的总字节数,默认是 JVM最大可用内存(-Xmx )的80%,单位:bytes。

    byteCapacityBufferPercentage

    20

    channel中字节容量百分比(%)。

  • File Channel

    File Channel使用本地磁盘作为缓存区,Events存放在设置的dataDirs配置项文件夹中。常用配置如下表所示:

    表7 File Channel常用配置

    参数

    默认值

    描述

    type

    -

    file channel的类型,必须设置为file。

    checkpointDir

    ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/checkpoint

    说明:

    此路径随自定义数据路径变更。

    检查点存放路径。

    dataDirs

    ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/data

    说明:

    此路径随自定义数据路径变更。

    数据缓存路径,设置多个路径可提升性能,中间用逗号分开。

    maxFileSize

    2146435071

    单个缓存文件的最大值,单位:bytes。

    minimumRequiredSpace

    524288000

    缓冲区空闲空间最小值,单位:bytes。

    capacity

    1000000

    缓存在channel中的最大Event数。

    transactionCapacity

    10000

    每次存取的最大Event数。

    说明:
    • 此参数值需要大于source和sink的batchSize。
    • 事务缓存容量必须小于或等于Channel缓存容量。

    channelfullcount

    10

    channel full次数,达到该次数后发送告警。

    useDualCheckpoints

    false

    是否备份检查点。设置为“true”时,必须设置backupCheckpointDir的参数值。

    backupCheckpointDir

    -

    备份检查点路径。

    checkpointInterval

    30000

    检查点间隔时间,单位:秒。

    keep-alive

    3

    当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。

    use-log-replay-v1

    false

    是否启用旧的回复逻辑。

    use-fast-replay

    false

    是否使用队列回复。

    checkpointOnClose

    true

    channel关闭时是否创建检查点。

  • Memory File Channel

    Memory File Channel同时使用内存和本地磁盘作为缓存区,消息可持久化,性能优于File Channel,接近Memory Channel的性能。此Channel目前处于试验阶段,可靠性不够高,不建议在生产环境使用。常用配置如下表所示:

    表8 Memory File Channel常用配置

    参数

    默认值

    描述

    type

    org.apache.flume.channel.MemoryFileChannel

    memory file channel的类型,必须设置为“org.apache.flume.channel.MemoryFileChannel”

    capacity

    50000

    Channel缓存容量:缓存在Channel中的最大Event数。

    transactionCapacity

    5000

    事务缓存容量:一次事务能处理的最大Event数。

    说明:
    • 此参数值需要大于source和sink的batchSize。
    • 事务缓存容量必须小于或等于Channel缓存容量。

    subqueueByteCapacity

    20971520

    每个subqueue最多保存多少byte的Event,单位:byte。

    Memory File Channel采用queue和subqueue两级缓存,event保存在subqueue,subqueue保存在queue。

    subqueue能保存多少event,由“subqueueCapacity”“subqueueInterval”两个参数决定,“subqueueCapacity”限制subqueue内的Event总容量,“subqueueInterval”限制subqueue保存Event的时长,只有subqueue达到“subqueueCapacity”“subqueueInterval”上限时,subqueue内的Event才会发往目的地。

    说明:

    “subqueueByteCapacity”必须大于一个batchsize内的Event总容量。

    subqueueInterval

    2000

    每个subqueue最多保存一段多长时间的Event,单位:毫秒。

    keep-alive

    3

    当事务缓存或Channel缓存满时,Put、Take线程等待时间。

    单位:秒。

    dataDir

    -

    缓存本地文件存储目录。

    byteCapacity

    JVM最大内存的80%

    Channel缓存容量。

    单位:bytes。

    compression-type

    None

    消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。

    channelfullcount

    10

    channel full次数,达到该次数后发送告警。

    Memory File Channel配置样例:

    server.channels.c1.type = org.apache.flume.channel.MemoryFileChannel
    server.channels.c1.dataDir = /opt/flume/mfdata
    server.channels.c1.subqueueByteCapacity = 20971520
    server.channels.c1.subqueueInterval=2000
    server.channels.c1.capacity = 500000
    server.channels.c1.transactionCapacity = 40000
  • Kafka Channel
    Kafka Channel使用Kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。
    表9 Kafka channel 常用配置

    Parameter

    Default Value

    Description

    type

    -

    kafka channel的类型,必须设置为 “org.apache.flume.channel.kafka.KafkaChannel”

    kafka.bootstrap.servers

    -

    Kafka的bootstrap地址端口列表。

    如果集群已安装Kafka并且配置已经同步,则服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    kafka.topic

    flume-channel

    channel用来缓存数据的topic。

    kafka.consumer.group.id

    flume

    从kafka中获取数据的组标识,此参数不能为空。

    parseAsFlumeEvent

    true

    是否解析为Flume event。

    migrateZookeeperOffsets

    true

    当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。

    kafka.consumer.auto.offset.reset

    latest

    当没有offset记录时从什么位置消费,可选为“earliest”、“latest”或“none”。“earliest”表示将offset重置为初始点,“latest”表示将offset置为最新位置点,“none”表示若没有offset则抛出异常。

    kafka.producer.security.protocol

    SASL_PLAINTEXT

    Kafka生产安全协议。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    说明:

    若该参数没有显示,请单击弹窗左下角的"+"显示全部参数。

    kafka.consumer.security.protocol

    SASL_PLAINTEXT

    同上,但用于消费。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    pollTimeout

    500

    consumer调用poll()函数能接受的最大超时时间,单位:毫秒。

    ignoreLongMessage

    false

    是否丢弃超大消息。

    messageMaxLength

    1000012

    Flume写入Kafka的消息的最大长度。

常用Sink配置

  • HDFS Sink

    HDFS Sink将数据写入Hadoop分布式文件系统(HDFS)。常用配置如下表所示:

    表10 HDFS Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    hdfs

    hdfs sink的类型,必须设置为hdfs。

    hdfs.path

    -

    HDFS上数据存储路径,必须以“hdfs://hacluster/”开头。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    hdfs.inUseSuffix

    .tmp

    正在写入的hdfs文件后缀。

    hdfs.rollInterval

    30

    按时间滚动文件,单位:秒。

    hdfs.rollSize

    1024

    按大小滚动文件,单位:bytes。

    hdfs.rollCount

    10

    按Event个数滚动文件。

    说明:

    参数“rollInterval”、“rollSize”和“rollCount”可同时配置,三个参数采取优先原则,哪个参数值先满足,优先按照哪个参数进行压缩。

    hdfs.idleTimeout

    0

    自动关闭空闲文件超时时间,单位:秒。

    hdfs.batchSize

    1000

    批次写入HDFS的Event个数。

    hdfs.kerberosPrincipal

    -

    认证HDFS的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。

    hdfs.kerberosKeytab

    -

    认证HDFS的Kerberos keytab,普通模式集群不配置,安全模式集群中,用户必须对jaas.cof文件中的keyTab路径有访问权限。

    hdfs.fileCloseByEndEvent

    true

    收到源文件的最后一个Event时是否关闭hdfs文件。

    hdfs.batchCallTimeout

    -

    批次写入HDFS超时控制时间,单位:毫秒。

    当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。

    说明:

    “hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致写HDFS失败。

    serializer.appendNewline

    true

    将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。

    hdfs.filePrefix

    over_%{basename}

    数据写入hdfs后文件名的前缀。

    hdfs.fileSuffix

    -

    数据写入hdfs后文件名的后缀。

    hdfs.inUsePrefix

    -

    正在写入的hdfs文件前缀。

    hdfs.fileType

    DataStream

    hdfs文件格式,包括“SequenceFile”、“DataStream”以及“CompressedStream”。

    说明:

    “SequenceFile”和“DataStream”不压缩输出文件,不能设置参数“codeC”,“CompressedStream”压缩输出文件,必须设置“codeC”参数值配合使用。

    hdfs.codeC

    -

    文件压缩格式,包括gzip、bzip2、lzo、lzop、snappy。

    hdfs.maxOpenFiles

    5000

    最大允许打开的hdfs文件数,当打开的文件数达到该值时,最早打开的文件将会被关闭。

    hdfs.writeFormat

    Writable

    文件写入格式,“Writable”或者“Text”。

    hdfs.callTimeout

    10000

    写入HDFS超时控制时间,单位:毫秒。

    hdfs.threadsPoolSize

    -

    每个HDFS sink用于HDFS io操作的线程数。

    hdfs.rollTimerPoolSize

    -

    每个HDFS sink用于调度定时文件滚动的线程数。

    hdfs.round

    false

    时间戳是否四舍五入。若设置为true,则会影响所有基于时间的转义序列(%t除外)。

    hdfs.roundUnit

    second

    时间戳四舍五入单位,可选为“second”、“minute”或“hour”,分别对应为秒、分钟和小时。

    hdfs.useLocalTimeStamp

    true

    是否启用本地时间戳,建议设置为“true”。

    hdfs.closeTries

    0

    hdfs sink尝试关闭重命名文件的最大次数。默认为0表示sink会一直尝试重命名,直至重命名成功。

    hdfs.retryInterval

    180

    尝试关闭hdfs文件的时间间隔,单位:秒。

    说明:

    每个关闭请求都会有多个RPC往返Namenode,因此设置的太低可能导致Namenode超负荷。如果设置0,如果第一次尝试失败的话,该Sink将不会尝试关闭文件,并且把文件打开,或者用“.tmp”作为扩展名。

    hdfs.failcount

    10

    数据写入hdfs失败的次数。该参数作为sink写入hdfs失败次数的阈值,当超过该阈值后上报数据传输异常告警。

  • Avro Sink

    Avro Sink把events转化为Avro events并发送到配置的主机的监听端口。常用配置如下表所示:

    表11 Avro Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    avro sink的类型,必须设置为avro。

    hostname

    -

    绑定的主机名/IP。

    port

    -

    监听端口,该端口需未被占用。

    batch-size

    1000

    批次发送的Event个数。

    client.type

    DEFAULT

    客户端实例类型,根据所配置的模型实际使用到的通信协议设置。该值可选值包括:

    • DEFAULT,返回AvroRPC类型的客户端实例。
    • OTHER,返回NULL。
    • THRIFT,返回Thrift RPC类型的客户端实例。
    • DEFAULT_LOADBALANCING, 返回LoadBalancing RPC 客户端实例。
    • DEFAULT_FAILOVER, 返回Failover RPC 客户端实例。

    ssl

    false

    是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    truststore-type

    JKS

    Java信任库类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    keystore-type

    JKS

    ssl启用后密钥存储类型。

    keystore

    -

    ssl启用后密钥存储文件路径,开启ssl后,该参数必填。

    keystore-password

    -

    ssl启用后密钥存储密码,开启ssl后,该参数必填。

    connect-timeout

    20000

    第一次连接的超时时间,单位:毫秒。

    request-timeout

    20000

    第一次请求后一次请求的最大超时时间,单位:毫秒。

    reset-connection-interval

    0

    一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。

    compression-type

    none

    批数据压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。该值必须与AvroSource的compression-type匹配。

    compression-level

    6

    批数据压缩级别(1-9),数值越高,压缩率越高。

    exclude-protocols

    SSLv3

    排除的协议列表,用空格分开。默认排除SSLv3协议。

  • HBase Sink

    HBase Sink将数据写入到HBase中。常用配置如下表所示:

    表12 HBase Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    hbase sink的类型,必须设置为hbase。

    table

    -

    HBase表名称。

    columnFamily

    -

    HBase列族。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    batchSize

    1000

    批次写入HBase的Event个数。

    kerberosPrincipal

    -

    认证HBase的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。

    kerberosKeytab

    -

    认证HBase的Kerberos keytab,普通模式集群不配置,安全模式集群中,flume运行用户必须对jaas.cof文件中的keyTab路径有访问权限。

    coalesceIncrements

    true

    是否在同一个处理批次中,合并对同一个hbase cell多个操作。设置为true有利于提高性能。

  • Kafka Sink

    Kafka Sink将数据写入到Kafka中。常用配置如下表所示:

    表13 Kafka Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    kafka sink的类型,必须设置为org.apache.flume.sink.kafka.KafkaSink。

    kafka.bootstrap.servers

    -

    Kafka 的bootstrap 地址端口列表。如果集群安装有kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表,客户端必须配置该项,多个用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    kafka.producer.acks

    1

    必须收到多少个replicas的确认信息才认为写入成功。0表示不需要接收确认信息,1表示只等待leader的确认信息。-1表示等待所有的relicas的确认信息。设置为-1,在某些leader失败的场景中可以避免数据丢失。

    kafka.topic

    -

    数据写入的topic,必须填写。

    flumeBatchSize

    1000

    批次写入Kafka的Event个数。

    kafka.security.protocol

    SASL_PLAINTEXT

    Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    ignoreLongMessage

    false

    是否丢弃超大消息的开关。

    messageMaxLength

    1000012

    Flume写入Kafka的消息的最大长度。

    defaultPartitionId

    -

    用于指定channel中的events被传输到哪一个Kafka partition ID ,此值会被partitionIdHeader覆盖。默认情况下,如果此参数不设置,会由Kafka Producer's partitioner 进行events分发(可以通过指定key或者kafka.partitioner.class自定义的partitioner)。

    partitionIdHeader

    -

    设置时,对应的Sink 将从Event 的Header中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。 如果该值无对应的有效分区,则会抛出EventDeliveryException。 如果Header 值已经存在,则此设置将覆盖参数defaultPartitionId。

    Other Kafka Producer Properties

    -

    其他Kafka配置,可以接受任意Kafka支持的生产配置,配置需要加前缀 .kafka。

  • Thrift Sink

    Thrift Sink把events转化为Thrift events并发送到配置的主机的监听端口。常用配置如下表所示:

    表14 Thrift Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    thrift

    thrift sink的类型,必须设置为thrift。

    hostname

    -

    绑定的主机名/IP。

    port

    -

    监听端口,该端口需未被占用。

    batch-size

    1000

    批次发送的Event个数。

    connect-timeout

    20000

    第一次连接的超时时间,单位:毫秒。

    request-timeout

    20000

    第一次请求后一次请求的最大超时时间,单位:毫秒。

    kerberos

    false

    是否启用Kerberos认证。

    client-keytab

    -

    客户端使用的keytab文件地址,flume运行用户必须对认证文件具有访问权限。

    client-principal

    -

    客户端使用的安全用户的Principal。

    server-principal

    -

    服务端使用的安全用户的Principal。

    compression-type

    none

    Flume发送数据的压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。

    maxConnections

    5

    Flume发送数据时的最大连接池大小。

    ssl

    false

    是否使用SSL加密。

    truststore-type

    JKS

    Java信任库类型。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    reset-connection-interval

    0

    一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。

注意事项

  • Flume可靠性保障措施有哪些?
    • Source&Channel、Channel&Sink之间的事务机制。
    • Sink Processor支持配置failover、load_blance机制,例如负载均衡示例如下。
      server.sinkgroups=g1
      server.sinkgroups.g1.sinks=k1 k2
      server.sinkgroups.g1.processor.type=load_balance
      server.sinkgroups.g1.processor.backoff=true
      server.sinkgroups.g1.processor.selector=random
  • Flume多agent聚合级联时的注意事项?
    • 级联时需要使用Avro或者Thrift协议进行级联。
    • 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。