更新时间:2024-11-29 GMT+08:00

业务模型配置指导

Flume业务配置及模块选择过程中,一般要求Sink的极限吞吐量需要大于Source的极限吞吐量,否则在极限负载的场景下,Source往Channel的写入速度大于Sink从Channel取出的速度,从而导致Channel频繁被写满,进而影响性能表现。

Avro Source和Avro Sink一般都是成对出现,用于多个Flume Agent间进行数据中转,因此一般场景下Avro Source和Avro Sink都不会成为性能瓶颈。

模块间性能

根据模块间极限性能对比,可以看到对于前端是SpoolDir Source的场景下,Kafka Sink和HDFS Sink都能满足吞吐量要求,但是HBase Sink由于自身写入性能较低的原因,会成为性能瓶颈,会导致数据都积压在Channel中。但是如果有必须使用HBase Sink或者其他性能容易成为瓶颈的Sink的场景时,可以选择使用Channel Selector或者Sink Group来满足性能要求。

Channel Selector

Channel Selector可以允许一个Source对接多个Channel,通过选择不同的Selector类型来将Source的数据进行分流或者复制,目前Flume提供的Channel Selector有两种:Replicating和Multiplexing。

Replicating:表示Source的数据同步发送给所有Channel。

Multiplexing:表示根据Event中的Header的指定字段的值来进行判断,从而选择相应的Channel进行发送,从而起到根据业务类型进行分流的目的。

  • Replicating配置样例:
    client.sources = kafkasource
    client.channels = channel1 channel2
    client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource
    client.sources.kafkasource.kafka.topics = topic1,topic2
    client.sources.kafkasource.kafka.consumer.group.id = flume
    client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007
    client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT
    client.sources.kafkasource.batchDurationMillis = 1000
    client.sources.kafkasource.batchSize = 800
    client.sources.kafkasource.channels = channel1 c el2
    
    client.sources.kafkasource.selector.type = replicating
    client.sources.kafkasource.selector.optional = channel2
    表1 Replicating配置样例参数说明

    选项名称

    默认值

    描述

    Selector.type

    replicating

    Selector类型,应配置为replicating

    Selector.optional

    -

    可选Channel,可以配置为列表

  • Multiplexing配置样例:
    client.sources = kafkasource
    client.channels = channel1 channel2
    client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource
    client.sources.kafkasource.kafka.topics = topic1,topic2
    client.sources.kafkasource.kafka.consumer.group.id = flume
    client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007
    client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT
    client.sources.kafkasource.batchDurationMillis = 1000
    client.sources.kafkasource.batchSize = 800
    client.sources.kafkasource.channels = channel1 channel2
    
    client.sources.kafkasource.selector.type = multiplexing
    client.sources.kafkasource.selector.header = myheader
    client.sources.kafkasource.selector.mapping.topic1 = channel1
    client.sources.kafkasource.selector.mapping.topic2 = channel2
    client.sources.kafkasource.selector.default = channel1
    表2 Multiplexing配置样例参数说明

    选项名称

    默认值

    描述

    Selector.type

    replicating

    Selector类型,应配置为multiplexing

    Selector.header

    Flume.selector.header

    -

    Selector.default

    -

    -

    Selector.mapping.*

    -

    -

    Multiplexing类型的Selector的样例中,选择Event中Header名称为topic的字段来进行判断,当Header中topic字段的值为topic1时,向channel1发送该Event,当Header中topic字段的值为topic2时,向channel2发送该Event。

    这种Selector需要借助Source中Event的特定Header来进行Channel的选择,需要根据业务场景选择合理的Header来进行数据分流。

SinkGroup

当后端单Sink性能不足、需要高可靠性保证或者异构输出时可以使用Sink Group来将指定的Channel和多个Sink对接,从而满足相应的使用场景。目前Flume提供了两种Sink Processor用于对Sink Group中的Sink进行管理:Load Balancing和Failover。

Failover:表示在Sink Group中同一时间只有一个Sink处于活跃状态,其他Sink作为备份处于非活跃状态,当活跃状态的Sink故障时,根据优先级从非活跃状态的Sink中选择一个来接管业务,保证数据不会丢失,多用于高可靠性场景。

Load Balancing:表示在Sink Group中所有Sink都处于活跃状态,每个Sink都会从Channel中去获取数据并进行处理,并且保证在运行过程中该Sink Group的所有Sink的负载是均衡的,多用于性能提升场景。

  • Load Balancing配置样例:
    client.sources = source1  
    client.sinks = sink1 sink2
    client.channels = channel1
    
    client.sinkgroups = g1
    client.sinkgroups.g1.sinks = sink1 sink2
    client.sinkgroups.g1.processor.type = load_balance
    client.sinkgroups.g1.processor.backoff = true
    client.sinkgroups.g1.processor.selector = random
    
    client.sinks.sink1.type = logger
    client.sinks.sink1.channel = channel1
    
    client.sinks.sink2.type = logger
    client.sinks.sink2.channel = channel1
    表3 Load Balancing配置样例参数说明

    选项名称

    默认值

    描述

    sinks

    -

    Sink Group的sink列表,多个以空格分隔

    processor.type

    default

    Processor的类型,应配置为load_balance

    processor.backoff

    false

    是否以指数的形式退避失败的Sinks

    processor.selector

    round_robin

    选择机制。必须是round_robin,random或者自定义的类,且该类继承了AbstractSinkSelector

    processor.selector.maxTimeOut

    30000

    屏蔽故障sink的时间,默认是30000毫秒

  • Failover配置样例:
    client.sources = source1       
    client.sinks = sink1 sink2
    client.channels = channel1
    
    client.sinkgroups = g1
    client.sinkgroups.g1.sinks = sink1 sink2
    client.sinkgroups.g1.processor.type = failover
    client.sinkgroups.g1.processor.priority.sink1 = 10
    client.sinkgroups.g1.processor.priority.sink2 = 5
    client.sinkgroups.g1.processor.maxpenalty = 10000
    
    client.sinks.sink1.type = logger
    client.sinks.sink1.channel = channel1
    
    client.sinks.sink2.type = logger
    client.sinks.sink2.channel = channel1
    表4 Failover配置样例参数说明

    选项名称

    默认值

    描述

    sinks

    -

    Sink Group的sink列表,多个以空格分隔

    processor.type

    default

    Processor的类型,应配置为failover

    processor.priority.<sinkName>

    -

    优先级值。<sinkName> 必须是sinks中有定义的。优先级值高Sink会更早被激活。值越大,优先级越高。:多个sinks的话,优先级的值不要相同,如果优先级相同的话,只会有一个生效。

    processor.maxpenalty

    30000

    失败的Sink最大的退避时间(单位:毫秒)

Interceptors

Flume的拦截器(Interceptor)支持在数据传输过程中修改或丢弃传输的基本单元Event。用户可以通过在配置中指定Flume内建拦截器的类名列表,也可以开发自定义的拦截器来实现Event的修改或丢弃。Flume内建支持的拦截器如下表所示,本章节会选取一个较为复杂的作为示例。其余的用户可以根据需要自行配置使用。

1. 拦截器用在Flume的Source、Channel之间,大部分的Source都带有Interceptor参数。用户可以依据需要配置。

2. Flume支持一个Source配置多个拦截器,各拦截器名称用空格分开。

3. 指定拦截器的顺序就是它们被调用的顺序。

4. 使用拦截器在Header中插入的内容,都可以在Sink中读取并使用。

表5 Flume内建支持的拦截器类型

拦截器类型

简要描述

Timestamp Interceptor

该拦截器会在Event的Header中插入一个时间戳。

Host Interceptor

该拦截器会在Event的Header中插入当前Agent所在节点的IP或主机名。

Remove Header Interceptor

该拦截器会依据Header中包含的符合正则匹配的字符串,丢弃掉对应的Event。

UUID Interceptor

该拦截器会为每个Event的Header生成一个UUID字符串。

Search and Replace Interceptor

该拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。与Java Matcher.replaceAll() 的规则相同。

Regex Filtering Interceptor

该拦截器通过将Event的Body体解释为文本文件,与配置的正则表达式进行匹配来选择性的过滤Event。提供的正则表达式可用于排除或包含事件。

Regex Extractor Interceptor

该拦截器使用正则表达式抽取原始events中的内容,并将该内容加入events的header中。

下面以Regex Filtering Interceptor 为例说明Interceptor使用(其余的可参考官网配置):
表6 Regex Filtering Interceptor配置参数说明

选项名称

默认值

描述

type

-

组件类型名称,必须写为regex_filter。

regex

-

用于匹配事件的正则表达式。

excludeEvents

false

默认收集匹配到的Event。设置为true,则会删除匹配的Event,保留不匹配的。

配置示例(为了方便观察,此模型使用了netcat tcp作为Source源,logger作为Sink)。配置好如下参数后,在Linux的配置的主机节点上执行Linux命令“telnet 主机名或IP 44444”,并任意敲入符合正则和不符合正则的字符串。会在日志中观察到,只有匹配到的字符串被传输了。
#define the source、channel、sink
server.sources = r1

server.channels = c1
server.sinks = k1

#config the source
server.sources.r1.type = netcat
server.sources.r1.bind = ${主机IP}
server.sources.r1.port = 44444
server.sources.r1.interceptors= i1
server.sources.r1.interceptors.i1.type= regex_filter
server.sources.r1.interceptors.i1.regex= (flume)|(myflume)
server.sources.r1.interceptors.i1.excludeEvents= false
server.sources.r1.channels = c1

#config the channel
server.channels.c1.type = memory
server.channels.c1.capacity = 1000
server.channels.c1.transactionCapacity = 100
#config the sink
server.sinks.k1.type = logger
server.sinks.k1.channel = c1