更新时间:2024-10-31 GMT+08:00

Flink Kafka样例程序开发思路

场景说明

假定某个Flink业务每秒就会收到1个消息记录。

基于某些业务要求,开发的Flink应用程序实现功能:实时输出带有前缀的消息内容。

数据规划

Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。
  1. 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。
  2. 创建Topic。
    1. 在服务端配置用户创建topic的权限。

      将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”,如图1所示。配置完后重启Kafka服务。

      图1 配置用户创建topic的权限
    2. 用户使用Linux命令行创建topic,执行命令前需要使用kinit命令进行人机认证,如:kinit flinkuser。

      flinkuser需要用户自己创建,并拥有创建Kafka的topic权限。具体操作请参考准备MRS应用开发用户章节。

      创建topic的命令格式:

      bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic}

      表1 参数说明

      参数名

      说明

      {zkQuorum}

      ZooKeeper集群信息,格式为IP:port。

      {partitionNum}

      topic的分区数。

      {replicationNum}

      topic中每个partition数据的副本数。

      {Topic}

      Topic名称。

      示例:在Kafka的客户端路径下执行命令,此处以ZooKeeper集群的IP:port是10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181,Topic名称为topic1的数据为例。
      bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181/kafka --partitions 5 --replication-factor 1 --topic topic1
  3. 安全认证。

    安全认证的方式有三种:Kerberos认证、SSL加密认证和Kerberos+SSL模式认证,用户在使用的时候可任选其中一种方式进行认证。

    • Kerberos认证配置
      1. 客户端配置。

        在Flink配置文件“flink-conf.yaml”中,增加kerberos认证相关配置(主要在“contexts”项中增加“KafkaClient”),示例如下:

        security.kerberos.login.keytab: /home/demo/flink/release/flink-1.12.2/keytab/user.keytab
        security.kerberos.login.principal: flinkuser
        security.kerberos.login.contexts: Client,KafkaClient
        security.kerberos.login.use-ticket-cache: false
      2. 运行参数。

        关于“SASL_PLAINTEXT”协议的运行参数示例如下:

        --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT  --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.系统域名.com //10.96.101.32:21007表示kafka服务器的IP:port
    • SSL加密配置
      • 服务端配置。

        配置“ssl.mode.enable”“true”,如图2所示:

        图2 服务端配置
      • 客户端配置。
        1. 登录FusionInsight Manager系统,选择“集群 > 待操作集群的名称 > 服务 > Kafka > 更多 > 下载客户端” ,下载客户端压缩文件到本地机器。如图3所示:
          图3 客户端配置
        1. 使用客户端根目录中的“ca.crt”证书文件生成客户端的“truststore”
          执行命令如下:
          keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks 

          命令执行结果查看:

        2. 运行参数。

          “ssl.truststore.password”参数内容需要跟创建“truststore”时输入的密码保持一致,执行以下命令运行参数。

          --topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/zgd/software/FusionInsight_XXX_Kafka_ClientConfig/truststore.jks --ssl.truststore.password xxx //10.96.101.32:9093表示kafka服务器的IP:port,XXX表示FusionInsight相应的版本号,xxx表示密码。
    • Kerberos+SSL模式配置

      完成上文中Kerberos和SSL各自的服务端和客户端配置后,只需要修改运行参数中的端口号和协议类型即可启动Kerberos+SSL模式。

      --topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL  --sasl.kerberos.service.name kafka --ssl.truststore.location --kerberos.domain.name hadoop.系统域名.com /home/zgd/software/FusionInsight_XXX_Kafka_ClientConfig/truststore.jks --ssl.truststore.password xxx //10.96.101.32:21009表示kafka服务器的IP:port,xxx表示密码。

开发思路

  1. 启动Flink Kafka Producer应用向Kafka发送数据。
  2. 启动Flink Kafka Consumer应用从Kafka接收数据,保证topic与producer一致。
  3. 在数据内容中增加前缀并进行打印。