更新时间:2024-11-29 GMT+08:00

Flume配置参数说明

基本介绍

使用Flume需要配置Source、Channel和Sink,各模块配置参数说明可通过本节内容了解。

部分参数可通过Manager界面配置,选择“集群 > 服务 > Flume > 配置工具”,选择要使用的Source、Channel以及Sink,将其拖到右侧的操作界面中,双击对应的Source、Channel以及Sink,根据实际环境可配置Source、Channel和Sink参数。“channels”、“type”等参数仅在客户端配置文件“properties.properties”中进行配置,配置文件路径为“Flume客户端安装目录/fusioninsight-flume-Flume组件版本号/conf/properties.properties”。

部分配置可能需要填写加密后的信息,请参见使用Flume客户端加密工具

常用Source配置

  • Avro Source

    Avro Source监测Avro端口,接收外部Avro客户端数据并放入配置的Channel中。常用配置如表1所示:

    表1 Avro Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的Channel,可以配置多个。用空格隔开。

    在单个代理流程中,是通过channel连接sources和sinks。一个source实例对应多个channels,但一个sink实例只能对应一个channel。

    格式如下:

    <Agent >.sources.<Source>.channels = <channel1> <channel2> <channel3>...

    <Agent >.sinks.<Sink>.channels = <channel1>

    仅可在“properties.properties”文件中配置。

    type

    avro

    类型,需设置为“avro”。每一种source的类型都为相应的固定值。

    仅可在“properties.properties”文件中配置。

    bind

    -

    绑定和source关联的主机名或IP地址。

    port

    -

    绑定端口号。

    ssl

    false

    是否使用SSL加密。

    • true
    • false

    truststore-type

    JKS

    Java信任库类型。填写JKS或其他java支持的truststore类型。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    keystore-type

    JKS

    密钥存储类型。填写JKS或其他java支持的truststore类型。

    keystore

    -

    密钥存储文件。

    keystore-password

    -

    密钥存储密码。

  • SpoolDir Source

    SpoolDir Source监控并传输目录下新增的文件,可实现准实时数据传输。常用配置如表 2 Spooling Source常用配置所示:

    表2 SpoolDir Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的Channel,可以配置多个。

    仅可在“properties.properties”文件中配置。

    type

    spooldir

    类型,需设置为“spooldir”

    仅可在“properties.properties”文件中配置。

    monTime

    0(不开启)

    线程监控阈值,更新时间大于阈值时会重新启动该Source,单位:秒。

    spoolDir

    -

    监控目录。

    fileSuffix

    .COMPLETED

    文件传输完成后添加的后缀。

    deletePolicy

    never

    文件传输完成后源文件删除策略,支持“never”“immediate”。分别是从不删除和立即删除。

    ignorePattern

    ^$

    忽略文件的正则表达式表示。

    trackerDir

    .flumespool

    传输过程中元数据存储路径。

    batchSize

    1000

    Source传输粒度。

    decodeErrorPolicy

    FAIL

    编码错误策略。仅可在“properties.properties”文件中配置。

    可选FAIL、REPLACE、IGNORE。

    FAIL:发生异常并让解析失败。

    REPLACE:将不能识别的字符用其它字符代替,通常是字符U+FFFD。

    IGNORE:直接丢弃不能解析的字符串。

    说明:

    如果文件中有编码错误,请配置“decodeErrorPolicy”为“REPLACE”或“IGNORE”,Flume遇到编码错误将跳过编码错误,继续采集后续日志。

    deserializer

    LINE

    文件解析器,值为“LINE”“BufferedLine”

    • 配置为“LINE”时,对从文件读取的字符逐个转码。
    • 配置为“BufferedLine”时,对文件读取的一行或多行的字符进行批量转码,性能更优。

    deserializer.maxLineLength

    2048

    按行解析最大长度。0到 2,147,483,647。

    deserializer.maxBatchLine

    1

    按行解析最多行数,如果行数设置为多行,“maxLineLength”也应该设置为相应的倍数。例如maxBatchLine设置为2,“maxLineLength”相应的设置为2048*2为4096。

    selector.type

    replicating

    选择器类型,支持“replicating”“multiplexing”

    • “replicating”表示同样的内容会发给每一个channel。
    • “multiplexing”表示根据分发规则,有选择地发给某些channel。

    interceptors

    -

    拦截器配置。

    仅可在“properties.properties”文件中配置。

    Spooling Source在按行读取过程中,会忽略掉每一个Event的最后一个换行符,该换行符所占用的数据量指标不会被Flume统计。

  • Kafka Source

    Kafka Source从Kafka的topic中消费数据,可以设置多个Source消费同一个topic的数据,每个Source会消费topic的不同partitions。常用配置如表 3 Kafka Source常用配置所示:

    表3 Kafka Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的Channel,可以配置多个。

    仅可在“properties.properties”文件中配置。

    type

    org.apache.flume.source.kafka.KafkaSource

    类型,需设置为“org.apache.flume.source.kafka.KafkaSource”

    仅可在“properties.properties”文件中配置。

    monTime

    0(不开启)

    线程监控阈值,更新时间大于阈值时重新启动该Source,单位:秒。

    nodatatime

    0(不开启)

    告警阈值,从Kafka中订阅不到数据的时长大于阈值时发送告警,单位:秒。

    batchSize

    1000

    每次写入Channel的Event数量。

    batchDurationMillis

    1000

    每次消费topic数据的最大时长,单位:毫秒。

    keepTopicInHeader

    false

    是否在Event Header中保存topic,如果保存,Kafka Sink配置的topic将无效。

    • true
    • false

    仅可在“properties.properties”文件中配置。

    keepPartitionInHeader

    false

    是否在Event Header中保存partitionID,如果保存,Kafka Sink将写入对应的Partition。

    • true
    • false

    仅可在“properties.properties”文件中配置。

    kafka.bootstrap.servers

    -

    brokers地址列表,多个地址用英文逗号分隔。

    kafka.consumer.group.id

    -

    Kafka消费者组ID。

    kafka.topics

    -

    订阅的kafka topic列表,用英文逗号分隔。

    kafka.topics.regex

    -

    符合正则表达式的topic会被订阅,优先级高于“kafka.topics”,如果配置将覆盖“kafka.topics”。

    kafka.security.protocol

    SASL_PLAINTEXT

    Kafka安全协议,未启用Kerberos集群中须配置为“PLAINTEXT”

    kafka.kerberos.domain.name

    -

    此参数的值为Kafka集群中kerberos的“default_realm”,仅安全集群需要配置。

    仅可在“properties.properties”文件中配置。

    Other Kafka Consumer Properties

    -

    其他Kafka配置,可以接受任意Kafka支持的消费参数配置,配置需要加前缀“.kafka”

    仅可在“properties.properties”文件中配置。

  • Taildir Source

    Taildir Source监控目录下文件的变化并自动读取文件内容,可实现实时数据传输,常用配置如表4所示:

    表4 Taildir Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的Channel,可以配置多个。

    仅可在“properties.properties”文件中配置。

    type

    taildir

    类型,需配置为“taildir”

    仅可在“properties.properties”文件中配置。

    filegroups

    -

    设置采集文件目录分组名字,分组名字中间使用空格间隔。

    filegroups.<filegroupName>

    -

    文件路径,需要配置为绝对路径。

    filegroups.<filegroupName>.parentDir

    -

    父目录,需要配置为绝对路径。

    仅可在“properties.properties”文件中配置。

    filegroups.<filegroupName>.filePattern

    -

    相对父目录的文件路径,可以包含目录,支持正则表达式,须与父目录联合使用。

    仅可在“properties.properties”文件中配置。

    positionFile

    -

    传输过程中元数据存储路径。

    headers.<filegroupName>.<headerKey>

    -

    设置某一个分组采集数据时Event中的key-value值。

    仅可在“properties.properties”文件中配置。

    byteOffsetHeader

    false

    是否在每一个Event头中携带该Event在源文件中的位置信息,该信息保存在“byteoffset”变量中。

    skipToEnd

    false

    Flume在重启后是否直接定位到文件最新的位置处,以读取最新的数据。

    idleTimeout

    120000

    设置读取文件的空闲时间,单位:毫秒。如果在该时间内文件内容没有变更,关闭掉该文件,关闭后如果该文件有数据写入,重新打开并读取数据。

    writePosInterval

    3000

    设置将元数据写入到文件的周期,单位:毫秒。

    batchSize

    1000

    批次写入Channel的Event数量。

    monTime

    0(不开启)

    线程监控阈值,更新时间大于阈值时重新启动该Source,单位:秒。

  • Http Source

    Http Source接收外部HTTP客户端发送过来的数据,并放入配置的Channel中,常用配置如表5所示:

    表5 Http Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的Channel,可以配置多个。仅可在“properties.properties”文件中配置。

    type

    http

    类型,需配置为“http”。仅可在“properties.properties”文件中配置。

    bind

    -

    绑定关联的主机名或IP地址。

    port

    -

    绑定端口。

    handler

    org.apache.flume.source.http.JSONHandler

    http请求的消息解析方式,支持以下两种:

    • “org.apache.flume.source.http.JSONHandler”:表示Json格式解析。
    • “org.apache.flume.sink.solr.morphline.BlobHandler”:表示二进制Blob块解析。

    handler.*

    -

    设置handler的参数。

    enableSSL

    false

    http协议是否启用SSL。

    keystore

    -

    http启用SSL后设置keystore的路径。

    keystorePassword

    -

    http启用SSL后设置keystore的密码。

常用Channel配置

  • Memory Channel

    Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如表6所示:

    表6 Memory Channel常用配置

    参数

    默认值

    描述

    type

    -

    类型,需配置为“memory”。仅可在“properties.properties”文件中配置。

    capacity

    10000

    缓存在Channel中的最大Event数。

    transactionCapacity

    1000

    每次存取的最大Event数。

    channelfullcount

    10

    Channel full次数,达到该次数后发送告警。

  • File Channel

    File Channel使用本地磁盘作为缓存区,Events存放在设置的“dataDirs”配置项文件夹中。常用配置如表7所示:

    表7 File Channel常用配置

    参数

    默认值

    描述

    type

    -

    类型,需配置为“file”。仅可在“properties.properties”文件中配置。

    checkpointDir

    ${BIGDATA_DATA_HOME}/flume/checkpoint

    检查点存放路径。

    dataDirs

    ${BIGDATA_DATA_HOME}/flume/data

    数据缓存路径,设置多个路径可提升性能,中间用逗号分开。

    maxFileSize

    2146435071

    单个缓存文件的最大值,单位:字节。

    minimumRequiredSpace

    524288000

    缓冲区空闲空间最小值,单位:字节。

    capacity

    1000000

    缓存在Channel中的最大Event数。

    transactionCapacity

    10000

    每次存取的最大Event数。

    channelfullcount

    10

    Channel full次数,达到该次数后发送告警。

  • Kafka Channel
    Kafka Channel使用kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。常用配置如表 10 Kafka Channel 常用配置所示:
    表8 Kafka Channel常用配置

    参数

    默认值

    描述

    type

    -

    类型,需配置为 “org.apache.flume.channel.kafka.KafkaChannel”.。

    仅可在“properties.properties”文件中配置。

    kafka.bootstrap.servers

    -

    kafka broker列表。

    kafka.topic

    flume-channel

    Channel用来缓存数据的topic。

    kafka.consumer.group.id

    flume

    Kafka消费者组ID。

    parseAsFlumeEvent

    true

    是否解析为Flume event。

    migrateZookeeperOffsets

    true

    当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。

    kafka.consumer.auto.offset.reset

    latest

    当没有offset记录时,从指定的位置消费数据。

    kafka.producer.security.protocol

    SASL_PLAINTEXT

    Kafka生产者安全协议。

    kafka.consumer.security.protocol

    SASL_PLAINTEXT

    Kafka消费者安全协议。

常用Sink配置

  • HDFS Sink

    HDFS Sink将数据写入HDFS。常用配置如表9所示:

    表9 HDFS Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的Channel。仅可在“properties.properties”文件中配置。

    type

    hdfs

    类型,需配置为“hdfs”。仅可在“properties.properties”文件中配置。

    monTime

    0(不开启)

    线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。

    hdfs.path

    -

    HDFS路径。

    hdfs.inUseSuffix

    .tmp

    正在写入的HDFS文件后缀。

    hdfs.rollInterval

    30

    按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    hdfs.rollSize

    1024

    按大小滚动文件,单位:字节,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    hdfs.rollCount

    10

    按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    hdfs.idleTimeout

    0

    自动关闭空闲文件超时时间,单位:秒。

    hdfs.batchSize

    1000

    每次写入HDFS的Event个数。

    hdfs.kerberosPrincipal

    -

    认证HDFS的Kerberos用户名,未启用Kerberos认证集群不配置。

    hdfs.kerberosKeytab

    -

    认证HDFS的Kerberos keytab路径,未启用Kerberos认证集群不配置

    hdfs.fileCloseByEndEvent

    true

    收到最后一个Event时是否关闭文件。

    hdfs.batchCallTimeout

    -

    每次写入HDFS超时控制时间,单位:毫秒。

    当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。

    说明:

    “hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致数据写入HDFS失败。

    serializer.appendNewline

    true

    将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。

  • Avro Sink

    Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如表10所示:

    表10 Avro Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的Channel。仅可在“properties.properties”文件中配置。

    type

    -

    类型,需配置为“avro”。仅可在“properties.properties”文件中配置。

    hostname

    -

    绑定关联的主机名或IP地址。

    port

    -

    监测端口。

    batch-size

    1000

    批次发送的Event个数。

    ssl

    false

    是否使用SSL加密。

    truststore-type

    JKS

    Java信任库类型。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    keystore-type

    JKS

    密钥存储类型。

    keystore

    -

    密钥存储文件。

    keystore-password

    -

    密钥存储密码

  • HBase Sink

    HBase Sink将数据写入到HBase中。常用配置如表11所示:

    表11 HBase Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的Channel。仅可在“properties.properties”文件中配置。

    type

    -

    类型,需配置为“hbase”。仅可在“properties.properties”文件中配置。

    table

    -

    HBase表名称。

    monTime

    0(不开启)

    线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。

    columnFamily

    -

    HBase列族名称。

    batchSize

    1000

    每次写入HBase的Event个数。

    kerberosPrincipal

    -

    认证HBase的Kerberos用户名,未启用Kerberos认证集群不配置。

    kerberosKeytab

    -

    认证HBase的Kerberos keytab路径,未启用Kerberos认证集群不配置。

  • Kafka Sink

    Kafka Sink将数据写入到Kafka中。常用配置如表12所示:

    表12 Kafka Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的Channel。仅可在“properties.properties”文件中配置。

    type

    -

    类型,需配置为“org.apache.flume.sink.kafka.KafkaSink”

    仅可在“properties.properties”文件中配置。

    kafka.bootstrap.servers

    -

    Kafkabrokers列表,多个用英文逗号分隔。

    monTime

    0(不开启)

    线程监控阈值,更新时间大于阈值时重新启动该Sink,单位:秒。

    kafka.topic

    default-flume-topic

    数据写入的topic。

    allowTopicOverride

    false

    是否将Event Header中保存的topic替换kafka.topic中配置的topic。

    flumeBatchSize

    1000

    每次写入Kafka的Event个数。

    kafka.security.protocol

    SASL_PLAINTEXT

    Kafka安全协议,未启用Kerberos认证集群下须配置为“PLAINTEXT”

    kafka.kerberos.domain.name

    -

    Kafka Domain名称。安全集群必填。仅可在“properties.properties”文件中配置。

    Other Kafka Producer Properties

    -

    其他Kafka配置,可以接受任意Kafka支持的生产参数配置,配置需要加前缀“.kafka”

    仅可在“properties.properties”文件中配置。