更新时间:2024-08-03 GMT+08:00

调测Kafka High Level KafkaStreams API样例程序

在Windows中调测程序

在Windows环境调测程序步骤请参考在Windows中调测程序

在Linux环境调测程序

  1. 编译并生成Jar包,并将Jar包复制到与依赖库文件夹同级的目录“src/main/resources”下,具体步骤请参考在Linux调测程序
  2. 使用集群安装用户登录集群客户端节点。

    cd /opt/client

    source bigdata_env

  3. 创建输入Topic和输出Topic,与样例代码中指定的Topic名称保持一致,输出Topic的清理策略设置为compact。

    kafka-topics.sh --create --zookeeper quorumpeer实例IP地址:ZooKeeper客户端连接端口/kafka --replication-factor 1 --partitions 1 --topic Topic名称

    quorumpeer实例IP地址可登录集群的FusionInsight Manager界面,在“集群 > 服务 > ZooKeeper > 实例”界面中查询,多个地址可用“,”分隔。ZooKeeper客户端连接端口可通过ZooKeeper服务配置参数“clientPort”查询,例如端口号为2181。

    例如执行以下命令:

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-input

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact

  4. Topic创建成功后,执行以下命令运行程序。

    java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.WordCountDemo

  5. 重新打开一个客户端连接窗口,执行以下命令,使用“kafka-console-producer.sh”向输入Topic中写入消息:

    cd /opt/client

    source bigdata_env

    kafka-console-producer.sh --broker-list Broker实例IP地址:Kafka连接端口 --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties

    • Broker实例IP地址:登录FusionInsight Manager,选择“集群 > 服务 > Kafka > 实例”,在实例列表页面中查看并记录任意一个Broker实例业务IP地址。
    • Kafka连接端口:集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值。集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值。

  6. 重新打开一个客户端连接窗口,执行以下命令,使用“kafka-console-consumer.sh”从输出Topic消费数据,查看统计结果。

    cd /opt/client

    source bigdata_env

    kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server Broker实例IP地址:Kafka连接端口 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter

    向输入Topic中写入消息:

    >This is Kafka Streams test 
    >test starting 
    >now Kafka Streams is running 
    >test end 

    消息输出:

    this    1 
    is      1 
    kafka   1 
    streams 1 
    test    1 
    test    2 
    starting 1 
    now     1 
    kafka   2 
    streams 2 
    is      2 
    running 1 
    test    3 
    end     1