使用Flume服务端从本地采集静态日志保存到Kafka
该任务指导用户使用Flume服务端从本地采集静态日志保存到Kafka的Topic列表(test1)。
本配置默认集群网络环境是安全的,数据传输过程不需要启用SSL认证。如需使用加密方式,请参考配置Flume加密传输数据采集任务。该配置为只用一个Flume场景,例如:Spooldir Source+Memory Channel+Kafka Sink。
前提条件
- 已成功安装集群,包含Kafka及Flume服务。
- 确保集群网络环境安全。
- MRS集群管理员已明确业务需求,并准备一个Kafka管理员用户flume_kafka。
操作步骤
- 配置Flume的参数。
使用Manager界面中的Flume配置工具来配置Flume角色服务端参数并生成配置文件。
- 登录FusionInsight Manager,选择“集群 > 服务 > Flume > 配置工具”。
图1 选择配置工具
- “Agent名”选择“server”,然后选择要使用的source、channel以及sink,将其拖到右侧的操作界面中并将其连接。
采用SpoolDir Source、Memory Channel和Kafka Sink,如图2所示。
- 双击对应的source、channel以及sink,根据实际环境并参考表1设置对应的配置参数。
- 如果想在之前的“properties.propretites”文件上进行修改后继续使用,则登录Manager,选择“集群 > 服务 > Flume > 配置工具 > 导入”,将该文件导入后再修改非加密传输的相关配置项即可。
- 导入配置文件时,建议配置Source/Channel/Sink的各自的个数都不要超过40个,否则可能导致界面响应时间过长。
表1 Flume角色服务端所需修改的参数列表 参数名称
参数值填写规则
参数样例
名称
不能为空,必须唯一
test
spoolDir
待采集的文件所在的目录路径,此参数不能为空。该路径需存在,且对flume运行用户有读写执行权限。
/srv/BigData/hadoop/data1/zb
trackerDir
flume采集文件信息元数据保存路径。
/srv/BigData/hadoop/data1/tracker
batchSize
Flume一次发送的事件个数(数据条数)。增大会提升性能,降低实时性;反之降低性能,提升实时性。
61200
kafka.topics
订阅的Kafka topic列表,多个topic用逗号分隔,此参数不能为空。
test1
kafka.bootstrap.servers
Kafka的bootstrap地址端口列表,默认值为Kafka集群中所有的Kafkabrokers。
192.168.101.10:21007
- 单击“导出”,将配置文件“properties.properties”保存到本地。
- 登录FusionInsight Manager,选择“集群 > 服务 > Flume > 配置工具”。
- 上传配置文件。
登录FusionInsight Manager,选择“集群 > 服务 > Flume”,在“实例”下单击准备上传配置文件的“Flume”角色,单击“实例配置 ”页面“flume.config.file”参数后的“上传文件”,选择1.d导出的“properties.properties”文件完成操作。
- 验证日志是否传输成功。
- 登录Kafka客户端:
kinit flume_kafka(输入密码)
- 读取KafkaTopic中的数据(修改命令中的中文为实际参数)。
bin/kafka-console-consumer.sh --topic 主题名称 --bootstrap-server Kafka角色实例所在节点的业务IP地址:21007 --consumer.config config/consumer.properties --from-beginning
系统显示待采集文件目录下的内容:
[root@host1 kafka]# bin/kafka-console-consumer.sh --topic test1 --bootstrap-server 192.168.101.10:21007 --consumer.config config/consumer.properties --from-beginning Welcome to flume
- 登录Kafka客户端: