更新时间:2024-08-03 GMT+08:00

Flink Kafka样例程序开发思路

场景说明

假定某个Flink业务每秒就会收到1个消息记录。

基于某些业务要求,开发的Flink应用程序实现功能:实时输出带有前缀的消息内容。

数据规划

Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。
  1. 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。
  2. 创建Topic。

    创建topic的命令格式:

    bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic}

    表1 参数说明

    参数名

    说明

    {zkQuorum}

    ZooKeeper集群信息,格式为IP:port。

    {partitionNum}

    topic的分区数。

    {replicationNum}

    topic中每个partition数据的副本数。

    {Topic}

    topic名称。

    示例:在Kafka的客户端路径下执行命令,此处以ZooKeeper集群的IP:port是10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181,Topic名称为topic1的数据为例。
    bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181/kafka --partitions 5 --replication-factor 1 --topic topic1

开发思路

  1. 启动Flink Kafka Producer应用向Kafka发送数据。
  2. 启动Flink Kafka Consumer应用从Kafka接收数据,保证topic与producer一致。
  3. 在数据内容中增加前缀并进行打印。