基于Kafka的Word Count数据流统计案例
应用场景
Kafka是一个分布式的消息发布-订阅系统。它采用独特的设计提供了类似JMS的特性,主要用于处理活跃的流式数据。
Kafka有很多适用的场景:消息队列、行为跟踪、运维数据监控、日志收集、流处理、事件溯源、持久化日志等。
Kafka有如下几个特点:
- 高吞吐量
- 消息持久化到磁盘
- 分布式系统易扩展
- 容错性好
MRS服务提供了Kafka多种场景下的样例开发工程,本章节以最常见的WordCount样例Demo进行说明,对应示例场景的开发思路:
- 使用Kafka客户端创建两个Topic,用于输入Topic和输出Topic。
- 开发一个Kafka Streams完成单词统计功能,通过读取输入Topic中的消息,统计每条消息中的单词个数,从输出Topic消费数据,将统计结果以Key-Value的形式输出。
方案架构
生产者(Producer)将消息发布到Kafka主题(Topic)上,消费者(Consumer)订阅这些主题并消费这些消息。
在Kafka集群上一个服务器称为一个Broker。对于每一个主题,Kafka集群保留一个用于缩放、并行化和容错性的分区(Partition)。每个分区是一个有序、不可变的消息序列,并不断追加到提交日志文件。分区的消息每个也被赋值一个称为偏移顺序(Offset)的序列化编号。
操作流程
本实践操作流程如下所示:
- 步骤1:创建MRS集群:创建一个包含有Kafka组件的MRS集群。
- 步骤2:准备应用程序:准备用于进行分析的源数据及程序。
- 步骤3:上传jar包及源数据:将相关程序和数据上传至MRS集群内。
- 步骤4:运行作业并查看结果:运行程序并查看数据分析结果。
步骤1:创建MRS集群
- 创建并购买一个包含有Kafka组件的MRS集群,详情请参见购买自定义集群。
本文以购买的MRS 3.1.0版本的集群为例,组件包含Hadoop、Kafka组件,集群未开启Kerberos认证。
- 集群购买成功后,在MRS集群的任一节点内,安装集群客户端,具体操作可参考安装并使用集群客户端。
例如客户端安装在主管理节点中,安装目录为“/opt/client”。
- 客户端安装完成后,在客户端内创建“lib”目录,用于放置相关jar包。
将安装客户端过程中解压的目录中Kafka相关jar包复制到“lib”目录。
例如客户端软件包的下载路径为主管理节点的“/tmp/FusionInsight-Client”目录,执行以下命令:
mkdir /opt/client/lib
cd /tmp/FusionInsight-Client/FusionInsight_Cluster_1_Services_ClientConfig
scp Kafka/install_files/kafka/libs/* /opt/client/lib
步骤2:准备应用程序
- 通过开源镜像站获取样例工程。
下载样例工程的Maven工程源码和配置文件,并在本地配置好相关开发工具,可参考通过开源镜像站获取样例工程。
根据集群版本选择对应的分支,下载并获取MRS相关样例工程。
例如本章节场景对应示例为“WordCountDemo”样例,获取地址:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0/src/kafka-examples。
- 本地使用IDEA工具导入样例工程,等待Maven工程下载相关依赖包。
本地配置好Maven及SDK相关参数后,样例工程会自动加载相关依赖包,具体操作可参考配置并导入样例工程。
图2 导入Kafka样例程序
在示例程序“WordCountDemo”中,通过调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数,关键代码片段如下:
... static Properties getStreamsConfig() { final Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker地址列表,根据集群实际情况配置 props.put(BOOTSTRAP_SERVERS, kafkaProc.getValues(BOOTSTRAP_SERVERS, "node-group-1kLFk.mrs-rbmq.com:9092")); props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); props.put(APPLICATION_ID, kafkaProc.getValues(APPLICATION_ID, "streams-wordcount")); // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "PLAINTEXT")); props.put(CACHE_MAX_BYTES_BUFFERING, 0); props.put(DEFAULT_KEY_SERDE, Serdes.String().getClass().getName()); props.put(DEFAULT_VALUE_SERDE, Serdes.String().getClass().getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } static void createWordCountStream(final StreamsBuilder builder) { // 从 input-topic 接收输入记录 final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME); // 聚合 key-value 键值对的计算结果 final KTable<String, Long> counts = source .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING))) .groupBy((key, value) -> value) .count(); // 将计算结果的 key-value 键值对从 output topic 输出 counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long())); } ...
- BOOTSTRAP_SERVERS需根据集群实际情况,配置为Kafka Broker节点的主机名及端口,可通过集群FusionInsight Manager界面中单击“集群 > 服务 > Kafka > 实例”查看Broker在Linux中调测程序实例信息。
- SECURITY_PROTOCOL为连接Kafka的协议类型,在本示例中,配置为“PLAINTEXT”。
- 确认“WordCountDemo.java”内的参数无误后,将工程编译后进行打包,获取打包后的jar文件。
编译jar包详细操作可参考在Linux中调测程序。
图3 编译Kafka程序
例如打包后的jar文件为“kafka-demo.jar”。
步骤3:上传jar包及源数据
如果本地网络无法直接连接客户端节点上传文件,可先将jar文件或者源数据上传至OBS文件系统中,然后通过MRS管理控制台集群内的“文件管理”页面导入HDFS中,再通过HDFS客户端使用hdfs dfs -get命令下载到客户端节点本地。
步骤4:运行作业并查看结果
- 使用root用户登录安装了集群客户端的节点。
cd /opt/client
source bigdata_env
- 创建输入Topic和输出Topic,与样例代码中指定的Topic名称保持一致,输出Topic的清理策略设置为compact。
kafka-topics.sh --create --zookeeper quorumpeer实例IP地址:ZooKeeper客户端连接端口/kafka --replication-factor 1 --partitions 1 --topic Topic名称
quorumpeer实例IP地址可登录集群的FusionInsight Manager界面,在“集群 > 服务 > ZooKeeper > 实例”界面中查询,多个地址可用“,”分隔。ZooKeeper客户端连接端口可通过ZooKeeper服务配置参数“clientPort”查询,默认为2181。
例如执行以下命令:
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-input
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact
- Topic创建成功后,执行以下命令运行程序。
java -cp .:/opt/client/lib/* com.huawei.bigdata.kafka.example.WordCountDemo
- 重新打开一个客户端连接窗口,执行以下命令,使用“kafka-console-producer.sh”向输入Topic中写入消息:
cd /opt/client
source bigdata_env
kafka-console-producer.sh --broker-list Broker实例IP地址:Kafka连接端口(例如192.168.0.13:9092) --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties
- 重新打开一个客户端连接窗口,执行以下命令,使用“kafka-console-consumer.sh”命令消费数据,查看统计结果。
cd /opt/client
source bigdata_env
kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server Broker实例IP地址:Kafka连接端口 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter
向步骤 4中打开的写入窗口中写入消息:
>This is Kafka Streams test >test starting >now Kafka Streams is running >test end
查看到消息输出:
this 1 is 1 kafka 1 streams 1 test 1 test 2 starting 1 now 1 kafka 2 streams 2 is 2 running 1 test 3 end 1