调测Kafka High level Streams样例程序
在Windows中调测程序
在Windows环境调测程序步骤请参考在Windows中调测程序。
在Linux环境调测程序
- 编译并生成Jar包,并将Jar包复制到与依赖库文件夹同级的目录“src/main/resources”下,具体步骤请参考在Linux调测程序。
- 使用集群安装用户登录集群客户端节点。
cd /opt/client
source bigdata_env
kinit 组件操作用户(例如developuser)
- 创建输入Topic和输出Topic,与样例代码中指定的Topic名称保持一致,输出Topic的清理策略设置为compact。
kafka-topics.sh --create --zookeeper quorumpeer实例IP地址:ZooKeeper客户端连接端口/kafka --replication-factor 1 --partitions 1 --topic Topic名称
quorumpeer实例IP地址可登录集群的FusionInsight Manager界面,在“集群 > 服务 > ZooKeeper > 实例”界面中查询,多个地址可用“,”分隔。ZooKeeper客户端连接端口可通过ZooKeeper服务配置参数“clientPort”查询,例如端口号为2181。
例如执行以下命令:
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-input
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact
- Topic创建成功后,执行以下命令运行程序。
java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.WordCountDemo
- 重新打开一个客户端连接窗口,执行以下命令,使用“kafka-console-producer.sh”向输入Topic中写入消息:
cd /opt/client
source bigdata_env
kinit 组件操作用户(例如developuser)
kafka-console-producer.sh --broker-list Broker实例IP地址:Kafka连接端口 --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties
- Broker实例IP地址:登录FusionInsight Manager,选择“集群 > 服务 > Kafka > 实例”,在实例列表页面中查看并记录任意一个Broker实例业务IP地址。
- Kafka连接端口:集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值。集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值。
- 重新打开一个客户端连接窗口,执行以下命令,使用“kafka-console-consumer.sh”从输出Topic消费数据,查看统计结果。
cd /opt/client
source bigdata_env
kinit 组件操作用户(例如developuser)
kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server Broker实例IP地址:Kafka连接端口 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter
向输入Topic中写入消息:
>This is Kafka Streams test >test starting >now Kafka Streams is running >test end
消息输出:
this 1 is 1 kafka 1 streams 1 test 1 test 2 starting 1 now 1 kafka 2 streams 2 is 2 running 1 test 3 end 1