更新时间:2023-06-20 GMT+08:00
分享

调测Low Level KafkaStreams API样例

在Windows中调测程序

在Windows环境调测程序步骤请参考在Windows中调测程序

在Linux环境调测程序

  1. 编译并生成Jar包,并将Jar包拷贝到与依赖库文件夹同级的目录“src/main/resources”下,具体步骤请参考在Linux调测程序
  2. 使用root用户登录安装了集群客户端的节点。

    cd /opt/client

    source bigdata_env

  3. 创建输入Topic和输出Topic,与样例代码中指定的Topic名称保持一致,输出Topic的清理策略设置为compact。

    kafka-topics.sh --create --zookeeper quorumpeer实例IP地址:ZooKeeper客户端连接端口/kafka --replication-factor 1 --partitions 1 --topic Topic名称

    quorumpeer实例IP地址可登录集群的FusionInsight Manager界面,在“集群 > 服务 > ZooKeeper > 实例”界面中查询,多个地址可用“,”分隔。ZooKeeper客户端连接端口可通过ZooKeeper服务配置参数“clientPort”查询,默认为2181。

    例如执行以下命令:

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-input

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-output --config cleanup.policy=compact

  4. Topic创建成功后,执行以下命令运行程序。

    java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.WordCountDemo

  5. 重新打开一个客户端连接窗口,执行以下命令,使用“kafka-console-producer.sh”向输入Topic中写入消息:

    cd /opt/client

    source bigdata_env

    kafka-console-producer.sh --broker-list Broker实例IP地址:Kafka连接端口(例如192.168.0.13:9092) --topic streams-wordcount-processor-input --producer.config /opt/client/Kafka/kafka/config/producer.properties

  6. 重新打开一个客户端连接窗口,执行以下命令,使用“kafka-console-consumer.sh”从输出Topic消费数据,查看统计结果。

    cd /opt/client

    source bigdata_env

    kafka-console-consumer.sh --topic streams-wordcount-processor-output --bootstrap-server Broker实例IP地址:Kafka连接端口 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true

    向输入Topic中写入消息:

    >This is Kafka Streams test 
    >test starting 
    >now Kafka Streams is running 
    >test end 

    消息输出:

    this    1 
    is      1 
    kafka   1 
    streams 1 
    test    1 
    test    2 
    starting 1 
    now     1 
    kafka   2 
    streams 2 
    is      2 
    running 1 
    test    3 
    end     1

分享:

    相关文档

    相关产品