更新时间:2024-04-22 GMT+08:00
分享

FlinkServer对接Kafka

本章节适用于MRS 3.1.2及之后的版本。

操作场景

本章节介绍Kafka作为source表或者sink表的DDL定义,以及创建表时使用的WITH参数和代码示例,并指导如何在FlinkServer作业管理页面操作。

本示例以安全模式Kafka为例。

前提条件

  • 集群中已安装HDFS、Yarn、Kafka和Flink服务。
  • 包含Kafka服务的客户端已安装,例如安装路径为:/opt/client
  • 参考创建FlinkServer角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。

操作步骤

  1. 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
    CREATE TABLE KafkaSource (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_source',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.系统域名'
    );
    CREATE TABLE KafkaSink(
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_sink',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.系统域名'
    );
    Insert into
      KafkaSink
    select
      *
    from
      KafkaSource;
    • Kafka端口号:
      • 集群的“认证模式”为“安全模式”时为“sasl.port”的值,默认为“21007”。
      • 集群的“认证模式”为“普通模式”时为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • 使用Flink 1.15.0及以前版本对接Kafka,在扩容Kafka Topic分区后,需要重启相关的Flink作业,否则会导致新分区识别不及时漏消费数据。或在开发作业时,配置Flink动态发现Kafka Topic新分区功能。

      可在作业SQL Kafka source表的WITH属性中,添加“scan.topic-partition-discovery.interval”参数,设置值为动态刷新时间,如“5min”。

  3. 查看作业管理界面,作业状态为“运行中”。
  4. 参考管理Kafka主题中的消息,执行以下命令查看Sink表中是否接收到数据,即5执行完成后查看Kafka topic是否正常写入数据。

    sh kafka-console-consumer.sh --topic test_sink --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties

  5. 参考管理Kafka主题中的消息,查看Topic并向Kafka中写入数据,输入完成后可在4中的窗口查看执行结果。

    ./kafka-topics.sh --list --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为test_source:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic test_source --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:
    1,clw,33

    输入完成后按回车发送消息。

    • ZooKeeper的quorumpeer实例业务IP:

      ZooKeeper服务所有quorumpeer实例业务IP。登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,可查看所有quorumpeer实例所在主机业务IP地址。

    • ZooKeeper客户端端口号:

      登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。

WITH主要参数说明

配置项

是否必选

类型

描述

connector

必选

String

指定要使用的连接器,Kafka使用“kafka”

topic

  • kafka作为sink,必选
  • kafka作为source,可选

String

主题名称

  • 当表用作source时,要从中读取数据的主题名称。支持主题列表,通过按分号分隔主题,如“主题-1;主题-2”
  • 当表用作sink时,主题名称为写入数据的主题。sink不支持主题列表

topic-pattern

kafka作为source时可选

String

主题模式

当表用作source时可设置该参数,主题名称需使用正则表达式

说明:

不能同时设置“topic-pattern” 和“topic” 。

properties.bootstrap.servers

必选

String

Kafka broker列表,以逗号分隔

properties.group.id

kafka作为source时必选

String

Kafka的使用者组ID

format

必选

String

用于反序列化和序列化Kafka消息的值部分的格式

properties.*

可选

String

安全模式下需增加认证相关的参数

分享:

    相关文档

    相关产品