Updated on 2024-10-08 GMT+08:00

Flume Service Configuration Guide

This section applies to MRS 3.x or later.

This configuration guide describes how to configure common Flume services. For the configuration of other Sources, Channels, and Sinks, see the Flume User Guide provided by the Flume community, available at http://flume.apache.org/releases/1.9.0.html.

  • Parameters in bold in the following tables are mandatory.
  • The batchSize of a Sink must be less than the transactionCapacity of the Channel it is connected to.
  • Only some parameters of Source, Channel, and Sink are displayed on the Flume configuration tool page. For details, see the following configurations.
  • The Customer Source, Customer Channel, and Customer Sink displayed on the Flume configuration tool page must be configured based on your self-developed code. They are not covered by the common configurations below.

Common Source Configurations

  • Avro Source

    Avro Source monitors the Avro port, receives data from the external Avro client, and puts the data into the configured channel. Common configurations are as follows:

    Table 1 Common configurations of an Avro source

    Parameter

    Default Value

    Description

    channels

    -

    Specifies the channel connected to the source. Multiple channels can be configured.

    type

    avro

    Specifies the type of the avro source, which must be avro.

    bind

    -

    Specifies the host name or IP address to listen on.

    port

    -

    Specifies the port to listen on. Ensure that this port is not occupied.

    threads

    -

    Specifies the maximum number of source threads.

    compression-type

    none

    Specifies the message compression format, which can be set to none or deflate. none indicates that data is not compressed, while deflate indicates that data is compressed.

    compression-level

    6

    Specifies the data compression level, which ranges from 1 to 9. A larger value gives a higher compression ratio.

    ssl

    false

    Specifies whether to use SSL encryption. If this parameter is set to true, keystore and keystore-password must be specified.

    truststore-type

    JKS

    Specifies the Java trust store type, which can be set to JKS or PKCS12.

    NOTE:

    Different passwords are used to protect the key store and private key of JKS, while the same password is used to protect the key store and private key of PKCS12.

    truststore

    -

    Specifies the Java trust store file.

    truststore-password

    -

    Specifies the Java trust store password.

    keystore-type

    JKS

    Specifies the keystore type set after SSL is enabled, which can be set to JKS or PKCS12.

    NOTE:

    Different passwords are used to protect the key store and private key of JKS, while the same password is used to protect the key store and private key of PKCS12.

    keystore

    -

    Specifies the keystore file path set after SSL is enabled. This parameter is mandatory if SSL is enabled.

    keystore-password

    -

    Specifies the keystore password set after SSL is enabled. This parameter is mandatory if SSL is enabled.

    trust-all-certs

    false

    Specifies whether to skip verification of the SSL server certificate. If this parameter is set to true, the SSL server certificate of the remote source is not checked. Do not set this parameter to true in a production environment.

    exclude-protocols

    SSLv3

    Specifies the excluded protocols. The entered protocols must be separated by spaces. The default value is SSLv3.

    ipFilter

    false

    Specifies whether to enable IP address filtering.

    ipFilter.rules

    -

    Specifies N IP filter rules, separated by commas (,). When ipFilter is set to true, each rule either allows or denies a host name or IP address, in the format <allow|deny>:<ip|name>:<pattern>. The configuration format is as follows:

    ipFilterRules=allow:ip:127.*, allow:name:localhost, deny:ip:*
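    The fragment below is a minimal sketch of an Avro source in the agent properties file. It assumes an agent named server (the prefix used in the memory file channel example later in this section) and uses hypothetical names and values: source s1, channel c1, address 192.168.0.10, port 4141, and a placeholder keystore path and password. It only shows the source and its channel binding; a sink is still needed to form a working agent.

    ```properties
    # Hypothetical agent "server" with source s1 and channel c1.
    server.sources = s1
    server.channels = c1
    server.channels.c1.type = memory

    server.sources.s1.type = avro
    # Hypothetical listening address and port; the port must not be occupied.
    server.sources.s1.bind = 192.168.0.10
    server.sources.s1.port = 4141
    server.sources.s1.threads = 8
    # The sending Avro client is assumed to use the same compression type.
    server.sources.s1.compression-type = deflate
    # With ssl = true, keystore and keystore-password are mandatory (placeholder values).
    server.sources.s1.ssl = true
    server.sources.s1.keystore-type = JKS
    server.sources.s1.keystore = /opt/flume/conf/flume_sChat.jks
    server.sources.s1.keystore-password = <keystore password>
    server.sources.s1.channels = c1
    ```
    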

  • SpoolDir Source

    SpoolDir Source monitors a directory and transmits new files added to it in real time. Common configurations are as follows:

    Table 2 Common configurations of a Spooling Directory source

    Parameter

    Default Value

    Description

    channels

    -

    Specifies the channel connected to the source. Multiple channels can be configured.

    type

    spooldir

    Specifies the type of the spooling source, which must be set to spooldir.

    spoolDir

    -

    Specifies the directory monitored by the SpoolDir source. The user running Flume must have the read, write, and execute permissions on the directory.

    monTime

    0 (Disabled)

    Specifies the thread monitoring threshold. When the update time exceeds the threshold, the source is restarted. Unit: second

    fileSuffix

    .COMPLETED

    Specifies the suffix added after file transmission is complete.

    deletePolicy

    never

    Specifies the source file deletion policy after file transmission is complete. The value can be either never or immediate. never indicates that the source file is not deleted after file transmission is complete, while immediate indicates that the source file is immediately deleted after file transmission is complete.

    ignorePattern

    ^$

    Specifies the regular expression of files to be ignored. The default value ^$ matches only an empty file name, so no files are ignored by default.

    includePattern

    ^.*$

    Specifies the regular expression of files to be included. This parameter can be used together with ignorePattern. If a file matches both ignorePattern and includePattern, the file is ignored. Files whose names start with a period (.) are not collected.

    trackerDir

    .flumespool

    Specifies the metadata storage path during data transmission.

    batchSize

    1000

    Specifies the number of events written to the channel in batches.

    decodeErrorPolicy

    FAIL

    Specifies the policy for handling characters that cannot be decoded.

    NOTE:

    If the file contains characters that cannot be decoded, set decodeErrorPolicy to REPLACE or IGNORE. Flume then skips the decoding errors and continues to collect subsequent logs.

    deserializer

    LINE

    Specifies the file parser. The value can be either LINE or BufferedLine.

    • When the value is set to LINE, characters read from the file are transcoded one by one.
    • When the value is set to BufferedLine, one line or multiple lines of characters read from the file are transcoded in batches, which delivers better performance.

    deserializer.maxLineLength

    2048

    Specifies the maximum length of a line when parsing by line.

    deserializer.maxBatchLine

    1

    Specifies the maximum number of lines combined into one event when parsing by line. If this parameter is greater than 1, increase maxLineLength by the same multiple.

    NOTE:

    When configuring an interceptor, take multi-line combination into account to avoid data loss. If the interceptor cannot process combined lines, set this parameter to 1.

    selector.type

    replicating

    Specifies the selector type. The value can be either replicating or multiplexing. replicating indicates that data is replicated and then transferred to each channel so that each channel receives the same data, while multiplexing indicates that a channel is selected based on the value of the header in the event and each channel has different data.

    interceptors

    -

    Specifies the interceptor. Multiple interceptors are separated by spaces.

    inputCharset

    UTF-8

    Specifies the encoding format of a read file. The encoding format must be the same as that of the data source file that has been read. Otherwise, an error may occur during character parsing.

    fileHeader

    false

    Specifies whether to add the file name (including the file path) to the event header.

    fileHeaderKey

    -

    Specifies the key under which the file name is stored in the event header as a <key,value> pair. fileHeaderKey must be used together with fileHeader. Example with fileHeader set to true:

    Set fileHeaderKey to file. When the /root/a.txt file is read, the event header contains file=/root/a.txt.

    basenameHeader

    false

    Specifies whether to add the file name (excluding the file path) to the event header.

    basenameHeaderKey

    -

    Specifies the key under which the base file name is stored in the event header as a <key,value> pair. basenameHeaderKey must be used together with basenameHeader. Example with basenameHeader set to true:

    Set basenameHeaderKey to file. When the a.txt file is read, the event header contains file=a.txt.

    pollDelay

    500

    Specifies the delay for polling new files in the monitoring directory. Unit: milliseconds

    recursiveDirectorySearch

    false

    Specifies whether to monitor new files in subdirectories of the configured directory.

    consumeOrder

    oldest

    Specifies the order in which files in the directory are consumed. The options are oldest, youngest, and random. With oldest or youngest, the order is determined by the last modification time of the files in the monitored directory; if the directory contains a large number of files, finding the oldest or youngest file can take a long time. With random, a file created earlier may remain unread for a long time.

    maxBackoff

    4000

    Specifies the maximum time to wait between consecutive attempts to write to the channel when the channel is full. The source starts with a short backoff and increases it exponentially after each failed attempt, up to this value. If the data still cannot be written, the write fails. Unit: milliseconds

    emptyFileEvent

    true

    Specifies whether to collect empty files and send them to the sink. The default value true indicates that empty files are sent to the sink. This parameter takes effect only for HDFS Sink. For example, if this parameter is set to true and the spoolDir directory contains an empty file, an empty file with the same name is created in the hdfs.path directory of HDFS.

    When reading data by line, SpoolDir Source drops the trailing line feed character of each event, so these line feed characters are not included in Flume's data volume counters.
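    A minimal sketch of a SpoolDir source, assuming an agent named server and hypothetical names and paths (source s1, channel c1, directory /var/log/app/spool, and an assumed .tmp suffix for files that are still being written). Only parameters from the table above are used.

    ```properties
    # Hypothetical agent "server" with source s1 and channel c1.
    server.sources = s1
    server.channels = c1
    server.channels.c1.type = memory

    server.sources.s1.type = spooldir
    # Hypothetical directory; the user running Flume needs read, write, and execute permissions.
    server.sources.s1.spoolDir = /var/log/app/spool
    server.sources.s1.fileSuffix = .COMPLETED
    server.sources.s1.deletePolicy = never
    # Skip files assumed to carry a .tmp suffix while still being written.
    # The backslash is doubled because the file is read as Java properties.
    server.sources.s1.ignorePattern = ^.*\\.tmp$
    server.sources.s1.batchSize = 1000
    # Skip undecodable characters instead of failing.
    server.sources.s1.decodeErrorPolicy = REPLACE
    # Adds file=<full path> to each event header.
    server.sources.s1.fileHeader = true
    server.sources.s1.fileHeaderKey = file
    server.sources.s1.channels = c1
    ```

    Keeping fileSuffix and deletePolicy at their defaults leaves processed files in place with the .COMPLETED suffix, which makes it easy to see which files have already been collected.
    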

  • Kafka Source

    A Kafka source consumes data from Kafka topics. Multiple sources can consume the same topic; sources in the same consumer group consume different partitions of the topic. Common configurations are as follows:

    Table 3 Common configurations of a Kafka source

    Parameter

    Default Value

    Description

    channels

    -

    Specifies the channel connected to the source. Multiple channels can be configured.

    type

    org.apache.flume.source.kafka.KafkaSource

    Specifies the type of the Kafka source, which must be set to org.apache.flume.source.kafka.KafkaSource.

    kafka.bootstrap.servers

    -

    Specifies the list of Kafka bootstrap servers. If Kafka has been installed in the cluster and its configuration has been synchronized to the server, you do not need to set this parameter on the server; the default value is the list of all brokers in the Kafka cluster. This parameter must be configured on the client. Separate multiple IP address:Port number entries with commas (,). Ports and security protocols must match as follows: port 21007 for the security mode (SASL_PLAINTEXT) and port 9092 for the common mode (PLAINTEXT).

    kafka.topics

    -

    Specifies the list of subscribed Kafka topics, which are separated by commas (,).

    kafka.topics.regex

    -

    Specifies the subscribed topics that comply with regular expressions. kafka.topics.regex has a higher priority than kafka.topics and will overwrite kafka.topics.

    monTime

    0 (Disabled)

    Specifies the thread monitoring threshold. When the update time exceeds the threshold, the source is restarted. Unit: second

    nodatatime

    0 (Disabled)

    Specifies the alarm threshold. An alarm is triggered when no data has been received from Kafka for longer than this threshold. Unit: second. This parameter can be configured in the properties.properties file.

    batchSize

    1000

    Specifies the number of events written to the channel in batches.

    batchDurationMillis

    1000

    Specifies the maximum time, in milliseconds, before a batch is written to the channel. A batch is written as soon as either batchSize or this duration is reached.

    keepTopicInHeader

    false

    Specifies whether to save topics in the event header. If the parameter value is true, topics configured in Kafka Sink become invalid.

    setTopicHeader

    true

    Specifies whether to store the name of the topic from which an event was received in the event header, under the key defined by topicHeader.

    topicHeader

    topic

    Specifies the header key that stores the name of the topic from which an event was received when setTopicHeader is set to true. If this is combined with the topicHeader property of Kafka Sink, take care not to send messages back to the same topic in a loop.

    useFlumeEventFormat

    false

    By default, a message from a Kafka topic is placed in the event body as bytes. If this parameter is set to true, messages are read in the Flume Avro binary event format. When used together with the same parameter in Kafka Sink or the parseAsFlumeEvent parameter in Kafka Channel, any headers set on the producing side are retained.

    keepPartitionInHeader

    false

    Specifies whether to save partition IDs in the event header. If the parameter value is true, Kafka Sink writes data to the corresponding partition.

    kafka.consumer.group.id

    flume

    Specifies the Kafka consumer group ID. Sources or agents with the same ID belong to the same consumer group.

    kafka.security.protocol

    SASL_PLAINTEXT

    Specifies the Kafka security protocol. The parameter value must be set to PLAINTEXT in a common cluster. The rules for matching ports and security protocols must be as follows: port 21007 matches the security mode (SASL_PLAINTEXT), and port 9092 matches the common mode (PLAINTEXT).

    Other Kafka Consumer Properties

    -

    Specifies other Kafka configurations. Any consumer property supported by Kafka can be set by adding the kafka.consumer. prefix to the property name, for example, kafka.consumer.auto.offset.reset.
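    A minimal sketch of a Kafka source in a security-mode cluster, assuming an agent named server and hypothetical broker addresses and topic names. Following the port rule above, port 21007 is paired with SASL_PLAINTEXT. The last source property assumes the Apache Flume convention of passing extra consumer properties with the kafka.consumer. prefix.

    ```properties
    # Hypothetical agent "server" with source s1 and channel c1.
    server.sources = s1
    server.channels = c1
    server.channels.c1.type = memory

    server.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
    # Hypothetical brokers; 21007 pairs with SASL_PLAINTEXT (security mode).
    server.sources.s1.kafka.bootstrap.servers = 192.168.0.11:21007,192.168.0.12:21007
    server.sources.s1.kafka.security.protocol = SASL_PLAINTEXT
    # Hypothetical topics, comma-separated.
    server.sources.s1.kafka.topics = topic1,topic2
    server.sources.s1.kafka.consumer.group.id = flume
    # A batch is written when 1000 events or 1000 ms is reached, whichever comes first.
    server.sources.s1.batchSize = 1000
    server.sources.s1.batchDurationMillis = 1000
    # Assumed pass-through Kafka consumer property.
    server.sources.s1.kafka.consumer.auto.offset.reset = earliest
    server.sources.s1.channels = c1
    ```

    In a common-mode cluster, the same sketch would use port 9092 and PLAINTEXT instead.
    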

  • Taildir Source

    A Taildir source monitors files in a directory, automatically reads newly written content, and transmits the data in real time. Common configurations are as follows:

    Table 4 Common configurations of a Taildir source

    Parameter

    Default Value

    Description

    channels

    -

    Specifies the channel connected to the source. Multiple channels can be configured.

    type

    TAILDIR

    Specifies the type of the taildir source, which must be set to TAILDIR.

    filegroups

    -

    Specifies the names of the file groups to collect. Separate multiple group names with spaces.

    filegroups.<filegroupName>.parentDir

    -

    Specifies the parent directory. The value must be an absolute path.

    filegroups.<filegroupName>.filePattern

    -

    Specifies the path of the files in the group, relative to parentDir. The path can include directories and supports regular expressions. This parameter must be used together with parentDir.

    positionFile

    -

    Specifies the metadata storage path during data transmission.

    headers.<filegroupName>.<headerKey>

    -

    Specifies a header key-value pair added to every event collected from the file group.

    byteOffsetHeader

    false

    Specifies whether to add the byte offset of the event in the source file to the event header. If this parameter is set to true, the offset is stored in the byteoffset header.

    maxBatchCount

    Long.MAX_VALUE

    Specifies the maximum number of batches that can be read consecutively from the same file. If the source reads multiple files in the monitored directory and one of them is written to at a high rate, the source can keep reading that file in an endless loop, so the other files may not be processed. In this case, set this parameter to a smaller value.

    skipToEnd

    false

    Specifies whether Flume jumps to the latest position of a file after a restart and reads only the latest data. If this parameter is set to true, Flume locates and reads the latest file data after restart.

    idleTimeout

    120000

    Specifies the idle timeout for file reading, expressed in milliseconds. If the file content does not change within this duration, the file is closed. If data is written to the file after it is closed, the file is reopened and read.

    writePosInterval

    3000

    Specifies the interval for writing metadata to a file, expressed in milliseconds.

    batchSize

    1000

    Specifies the number of events written to the channel in batches.

    monTime

    0 (Disabled)

    Specifies the thread monitoring threshold. When the update time exceeds the threshold, the source is restarted. Unit: second

    fileHeader

    false

    Specifies whether to add the file name (including the file path) to the event header.

    fileHeaderKey

    file

    Specifies the key under which the file name is stored in the event header as a <key,value> pair. fileHeaderKey must be used together with fileHeader. Example with fileHeader set to true:

    Set fileHeaderKey to file. When the /root/a.txt file is read, the event header contains file=/root/a.txt.
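    A minimal sketch of a Taildir source with two file groups, assuming an agent named server and hypothetical group names (f1, f2), directories, position path, and header key (logType). It uses the parentDir/filePattern form described in the table above.

    ```properties
    # Hypothetical agent "server" with source s1 and channel c1.
    server.sources = s1
    server.channels = c1
    server.channels.c1.type = memory

    server.sources.s1.type = TAILDIR
    # Two hypothetical file groups, separated by a space.
    server.sources.s1.filegroups = f1 f2
    # parentDir must be absolute; filePattern is relative to it and may be a regex.
    # The backslash is doubled because the file is read as Java properties.
    server.sources.s1.filegroups.f1.parentDir = /var/log/app1
    server.sources.s1.filegroups.f1.filePattern = .*\\.log
    server.sources.s1.filegroups.f2.parentDir = /var/log/app2
    server.sources.s1.filegroups.f2.filePattern = access.log
    # Hypothetical header added to every event from group f1.
    server.sources.s1.headers.f1.logType = app1
    # Hypothetical metadata path for read positions.
    server.sources.s1.positionFile = /opt/flume/taildir_position
    # Limit consecutive batches per file so a fast-growing file does not starve the others.
    server.sources.s1.maxBatchCount = 100
    server.sources.s1.batchSize = 1000
    server.sources.s1.channels = c1
    ```
    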

  • Http Source

    An HTTP source receives data from an external HTTP client and sends the data to the configured channels. Common configurations are as follows:

    Table 5 Common configurations of an HTTP source

    Parameter

    Default Value

    Description

    channels

    -

    Specifies the channel connected to the source. Multiple channels can be configured.

    type

    http

    Specifies the type of the http source, which must be set to http.

    bind

    -

    Specifies the host name or IP address to listen on.

    port

    -

    Specifies the port to listen on. Ensure that this port is not occupied.

    handler

    org.apache.flume.source.http.JSONHandler

    Specifies the message parsing method of an HTTP request. Two formats are supported: JSON (org.apache.flume.source.http.JSONHandler) and BLOB (org.apache.flume.sink.solr.morphline.BlobHandler).

    handler.*

    -

    Specifies handler parameters.

    exclude-protocols

    SSLv3

    Specifies the excluded protocols. The entered protocols must be separated by spaces. The default value is SSLv3.

    include-cipher-suites

    -

    Specifies the cipher suites to include. Separate multiple cipher suites with spaces. If this parameter is left empty, all cipher suites are supported by default.

    enableSSL

    false

    Specifies whether SSL is enabled for HTTP. If this parameter is set to true, keystore and keystorePassword must be specified.

    keystore-type

    JKS

    Specifies the keystore type, which can be JKS or PKCS12.

    keystore

    -

    Specifies the keystore path set after SSL is enabled in HTTP.

    keystorePassword

    -

    Specifies the keystore password set after SSL is enabled in HTTP.
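    A minimal sketch of an HTTP source using the default JSON handler with SSL enabled, assuming an agent named server and hypothetical values for the address, port, keystore path, and password.

    ```properties
    # Hypothetical agent "server" with source s1 and channel c1.
    server.sources = s1
    server.channels = c1
    server.channels.c1.type = memory

    server.sources.s1.type = http
    # Hypothetical listening address and port.
    server.sources.s1.bind = 192.168.0.10
    server.sources.s1.port = 5140
    server.sources.s1.handler = org.apache.flume.source.http.JSONHandler
    # With enableSSL = true, keystore and keystorePassword are required (placeholders).
    server.sources.s1.enableSSL = true
    server.sources.s1.keystore-type = JKS
    server.sources.s1.keystore = /opt/flume/conf/http_keystore.jks
    server.sources.s1.keystorePassword = <keystore password>
    server.sources.s1.channels = c1
    ```

    In Apache Flume, JSONHandler expects the request body to be a JSON array of events, each with headers and body fields, for example [{"headers": {"host": "web01"}, "body": "hello"}].
    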

  • Thrift Source

    Thrift Source listens on the Thrift port, receives data from external Thrift clients, and puts the data into the configured channel. Common configurations are as follows:

    Parameter

    Default Value

    Description

    channels

    -

    Specifies the channel connected to the source. Multiple channels can be configured.

    type

    thrift

    Specifies the type of the thrift source, which must be set to thrift.

    bind

    -

    Specifies the host name or IP address to listen on.

    port

    -

    Specifies the port to listen on. Ensure that this port is not occupied.

    threads

    -

    Specifies the maximum number of worker threads that can be run.

    kerberos

    false

    Specifies whether Kerberos authentication is enabled.

    agent-keytab

    -

    Specifies the path of the keytab file used by the server. A machine-machine account must be used. You are advised to use flume/conf/flume_server.keytab in the Flume service installation directory.

    agent-principal

    -

    Specifies the principal of the security user used by the server. The machine-machine account must be used. You are advised to use the default user of Flume: flume_server/hadoop.<system domain name>@<system domain name>

    NOTE:

    flume_server/hadoop.<system domain name> is the username. All letters in the system domain name contained in the username are lowercase letters. For example, Local Domain is set to 9427068F-6EFA-4833-B43E-60CB641E5B6C.COM, and the username is flume_server/hadoop.9427068f-6efa-4833-b43e-60cb641e5b6c.com.

    compression-type

    none

    Specifies the message compression format, which can be set to none or deflate. none indicates that data is not compressed, while deflate indicates that data is compressed.

    ssl

    false

    Specifies whether to use SSL encryption. If this parameter is set to true, keystore and keystore-password must be specified.

    keystore-type

    JKS

    Specifies the keystore type set after SSL is enabled.

    keystore

    -

    Specifies the keystore file path set after SSL is enabled. This parameter is mandatory if SSL is enabled.

    keystore-password

    -

    Specifies the keystore password set after SSL is enabled. This parameter is mandatory if SSL is enabled.
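    A minimal sketch of a Thrift source with Kerberos authentication, assuming an agent named server and a hypothetical address and port. The keytab and principal follow the recommendations in the table; the angle-bracket placeholders must be replaced with real values, and the domain name in the user part of the principal must be lowercase, as described in the note above.

    ```properties
    # Hypothetical agent "server" with source s1 and channel c1.
    server.sources = s1
    server.channels = c1
    server.channels.c1.type = memory

    server.sources.s1.type = thrift
    # Hypothetical listening address and port.
    server.sources.s1.bind = 192.168.0.10
    server.sources.s1.port = 4142
    server.sources.s1.threads = 16
    # Kerberos with the default Flume machine-machine account (placeholders).
    server.sources.s1.kerberos = true
    server.sources.s1.agent-keytab = <Flume installation directory>/flume/conf/flume_server.keytab
    server.sources.s1.agent-principal = flume_server/hadoop.<system domain name>@<system domain name>
    server.sources.s1.channels = c1
    ```
    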

Common Channel Configurations

  • Memory Channel

    A memory channel uses memory as the cache. Events are stored in memory queues. Common configurations are as follows:

    Table 6 Common configurations of a memory channel

    Parameter

    Default Value

    Description

    type

    -

    Specifies the type of the memory channel, which must be set to memory.

    capacity

    10000

    Specifies the maximum number of events cached in a channel.

    transactionCapacity

    1000

    Specifies the maximum number of events that the channel takes from a source or gives to a sink in one transaction.

    NOTE:
    • The parameter value must be greater than the batchSize of the source and sink.
    • The value of transactionCapacity must be less than or equal to that of capacity.

    channelfullcount

    10

    Specifies the channel full count. When the count reaches the threshold, an alarm is reported.

    keep-alive

    3

    Specifies the waiting time of the Put and Take threads when the transaction or channel cache is full. Unit: second

    byteCapacity

    80% of the maximum JVM memory

    Specifies the maximum total size of all event bodies in the channel. The default value is 80% of the maximum JVM memory (set by -Xmx). Unit: bytes

    byteCapacityBufferPercentage

    20

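    Specifies the percentage of buffer reserved between byteCapacity and the estimated total size of all events in the channel, to account for event header data. Unit: %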
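    A minimal sketch of a memory channel sized according to the notes above, assuming an agent named server with a hypothetical source s1 and HDFS sink k1 (their other settings are omitted). transactionCapacity is larger than the batch size of both the source and the sink, and not larger than capacity.

    ```properties
    # Hypothetical agent "server": source s1 -> channel c1 -> HDFS sink k1.
    server.channels = c1
    server.channels.c1.type = memory
    server.channels.c1.capacity = 10000
    # Greater than the batch sizes of s1 and k1 (1000), and not greater than capacity.
    server.channels.c1.transactionCapacity = 2000
    server.channels.c1.keep-alive = 3

    server.sources.s1.batchSize = 1000
    server.sources.s1.channels = c1
    server.sinks.k1.hdfs.batchSize = 1000
    server.sinks.k1.channel = c1
    ```
    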

  • File Channel

    A file channel uses local disks as the cache. Events are stored in the folder specified by dataDirs. Common configurations are as follows:

    Table 7 Common configurations of a file channel

    Parameter

    Default Value

    Description

    type

    -

    Specifies the type of the file channel, which must be set to file.

    checkpointDir

    ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/checkpoint

    NOTE:

    This path is changed with the custom data path.

    Specifies the checkpoint storage directory.

    dataDirs

    ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/data

    NOTE:

    This path is changed with the custom data path.

    Specifies the data cache directory. Multiple directories can be configured to improve performance. The directories are separated by commas (,).

    maxFileSize

    2146435071

    Specifies the maximum size of a single cache file, expressed in bytes.

    minimumRequiredSpace

    524288000

    Specifies the minimum free space required in the cache directory, expressed in bytes.

    capacity

    1000000

    Specifies the maximum number of events cached in a channel.

    transactionCapacity

    10000

    Specifies the maximum number of events that the channel takes from a source or gives to a sink in one transaction.

    NOTE:
    • The parameter value must be greater than the batchSize of the source and sink.
    • The value of transactionCapacity must be less than or equal to that of capacity.

    channelfullcount

    10

    Specifies the channel full count. When the count reaches the threshold, an alarm is reported.

    useDualCheckpoints

    false

    Specifies whether to back up the checkpoint. If this parameter is set to true, backupCheckpointDir must be set.

    backupCheckpointDir

    -

    Specifies the path of the backup checkpoint.

    checkpointInterval

    30000

    Specifies the interval between checkpoints, expressed in milliseconds.

    keep-alive

    3

    Specifies the waiting time of the Put and Take threads when the transaction or channel cache is full. Unit: second

    use-log-replay-v1

    false

    Specifies whether to enable the old replay logic.

    use-fast-replay

    false

    Specifies whether to enable fast replay, which replays logs without using a queue.

    checkpointOnClose

    true

    Specifies whether a checkpoint is created when the channel is closed.
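    A minimal sketch of a file channel that spreads its data across two disks and keeps a backup checkpoint, assuming an agent named server and hypothetical directories (/data1, /data2). The backup checkpoint is placed on a different disk from the primary checkpoint.

    ```properties
    # Hypothetical agent "server" with channel c1.
    server.channels = c1
    server.channels.c1.type = file
    # Hypothetical paths on two disks.
    server.channels.c1.checkpointDir = /data1/flume/checkpoint
    server.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data
    server.channels.c1.capacity = 1000000
    # Greater than the batchSize of the connected source and sink.
    server.channels.c1.transactionCapacity = 10000
    # Backup checkpoint on the second disk.
    server.channels.c1.useDualCheckpoints = true
    server.channels.c1.backupCheckpointDir = /data2/flume/checkpoint_backup
    # Checkpoint every 30 s.
    server.channels.c1.checkpointInterval = 30000
    ```
    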

  • Memory File Channel

    A memory file channel uses both memory and local disks as its cache and supports message persistence. Its performance is similar to that of a memory channel and better than that of a file channel. This channel is experimental and is not recommended for production use. Common configurations are as follows:

    Table 8 Common configurations of a memory file channel

    Parameter

    Default Value

    Description

    type

    org.apache.flume.channel.MemoryFileChannel

    Specifies the type of the memory file channel, which must be set to org.apache.flume.channel.MemoryFileChannel.

    capacity

    50000

    Specifies the maximum number of events cached in a channel.

    transactionCapacity

    5000

    Specifies the maximum number of events processed by a transaction.

    NOTE:
    • The parameter value must be greater than the batchSize of the source and sink.
    • The value of transactionCapacity must be less than or equal to that of capacity.

    subqueueByteCapacity

    20971520

    Specifies the maximum size of events that can be stored in a subqueue, expressed in bytes.

    A memory file channel uses both queues and subqueues to cache data. Events are stored in a subqueue, and subqueues are stored in a queue.

    subqueueByteCapacity and subqueueInterval determine how many events a subqueue can hold. subqueueByteCapacity specifies the capacity of a subqueue, and subqueueInterval specifies how long a subqueue can hold events. Events in a subqueue are sent to the destination only after the subqueue reaches the limit set by subqueueByteCapacity or subqueueInterval.

    NOTE:

    The value of subqueueByteCapacity must be greater than the total size of the events in one batch specified by batchSize.

    subqueueInterval

    2000

    Specifies the maximum duration that a subqueue can store events, expressed in milliseconds.

    keep-alive

    3

    Specifies the waiting time of the Put and Take threads when the transaction or channel cache is full.

    Unit: second

    dataDir

    -

    Specifies the cache directory for local files.

    byteCapacity

    80% of the maximum JVM memory

    Specifies the channel cache capacity.

    Unit: bytes

    compression-type

    None

    Specifies the message compression format, which can be set to none or deflate. none indicates that data is not compressed, while deflate indicates that data is compressed.

    channelfullcount

    10

    Specifies the channel full count. When the count reaches the threshold, an alarm is reported.

    The following is a configuration example of a memory file channel:

    server.channels.c1.type = org.apache.flume.channel.MemoryFileChannel
    server.channels.c1.dataDir = /opt/flume/mfdata
    server.channels.c1.subqueueByteCapacity = 20971520
    server.channels.c1.subqueueInterval = 2000
    server.channels.c1.capacity = 500000
    server.channels.c1.transactionCapacity = 40000

  • Kafka Channel

    A Kafka channel uses a Kafka cluster as the cache. Kafka provides high availability and data replication, so if Flume or a Kafka broker crashes, the cached data remains immediately available to other sinks. Common configurations are as follows:

    Table 9 Common configurations of a Kafka channel

    Parameter

    Default Value

    Description

    type

    -

    Specifies the type of the Kafka channel, which must be set to org.apache.flume.channel.kafka.KafkaChannel.

    kafka.bootstrap.servers

    -

    Specifies the list of Kafka bootstrap servers.

    If Kafka has been installed in the cluster and its configuration has been synchronized to the server, you do not need to set this parameter on the server; the default value is the list of all brokers in the Kafka cluster. This parameter must be configured on the client. Separate multiple IP address:Port number entries with commas (,). Ports and security protocols must match as follows: port 21007 for the security mode (SASL_PLAINTEXT) and port 9092 for the common mode (PLAINTEXT).

    kafka.topic

    flume-channel

    Specifies the Kafka topic used by the channel to cache data.

    kafka.consumer.group.id

    flume

    Specifies the consumer group ID that the channel uses to consume data from Kafka. This parameter cannot be left blank.

    parseAsFlumeEvent

    true

    Specifies whether data is parsed into Flume events.

    migrateZookeeperOffsets

    true

    Specifies whether to search for offsets in ZooKeeper and submit them to Kafka when there is no offset in Kafka.

    kafka.consumer.auto.offset.reset

    latest

    Specifies where to consume if there is no offset record, which can be set to earliest, latest, or none. earliest indicates that the offset is reset to the initial point, latest indicates that the offset is set to the latest position, and none indicates that an exception is thrown if there is no offset.

    kafka.producer.security.protocol

    SASL_PLAINTEXT

    Specifies the Kafka producer security protocol. The rules for matching ports and security protocols must be as follows: port 21007 matches the security mode (SASL_PLAINTEXT), and port 9092 matches the common mode (PLAINTEXT).

    NOTE:

    If the parameter is not displayed, click + in the lower left corner of the dialog box to display all parameters.

    kafka.consumer.security.protocol

    SASL_PLAINTEXT

    Specifies the Kafka consumer security protocol. The rules for matching ports and security protocols must be as follows: port 21007 matches the security mode (SASL_PLAINTEXT), and port 9092 matches the common mode (PLAINTEXT).

    pollTimeout

    500

    Specifies the maximum timeout interval for the consumer to invoke the poll function. Unit: milliseconds

    ignoreLongMessage

    false

    Specifies whether to discard oversized messages.

    messageMaxLength

    1000012

    Specifies the maximum length of a message written by Flume to Kafka.
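    A minimal sketch of a Kafka channel in a security-mode cluster, assuming an agent named server and hypothetical broker addresses. Both the producer and consumer sides use SASL_PLAINTEXT to match port 21007; the last line assumes the kafka.consumer. prefix convention for passing a Kafka consumer property.

    ```properties
    # Hypothetical agent "server" with channel c1.
    server.channels = c1
    server.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    # Hypothetical brokers; 21007 pairs with SASL_PLAINTEXT (security mode).
    server.channels.c1.kafka.bootstrap.servers = 192.168.0.11:21007,192.168.0.12:21007
    server.channels.c1.kafka.topic = flume-channel
    server.channels.c1.kafka.consumer.group.id = flume
    server.channels.c1.parseAsFlumeEvent = true
    server.channels.c1.kafka.producer.security.protocol = SASL_PLAINTEXT
    server.channels.c1.kafka.consumer.security.protocol = SASL_PLAINTEXT
    server.channels.c1.kafka.consumer.auto.offset.reset = latest
    ```
    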

Common Sink Configurations

  • HDFS Sink

    An HDFS sink writes data into HDFS. Common configurations are as follows:

    Table 10 Common configurations of an HDFS sink

    Parameter

    Default Value

    Description

    channel

    -

    Specifies the channel connected to the sink.

    type

    hdfs

    Specifies the type of the hdfs sink, which must be set to hdfs.

    hdfs.path

    -

    Specifies the data storage path in HDFS. The value must start with hdfs://hacluster/.

    monTime

    0 (Disabled)

    Specifies the thread monitoring threshold. When the update time exceeds the threshold, the sink is restarted. Unit: second

    hdfs.inUseSuffix

    .tmp

    Specifies the suffix of the HDFS file to which data is being written.

    hdfs.rollInterval

    30

    Specifies the interval, in seconds, after which the current file is rolled. If you set this parameter, set hdfs.fileCloseByEndEvent to false.

    hdfs.rollSize

    1024

    Specifies the file size, in bytes, at which the current file is rolled. If you set this parameter, set hdfs.fileCloseByEndEvent to false.

    hdfs.rollCount

    10

    Specifies the number of events after which the current file is rolled. If you set this parameter, set hdfs.fileCloseByEndEvent to false.

    NOTE:

    rollInterval, rollSize, and rollCount can be configured at the same time. The file is rolled as soon as any one of the configured conditions is met.

    hdfs.idleTimeout

    0

    Specifies the timeout interval for closing idle files automatically, expressed in seconds.

    hdfs.batchSize

    1000

    Specifies the number of events written to HDFS per batch.

    hdfs.kerberosPrincipal

    -

    Specifies the Kerberos principal for HDFS authentication. This parameter is mandatory in security mode and not required in common mode.

    hdfs.kerberosKeytab

    -

    Specifies the Kerberos keytab for HDFS authentication. This parameter is not required in common mode. In security mode, the Flume running user must have permission to access the keytab path specified in the jaas.conf file.

    hdfs.fileCloseByEndEvent

    true

    Specifies whether to close the HDFS file when the last event of the source file is received.

    hdfs.batchCallTimeout

    -

    Specifies the timeout for writing a batch of events to HDFS. Unit: milliseconds

    If this parameter is not specified, the timeout is applied to each event written to HDFS. When hdfs.batchSize is greater than 0, configure this parameter to improve HDFS write performance.

    NOTE:

    The value of hdfs.batchCallTimeout depends on hdfs.batchSize. A greater hdfs.batchSize requires a larger hdfs.batchCallTimeout. If the value of hdfs.batchCallTimeout is too small, writing events to HDFS may fail.

    serializer.appendNewline

    true

    Specifies whether to append a line feed character (\n) to each event written to HDFS. If line feeds are appended, the bytes they occupy are not counted by the data volume counters of the HDFS sink.

    hdfs.filePrefix

    over_%{basename}

    Specifies the file name prefix after data is written to HDFS.

    hdfs.fileSuffix

    -

    Specifies the file name suffix after data is written to HDFS.

    hdfs.inUsePrefix

    -

    Specifies the prefix of the HDFS file to which data is being written.

    hdfs.fileType

    DataStream

    Specifies the HDFS file format, which can be set to SequenceFile, DataStream, or CompressedStream.

    NOTE:

    If this parameter is set to SequenceFile or DataStream, output files are not compressed and hdfs.codeC cannot be configured. If it is set to CompressedStream, output files are compressed and hdfs.codeC must also be configured.

    hdfs.codeC

    -

    Specifies the file compression format, which can be set to gzip, bzip2, lzo, lzop, or snappy.

    hdfs.maxOpenFiles

    5000

    Specifies the maximum number of HDFS files that can be opened. If the number of opened files reaches this value, the earliest opened files are closed.

    hdfs.writeFormat

    Writable

    Specifies the file write format, which can be set to Writable or Text.

    hdfs.callTimeout

    10000

    Specifies the timeout, in milliseconds, for each write of events to HDFS.

    hdfs.threadsPoolSize

    -

    Specifies the number of threads used by each HDFS sink for HDFS I/O operations.

    hdfs.rollTimerPoolSize

    -

    Specifies the number of threads used by each HDFS sink to schedule timed file rolling.

    hdfs.round

    false

    Specifies whether to round down the timestamp. If this parameter is set to true, all time-based escape sequences except %t are affected.

    hdfs.roundUnit

    second

    Specifies the unit to which the timestamp is rounded down, which can be second, minute, or hour.

    hdfs.useLocalTimeStamp

    true

    Specifies whether to enable the local timestamp. The recommended parameter value is true.

    hdfs.closeTries

    0

    Specifies the maximum number of attempts the HDFS sink makes to rename a file after it initiates a close. If this parameter is set to the default value 0, the sink keeps retrying until the file is renamed.

    hdfs.retryInterval

    180

    Specifies the interval, in seconds, between consecutive attempts to close an HDFS file.

    NOTE:

    Each close request involves multiple RPC round trips to the NameNode, so a value that is too small may overload the NameNode. If this parameter is set to 0 and the first close attempt fails, the sink does not retry, and the file may remain open or keep the .tmp extension.

    hdfs.failcount

    10

    Specifies the threshold for HDFS write failures. If the number of times the sink fails to write data to HDFS exceeds this value, an alarm indicating abnormal data transmission is reported.
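    As an illustration, the following sketch wires an HDFS sink to a memory channel in security mode. It assumes an agent named server (as in the sink group example under Precautions), a channel c1, and a sink k1. The HDFS directory, principal, keytab path, and timeout value are placeholders. The memory channel keys capacity and transactionCapacity come from the community Flume documentation. Only the channel and sink parts of the agent are shown.

    ```properties
    # Memory channel; transactionCapacity must exceed the sink's hdfs.batchSize
    server.channels=c1
    server.channels.c1.type=memory
    server.channels.c1.capacity=10000
    server.channels.c1.transactionCapacity=2000

    server.sinks=k1
    server.sinks.k1.type=hdfs
    server.sinks.k1.channel=c1
    # The path must start with hdfs://hacluster/ (directory is a placeholder)
    server.sinks.k1.hdfs.path=hdfs://hacluster/flume/events/%Y%m%d
    server.sinks.k1.hdfs.useLocalTimeStamp=true
    server.sinks.k1.hdfs.filePrefix=events
    server.sinks.k1.hdfs.inUseSuffix=.tmp
    # Roll every 600 s or at 128 MB (134217728 bytes), whichever comes first;
    # rollCount=0 disables rolling by event count
    server.sinks.k1.hdfs.rollInterval=600
    server.sinks.k1.hdfs.rollSize=134217728
    server.sinks.k1.hdfs.rollCount=0
    # Required when the roll* parameters are used
    server.sinks.k1.hdfs.fileCloseByEndEvent=false
    server.sinks.k1.hdfs.batchSize=1000
    # Placeholder value; larger batches need a larger timeout
    server.sinks.k1.hdfs.batchCallTimeout=60000
    # CompressedStream requires hdfs.codeC
    server.sinks.k1.hdfs.fileType=CompressedStream
    server.sinks.k1.hdfs.codeC=snappy
    # Security mode only (placeholder principal and keytab path)
    server.sinks.k1.hdfs.kerberosPrincipal=flume_user@HADOOP.COM
    server.sinks.k1.hdfs.kerberosKeytab=/opt/flume/conf/user.keytab
    ```

    With these values, hdfs.batchSize (1000) stays below the channel's transactionCapacity (2000), which satisfies the batch size rule stated at the beginning of this guide.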

  • Avro Sink

    Avro Sink converts events into Avro events and sends them to the configured host and port. Common configurations are as follows:

    Table 11 Common configurations of an Avro sink

    Parameter

    Default Value

    Description

    channel

    -

    Specifies the channel connected to the sink.

    type

    -

    Specifies the type of the avro sink, which must be set to avro.

    hostname

    -

    Specifies the host name or IP address of the destination to which events are sent.

    port

    -

    Specifies the port on the destination host, for example, the port on which the downstream Avro source listens.

    batch-size

    1000

    Specifies the number of events sent in a batch.

    client.type

    DEFAULT

    Specifies the client instance type. Set this parameter based on the communication protocol in use. The options are as follows:

    • DEFAULT: The client instance of the AvroRPC type is returned.
    • OTHER: NULL is returned.
    • THRIFT: The client instance of the Thrift RPC type is returned.
    • DEFAULT_LOADBALANCING: The client instance of the LoadBalancing RPC type is returned.
    • DEFAULT_FAILOVER: The client instance of the Failover RPC type is returned.

    ssl

    false

    Specifies whether to use SSL encryption. If this parameter is set to true, keystore and keystore-password must be specified.

    truststore-type

    JKS

    Specifies the Java trust store type, which can be set to JKS or PKCS12.

    NOTE:

    JKS can protect the keystore and the private key with different passwords, whereas PKCS12 uses the same password for both.

    truststore

    -

    Specifies the Java trust store file.

    truststore-password

    -

    Specifies the Java trust store password.

    keystore-type

    JKS

    Specifies the keystore type used when SSL is enabled.

    keystore

    -

    Specifies the keystore file path used when SSL is enabled. This parameter is mandatory if SSL is enabled.

    keystore-password

    -

    Specifies the keystore password used when SSL is enabled. This parameter is mandatory if SSL is enabled.

    connect-timeout

    20000

    Specifies the timeout for the first connection, expressed in milliseconds.

    request-timeout

    20000

    Specifies the maximum timeout for a request after the first request, expressed in milliseconds.

    reset-connection-interval

    0

    Specifies the interval, in seconds, between a connection failure and the next connection attempt. If this parameter is set to 0, the sink retries the connection continuously.

    compression-type

    none

    Specifies the compression type of batch data, which can be set to none or deflate. none indicates that data is not compressed, and deflate indicates that data is compressed. The value must match the compression-type of the receiving Avro source.

    compression-level

    6

    Specifies the compression level of batch data, which can be set to 1 to 9. A larger value indicates a higher compression rate.

    exclude-protocols

    SSLv3

    Specifies the protocols to exclude. Separate multiple protocols with spaces.
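    Cascading is the case where compression-type matters most, because the Avro sink and the receiving Avro source must use the same setting. The following is a minimal sketch under these assumptions: two agents, both named server, run on different hosts; 192.168.0.20 is a placeholder address of the downstream host; and port 4545 is an arbitrary placeholder. The source keys are those listed in the Avro source table.

    ```properties
    # Upstream agent: Avro sink that forwards events to the downstream host
    server.sinks=k1
    server.sinks.k1.type=avro
    server.sinks.k1.channel=c1
    # Placeholder address and port of the downstream Avro source
    server.sinks.k1.hostname=192.168.0.20
    server.sinks.k1.port=4545
    server.sinks.k1.batch-size=1000
    server.sinks.k1.compression-type=deflate
    server.sinks.k1.compression-level=6

    # Downstream agent (on 192.168.0.20): Avro source that receives the events
    server.sources=r1
    server.sources.r1.type=avro
    server.sources.r1.channels=c1
    server.sources.r1.bind=192.168.0.20
    server.sources.r1.port=4545
    # Must match the compression-type of the upstream Avro sink
    server.sources.r1.compression-type=deflate
    ```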

  • HBase Sink

    An HBase sink writes data into HBase. Common configurations are as follows:

    Table 12 Common configurations of an HBase sink

    Parameter

    Default Value

    Description

    channel

    -

    Specifies the channel connected to the sink.

    type

    -

    Specifies the type of the HBase sink, which must be set to hbase.

    table

    -

    Specifies the HBase table name.

    columnFamily

    -

    Specifies the HBase column family.

    monTime

    0 (Disabled)

    Specifies the thread monitoring threshold. When the update time exceeds the threshold, the sink is restarted. Unit: second

    batchSize

    1000

    Specifies the number of events written to HBase per batch.

    kerberosPrincipal

    -

    Specifies the Kerberos principal for HBase authentication. This parameter is mandatory in security mode and not required in common mode.

    kerberosKeytab

    -

    Specifies the Kerberos keytab for HBase authentication. This parameter is not required in common mode. In security mode, the Flume running user must have permission to access the keytab path specified in the jaas.conf file.

    coalesceIncrements

    true

    Specifies whether to coalesce multiple increment operations on the same HBase cell within one batch. Setting this parameter to true can improve performance when many increments target a limited number of cells.
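    A minimal sketch of an HBase sink in security mode, assuming an agent named server, a channel c1, and a sink k1. The table name, column family, principal, and keytab path are placeholders, and the table and column family are assumed to exist already in HBase.

    ```properties
    server.sinks=k1
    server.sinks.k1.type=hbase
    server.sinks.k1.channel=c1
    # Placeholder table and column family, assumed to exist in HBase
    server.sinks.k1.table=flume_events
    server.sinks.k1.columnFamily=cf
    server.sinks.k1.batchSize=1000
    server.sinks.k1.coalesceIncrements=true
    # Security mode only (placeholder principal and keytab path)
    server.sinks.k1.kerberosPrincipal=flume_user@HADOOP.COM
    server.sinks.k1.kerberosKeytab=/opt/flume/conf/user.keytab
    ```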

  • Kafka Sink

    A Kafka sink writes data into Kafka. Common configurations are as follows:

    Table 13 Common configurations of a Kafka sink

    Parameter

    Default Value

    Description

    channel

    -

    Specifies the channel connected to the sink.

    type

    -

    Specifies the type of the kafka sink, which must be set to org.apache.flume.sink.kafka.KafkaSink.

    kafka.bootstrap.servers

    -

    Specifies the list of Kafka bootstrap servers, each in address:port format. If Kafka is installed in the cluster and its configuration has been synchronized to the server, you do not need to set this parameter on the server, because the default value is the list of all brokers in the Kafka cluster. This parameter must be configured on the client. Separate multiple values with commas (,). The port must match the security protocol: use port 21007 with SASL_PLAINTEXT (security mode) and port 9092 with PLAINTEXT (common mode).

    monTime

    0 (Disabled)

    Specifies the thread monitoring threshold. When the update time exceeds the threshold, the sink is restarted. Unit: second

    kafka.producer.acks

    1

    Specifies how many replica acknowledgments the producer requires before a write is considered successful. 0: no acknowledgment is required. 1: only the acknowledgment from the leader is required. -1: acknowledgments from all in-sync replicas are required. Setting this parameter to -1 prevents data loss in some leader failure scenarios.

    kafka.topic

    -

    Specifies the topic to which data is written. This parameter is mandatory.

    flumeBatchSize

    1000

    Specifies the number of events written to Kafka per batch.

    kafka.security.protocol

    SASL_PLAINTEXT

    Specifies the Kafka security protocol. Set this parameter to PLAINTEXT in a common cluster. The port must match the protocol: use port 21007 with SASL_PLAINTEXT (security mode) and port 9092 with PLAINTEXT (common mode).

    ignoreLongMessage

    false

    Specifies whether to discard oversized messages.

    messageMaxLength

    1000012

    Specifies the maximum length of a message written by Flume to Kafka.

    defaultPartitionId

    -

    Specifies the ID of the Kafka partition to which all events from the channel are sent. A value taken from partitionIdHeader overrides this parameter. If this parameter is left blank (default), events are distributed by the Kafka producer's partitioner, either by key or by a custom partitioner specified with kafka.partitioner.class.

    partitionIdHeader

    -

    Specifies the name of an event header field. When this parameter is set, the sink reads the value of that header from each event and sends the message to the topic partition it identifies. If the value does not identify a valid partition, an EventDeliveryException is thrown. If the header is present in an event, its value overrides defaultPartitionId.

    Other Kafka Producer Properties

    -

    Specifies other Kafka producer configurations. Any producer configuration supported by Kafka can be set, provided that its name is prefixed with kafka.producer. (for example, kafka.producer.acks).
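    A minimal sketch of a Kafka sink in security mode, assuming an agent named server, a channel c1, a sink k1, and placeholder broker addresses and topic. Additional producer properties are passed through with the kafka.producer. prefix used by kafka.producer.acks in the table above. linger.ms is a standard Kafka producer property, chosen here only as an example, and its value is a placeholder.

    ```properties
    server.sinks=k1
    server.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
    server.sinks.k1.channel=c1
    # Security mode: brokers on port 21007 with SASL_PLAINTEXT (placeholder hosts)
    server.sinks.k1.kafka.bootstrap.servers=192.168.0.11:21007,192.168.0.12:21007
    server.sinks.k1.kafka.security.protocol=SASL_PLAINTEXT
    server.sinks.k1.kafka.topic=flume_topic
    server.sinks.k1.flumeBatchSize=1000
    # Wait for all in-sync replicas to reduce the risk of data loss
    server.sinks.k1.kafka.producer.acks=-1
    # Any other Kafka producer property, passed through with the kafka.producer. prefix
    server.sinks.k1.kafka.producer.linger.ms=5
    ```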

  • Thrift Sink

    Thrift Sink converts events to Thrift events and sends them to the monitoring port of the configured host. Common configurations are as follows:

    Table 14 Common configurations of a Thrift sink

    Parameter

    Default Value

    Description

    channel

    -

    Specifies the channel connected to the sink.

    type

    thrift

    Specifies the type of the thrift sink, which must be set to thrift.

    hostname

    -

    Specifies the host name or IP address of the destination to which events are sent.

    port

    -

    Specifies the port on the destination host, for example, the port on which the downstream Thrift source listens.

    batch-size

    1000

    Specifies the number of events sent in a batch.

    connect-timeout

    20000

    Specifies the timeout for the first connection, expressed in milliseconds.

    request-timeout

    20000

    Specifies the maximum timeout for a request after the first request, expressed in milliseconds.

    kerberos

    false

    Specifies whether Kerberos authentication is enabled.

    client-keytab

    -

    Specifies the path of the client keytab file. The Flume running user must have permission to access this file.

    client-principal

    -

    Specifies the principal of the security user used by the client.

    server-principal

    -

    Specifies the principal of the security user used by the server.

    compression-type

    none

    Specifies the compression type of data sent by Flume, which can be set to none or deflate. none indicates that data is not compressed, while deflate indicates that data is compressed.

    maxConnections

    5

    Specifies the maximum size of the connection pool for Flume to send data.

    ssl

    false

    Specifies whether to use SSL encryption.

    truststore-type

    JKS

    Specifies the Java trust store type.

    truststore

    -

    Specifies the Java trust store file.

    truststore-password

    -

    Specifies the Java trust store password.

    reset-connection-interval

    0

    Specifies the interval, in seconds, between a connection failure and the next connection attempt. If this parameter is set to 0, the sink retries the connection continuously.
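    A minimal sketch of a Thrift sink with Kerberos authentication enabled, assuming an agent named server, a channel c1, and a sink k1. The downstream address, port 4546, both principals, and the keytab path are hypothetical placeholders.

    ```properties
    server.sinks=k1
    server.sinks.k1.type=thrift
    server.sinks.k1.channel=c1
    # Placeholder address and port of the downstream Thrift source
    server.sinks.k1.hostname=192.168.0.20
    server.sinks.k1.port=4546
    server.sinks.k1.batch-size=1000
    # Kerberos settings (hypothetical principals and keytab path)
    server.sinks.k1.kerberos=true
    server.sinks.k1.client-principal=flume_user@HADOOP.COM
    server.sinks.k1.client-keytab=/opt/flume/conf/user.keytab
    server.sinks.k1.server-principal=flume_server/hadoop.hadoop.com@HADOOP.COM
    ```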

Precautions

  • What are the reliability measures of Flume?
    • Use the transaction mechanisms between Source and Channel as well as between Channel and Sink.
    • Configure the failover and load_balance mechanisms for Sink Processor. The following shows a load balancing example. For details, see http://flume.apache.org/releases/1.9.0.html.
      server.sinkgroups=g1
      server.sinkgroups.g1.sinks=k1 k2
      server.sinkgroups.g1.processor.type=load_balance
      server.sinkgroups.g1.processor.backoff=true
      server.sinkgroups.g1.processor.selector=random
  • What are the precautions for the aggregation and cascading of multiple Flume agents?
    • Avro or Thrift protocol can be used for cascading.
    • When the aggregation end contains multiple nodes, evenly distribute the agents and do not aggregate all agents on a single node.
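    The counterpart to the load balancing example above is a failover sink group. It sends all events through the available sink with the highest priority and switches to the next sink when that one fails. The following sketch assumes two Avro sinks, k1 and k2, that point at two hypothetical aggregation nodes with placeholder addresses. The processor.priority.* and processor.maxpenalty keys come from the community Flume 1.9.0 documentation.

    ```properties
    server.sinks=k1 k2
    server.sinkgroups=g1
    server.sinkgroups.g1.sinks=k1 k2
    server.sinkgroups.g1.processor.type=failover
    # The sink with the larger priority value is used first
    server.sinkgroups.g1.processor.priority.k1=10
    server.sinkgroups.g1.processor.priority.k2=5
    # Maximum backoff period, in milliseconds, for a failed sink
    server.sinkgroups.g1.processor.maxpenalty=10000

    # Two Avro sinks pointing at different aggregation nodes (placeholder addresses)
    server.sinks.k1.type=avro
    server.sinks.k1.channel=c1
    server.sinks.k1.hostname=192.168.0.21
    server.sinks.k1.port=4545
    server.sinks.k2.type=avro
    server.sinks.k2.channel=c1
    server.sinks.k2.hostname=192.168.0.22
    server.sinks.k2.port=4545
    ```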