更新时间:2024-08-03 GMT+08:00

快速开发Kafka应用

Kafka是一个分布式的消息发布-订阅系统。它采用独特的设计提供了类似JMS的特性,主要用于处理活跃的流式数据。

Kafka有很多适用的场景:消息队列、行为跟踪、运维数据监控、日志收集、流处理、事件溯源、持久化日志等。

Kafka有如下几个特点:

  • 高吞吐量
  • 消息持久化到磁盘
  • 分布式系统易扩展
  • 容错性好
  • 支持online和offline场景

MRS对外提供了基于Kafka组件的应用开发样例工程,本实践用于指导您创建MRS集群后,获取并导入样例工程并在本地进行编译调测,用于实现流式数据的处理。

本章节对应示例场景的开发思路:

  1. 使用Kafka客户端创建两个Topic,用于输入Topic和输出Topic。
  2. 开发一个Kafka Streams完成单词统计功能,通过读取输入Topic中的消息,统计每条消息中的单词个数,从输出Topic消费数据,将统计结果以Key-Value的形式输出。

创建MRS集群

  1. 购买一个包含有Kafka组件的MRS集群,详情请参见购买自定义集群

    本文以购买的MRS 3.1.0版本的集群为例,组件包含Hadoop、Kafka组件,集群未开启Kerberos认证。

  2. 集群购买成功后,在MRS集群的任一节点内,安装集群客户端,具体操作可参考安装并使用集群客户端

    例如客户端安装在主管理节点中,安装目录为“/opt/client”。

  3. 客户端安装完成后,在客户端内创建“lib”目录,用于放置相关jar包。

    将安装客户端过程中解压的目录中Kafka相关jar包复制到“lib”目录。

    例如客户端软件包的下载路径为主管理节点的“/tmp/FusionInsight-Client”目录,执行以下命令:

    mkdir /opt/client/lib

    cd /tmp/FusionInsight-Client/FusionInsight_Cluster_1_Services_ClientConfig

    scp Kafka/install_files/kafka/libs/* /opt/client/lib

准备应用程序

  1. 通过开源镜像站获取样例工程。

    下载样例工程的Maven工程源码和配置文件,并在本地配置好相关开发工具,可参考通过开源镜像站获取样例工程

    根据集群版本选择对应的分支,下载并获取MRS相关样例工程。

    例如本章节场景对应示例为“WordCountDemo”样例,获取地址:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0/src/kafka-examples

  2. 本地使用IDEA工具导入样例工程,等待Maven工程下载相关依赖包。

    本地配置好Maven及SDK相关参数后,样例工程会自动加载相关依赖包,具体操作可参考配置并导入样例工程

    在示例程序“WordCountDemo”中,通过调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数,关键代码片段如下:

    ...
        static Properties getStreamsConfig() {
            final Properties props = new Properties();
            KafkaProperties kafkaProc = KafkaProperties.getInstance();
            // Broker地址列表,根据集群实际情况配置
            props.put(BOOTSTRAP_SERVERS, kafkaProc.getValues(BOOTSTRAP_SERVERS, "node-group-1kLFk.mrs-rbmq.com:9092"));
            props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
            props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
            props.put(APPLICATION_ID, kafkaProc.getValues(APPLICATION_ID, "streams-wordcount"));
            // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
            props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "PLAINTEXT"));
            props.put(CACHE_MAX_BYTES_BUFFERING, 0);
            props.put(DEFAULT_KEY_SERDE, Serdes.String().getClass().getName());
            props.put(DEFAULT_VALUE_SERDE, Serdes.String().getClass().getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return props;
        }
        static void createWordCountStream(final StreamsBuilder builder) {
            // 从 input-topic 接收输入记录
            final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);
            // 聚合 key-value 键值对的计算结果
            final KTable<String, Long> counts = source
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
                    .groupBy((key, value) -> value)
                    .count();
            // 将计算结果的 key-value 键值对从 output topic 输出
            counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
        }
    ...
    • BOOTSTRAP_SERVERS需根据集群实际情况,配置为Kafka Broker节点的主机名及端口,可通过集群FusionInsight Manager界面中选择“集群 > 服务 > Kafka > 实例”查看Broker实例信息。
    • SECURITY_PROTOCOL为连接Kafka的协议类型,在本示例中,配置为“PLAINTEXT”。

  3. 确认“WordCountDemo.java”内的参数无误后,将工程编译后进行打包,获取打包后的jar文件。

    编译jar包详细操作可参考在Linux中调测程序

    例如打包后的jar文件为“kafka-demo.jar”。

上传jar包及源数据

  1. 将编译后的jar包上传到客户端节点,例如上传到“/opt/client/lib”目录下。

    如果本地网络无法直接连接客户端节点上传文件,可先将jar文件或者源数据上传至OBS文件系统中,然后通过MRS管理控制台集群内的“文件管理”页面导入HDFS中,再通过HDFS客户端使用hdfs dfs -get命令下载到客户端节点本地。

运行作业并查看结果

  1. 使用root用户登录安装了集群客户端的节点。

    cd /opt/client

    source bigdata_env

  2. 创建输入Topic和输出Topic,与样例代码中指定的Topic名称保持一致,输出Topic的清理策略设置为compact。

    kafka-topics.sh --create --zookeeper quorumpeer实例IP地址:ZooKeeper客户端连接端口/kafka --replication-factor 1 --partitions 1 --topic Topic名称

    quorumpeer实例IP地址可登录集群的FusionInsight Manager界面,在“集群 > 服务 > ZooKeeper > 实例”界面中查询,多个地址可用“,”分隔。ZooKeeper客户端连接端口可通过ZooKeeper服务配置参数“clientPort”查询,默认为2181。

    例如执行以下命令:

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-input

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact

  3. Topic创建成功后,执行以下命令运行程序。

    java -cp .:/opt/client/lib/* com.huawei.bigdata.kafka.example.WordCountDemo

  4. 重新打开一个客户端连接窗口,执行以下命令,使用“kafka-console-producer.sh”向输入Topic中写入消息:

    cd /opt/client

    source bigdata_env

    kafka-console-producer.sh --broker-list Broker实例IP地址:Kafka连接端口(例如192.168.0.13:9092) --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties

  5. 重新打开一个客户端连接窗口,执行以下命令,使用“kafka-console-consumer.sh”从输出Topic消费数据,查看统计结果。

    cd /opt/client

    source bigdata_env

    kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server Broker实例IP地址:Kafka连接端口 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter

    向输入Topic中写入消息:

    >This is Kafka Streams test 
    >test starting 
    >now Kafka Streams is running 
    >test end 

    消息输出:

    this    1 
    is      1 
    kafka   1 
    streams 1 
    test    1 
    test    2 
    starting 1 
    now     1 
    kafka   2 
    streams 2 
    is      2 
    running 1 
    test    3 
    end     1