业务模型配置指导
Flume业务配置及模块选择过程中,一般要求Sink的极限吞吐量需要大于Source的极限吞吐量,否则在极限负载的场景下,Source往Channel的写入速度大于Sink从Channel取出的速度,从而导致Channel频繁被写满,进而影响性能表现。
Avro Source和Avro Sink一般都是成对出现,用于多个Flume Agent间进行数据中转,因此一般场景下Avro Source和Avro Sink都不会成为性能瓶颈。
模块间性能
根据模块间极限性能对比,可以看到对于前端是SpoolDir Source的场景下,Kafka Sink和HDFS Sink都能满足吞吐量要求,但是HBase Sink由于自身写入性能较低的原因,会成为性能瓶颈,会导致数据都积压在Channel中。但是如果有必须使用HBase Sink或者其他性能容易成为瓶颈的Sink的场景时,可以选择使用Channel Selector或者Sink Group来满足性能要求。
Channel Selector
Channel Selector可以允许一个Source对接多个Channel,通过选择不同的Selector类型来将Source的数据进行分流或者复制,目前Flume提供的Channel Selector有两种:Replicating和Multiplexing。
Replicating:表示Source的数据同步发送给所有Channel。
Multiplexing:表示根据Event中的Header的指定字段的值来进行判断,从而选择相应的Channel进行发送,从而起到根据业务类型进行分流的目的。
- Replicating配置样例:
client.sources = kafkasource client.channels = channel1 channel2 client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource client.sources.kafkasource.kafka.topics = topic1,topic2 client.sources.kafkasource.kafka.consumer.group.id = flume client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007 client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT client.sources.kafkasource.batchDurationMillis = 1000 client.sources.kafkasource.batchSize = 800 client.sources.kafkasource.channels = channel1 c el2 client.sources.kafkasource.selector.type = replicating client.sources.kafkasource.selector.optional = channel2
表1 Replicating配置样例参数说明 选项名称
默认值
描述
Selector.type
replicating
Selector类型,应配置为replicating
Selector.optional
-
可选Channel,可以配置为列表
- Multiplexing配置样例:
client.sources = kafkasource client.channels = channel1 channel2 client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource client.sources.kafkasource.kafka.topics = topic1,topic2 client.sources.kafkasource.kafka.consumer.group.id = flume client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007 client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT client.sources.kafkasource.batchDurationMillis = 1000 client.sources.kafkasource.batchSize = 800 client.sources.kafkasource.channels = channel1 channel2 client.sources.kafkasource.selector.type = multiplexing client.sources.kafkasource.selector.header = myheader client.sources.kafkasource.selector.mapping.topic1 = channel1 client.sources.kafkasource.selector.mapping.topic2 = channel2 client.sources.kafkasource.selector.default = channel1
表2 Multiplexing配置样例参数说明 选项名称
默认值
描述
Selector.type
replicating
Selector类型,应配置为multiplexing
Selector.header
Flume.selector.header
-
Selector.default
-
-
Selector.mapping.*
-
-
Multiplexing类型的Selector的样例中,选择Event中Header名称为topic的字段来进行判断,当Header中topic字段的值为topic1时,向channel1发送该Event,当Header中topic字段的值为topic2时,向channel2发送该Event。
这种Selector需要借助Source中Event的特定Header来进行Channel的选择,需要根据业务场景选择合理的Header来进行数据分流。
SinkGroup
当后端单Sink性能不足、需要高可靠性保证或者异构输出时可以使用Sink Group来将指定的Channel和多个Sink对接,从而满足相应的使用场景。目前Flume提供了两种Sink Processor用于对Sink Group中的Sink进行管理:Load Balancing和Failover。
Failover:表示在Sink Group中同一时间只有一个Sink处于活跃状态,其他Sink作为备份处于非活跃状态,当活跃状态的Sink故障时,根据优先级从非活跃状态的Sink中选择一个来接管业务,保证数据不会丢失,多用于高可靠性场景。
Load Balancing:表示在Sink Group中所有Sink都处于活跃状态,每个Sink都会从Channel中去获取数据并进行处理,并且保证在运行过程中该Sink Group的所有Sink的负载是均衡的,多用于性能提升场景。
- Load Balancing配置样例:
client.sources = source1 client.sinks = sink1 sink2 client.channels = channel1 client.sinkgroups = g1 client.sinkgroups.g1.sinks = sink1 sink2 client.sinkgroups.g1.processor.type = load_balance client.sinkgroups.g1.processor.backoff = true client.sinkgroups.g1.processor.selector = random client.sinks.sink1.type = logger client.sinks.sink1.channel = channel1 client.sinks.sink2.type = logger client.sinks.sink2.channel = channel1
表3 Load Balancing配置样例参数说明 选项名称
默认值
描述
sinks
-
Sink Group的sink列表,多个以空格分隔
processor.type
default
Processor的类型,应配置为load_balance
processor.backoff
false
是否以指数的形式退避失败的Sinks
processor.selector
round_robin
选择机制。必须是round_robin,random或者自定义的类,且该类继承了AbstractSinkSelector
processor.selector.maxTimeOut
30000
屏蔽故障sink的时间,默认是30000毫秒
- Failover配置样例:
client.sources = source1 client.sinks = sink1 sink2 client.channels = channel1 client.sinkgroups = g1 client.sinkgroups.g1.sinks = sink1 sink2 client.sinkgroups.g1.processor.type = failover client.sinkgroups.g1.processor.priority.sink1 = 10 client.sinkgroups.g1.processor.priority.sink2 = 5 client.sinkgroups.g1.processor.maxpenalty = 10000 client.sinks.sink1.type = logger client.sinks.sink1.channel = channel1 client.sinks.sink2.type = logger client.sinks.sink2.channel = channel1
表4 Failover配置样例参数说明 选项名称
默认值
描述
sinks
-
Sink Group的sink列表,多个以空格分隔
processor.type
default
Processor的类型,应配置为failover
processor.priority.<sinkName>
-
优先级值。<sinkName> 必须是sinks中有定义的。优先级值高Sink会更早被激活。值越大,优先级越高。注:多个sinks的话,优先级的值不要相同,如果优先级相同的话,只会有一个生效。
processor.maxpenalty
30000
失败的Sink最大的退避时间(单位:毫秒)
Interceptors
Flume的拦截器(Interceptor)支持在数据传输过程中修改或丢弃传输的基本单元Event。用户可以通过在配置中指定Flume内建拦截器的类名列表,也可以开发自定义的拦截器来实现Event的修改或丢弃。Flume内建支持的拦截器如下表所示,本章节会选取一个较为复杂的作为示例。其余的用户可以根据需要自行配置使用。
1. 拦截器用在Flume的Source、Channel之间,大部分的Source都带有Interceptor参数。用户可以依据需要配置。
2. Flume支持一个Source配置多个拦截器,各拦截器名称用空格分开。
3. 指定拦截器的顺序就是它们被调用的顺序。
4. 使用拦截器在Header中插入的内容,都可以在Sink中读取并使用。
拦截器类型 |
简要描述 |
---|---|
Timestamp Interceptor |
该拦截器会在Event的Header中插入一个时间戳。 |
Host Interceptor |
该拦截器会在Event的Header中插入当前Agent所在节点的IP或主机名。 |
Remove Header Interceptor |
该拦截器会依据Header中包含的符合正则匹配的字符串,丢弃掉对应的Event。 |
UUID Interceptor |
该拦截器会为每个Event的Header生成一个UUID字符串。 |
Search and Replace Interceptor |
该拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。与Java Matcher.replaceAll() 的规则相同。 |
Regex Filtering Interceptor |
该拦截器通过将Event的Body体解释为文本文件,与配置的正则表达式进行匹配来选择性的过滤Event。提供的正则表达式可用于排除或包含事件。 |
Regex Extractor Interceptor |
该拦截器使用正则表达式抽取原始events中的内容,并将该内容加入events的header中。 |
选项名称 |
默认值 |
描述 |
---|---|---|
type |
- |
组件类型名称,必须写为regex_filter。 |
regex |
- |
用于匹配事件的正则表达式。 |
excludeEvents |
false |
默认收集匹配到的Event。设置为true,则会删除匹配的Event,保留不匹配的。 |
#define the source、channel、sink server.sources = r1 server.channels = c1 server.sinks = k1 #config the source server.sources.r1.type = netcat server.sources.r1.bind = ${主机IP} server.sources.r1.port = 44444 server.sources.r1.interceptors= i1 server.sources.r1.interceptors.i1.type= regex_filter server.sources.r1.interceptors.i1.regex= (flume)|(myflume) server.sources.r1.interceptors.i1.excludeEvents= false server.sources.r1.channels = c1 #config the channel server.channels.c1.type = memory server.channels.c1.capacity = 1000 server.channels.c1.transactionCapacity = 100 #config the sink server.sinks.k1.type = logger server.sinks.k1.channel = c1