典型场景:从本地采集静态日志保存到Kafka
操作场景
该任务指导用户使用Flume服务端从本地采集静态日志保存到Kafka的Topic列表(test1)。
本章节适用于MRS 3.x及之后版本。
本配置默认集群网络环境是安全的,数据传输过程不需要启用SSL认证。如需使用加密方式,请参考配置加密传输。该配置为只用一个Flume场景,例如:Spooldir Source+Memory Channel+Kafka Sink.
前提条件
- 已成功安装集群,包含Kafka及Flume服务。
- 确保集群网络环境安全。
- MRS集群管理员已明确业务需求,并准备一个Kafka管理员用户flume_kafka。
操作步骤
- 配置Flume的参数。
使用Manager界面中的Flume配置工具来配置Flume角色服务端参数并生成配置文件。
- 登录FusionInsight Manager,选择“集群 > 服务 > Flume > 配置工具”。
- “Agent名”选择“client”,然后选择要使用的source、channel以及sink,将其拖到右侧的操作界面中并将其连接。
采用SpoolDir Source、Memory Channel和Kafka Sink,如图1所示。
- 双击对应的source、channel以及sink,根据实际环境并参考表1设置对应的配置参数。
- 如果想在之前的“properties.propretites”文件上进行修改后继续使用,则登录Manager,选择“集群 > 服务 > Flume > 配置工具 > 导入”,将该文件导入后再修改非加密传输的相关配置项即可。
- 导入配置文件时,建议配置Source/Channel/Sink的各自的个数都不要超过40个,否则可能导致界面响应时间过长。
表1 Flume角色服务端所需修改的参数列表 参数名称
参数值填写规则
参数样例
名称
不能为空,必须唯一
test
spoolDir
待采集的文件所在的目录路径,此参数不能为空。该路径需存在,且对flume运行用户有读写执行权限。
/srv/BigData/hadoop/data1/zb
trackerDir
flume采集文件信息元数据保存路径。
/srv/BigData/hadoop/data1/tracker
batchSize
Flume一次发送的事件个数(数据条数)。增大会提升性能,降低实时性;反之降低性能,提升实时性。
61200
kafka.topics
订阅的Kafka topic列表,多个topic用逗号分隔,此参数不能为空。
test1
kafka.bootstrap.servers
Kafka的bootstrap地址端口列表,默认值为Kafka集群中所有的Kafkabrokers。
192.168.101.10:21007
- 单击“导出”,将配置文件“properties.properties”保存到本地。
- 上传配置文件。
- 验证日志是否传输成功。
- 登录Kafka客户端:
kinit flume_kafka(输入密码)
- 读取KafkaTopic中的数据。
bin/kafka-console-consumer.sh --topic 主题名称 --bootstrap-server Kafka角色实例所在节点的业务IP地址:21007 --consumer.config config/consumer.properties --from-beginning
系统显示待采集文件目录下的内容:
[root@host1 kafka]# bin/kafka-console-consumer.sh --topic test1 --bootstrap-server 192.168.101.10:21007 --consumer.config config/consumer.properties --from-beginning Welcome to flume
- 登录Kafka客户端: