文档首页/ MapReduce服务 MRS/ 组件操作指南(LTS版)/ 使用Flink/ 创建FlinkServer作业/ 创建FlinkServer作业写入数据至数据仓库服务(DWS)
更新时间:2024-12-11 GMT+08:00

创建FlinkServer作业写入数据至数据仓库服务(DWS)

本章节适用于MRS 3.3.1及之后的版本。

操作场景

数据仓库服务(DWS)是在线数据分析处理数据库。本示例以安全模式FlinkServer、Kafka为例,以DWS作为sink表,以及创建表时使用的with参数和代码示例,指导如何在Flinkserver作业管理页面对接DWS。

FlinkServer界面回显FlinkSQL时,SQL中的“password”字段将显示为空,在回显状态下需要将密码信息补齐后再提交作业。

前提条件

FlinkServer所在集群:
  • 集群中已安装HDFS、Yarn、Kafka、ZooKeeper和Flink等服务。
  • 包含Kafka服务的客户端已安装,安装路径如:/opt/client
  • 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser

配置FlinkServer对接数据仓库服务DWS步骤

  1. 在数据仓库服务(DWS)已创建接收数据的空表,如表"dws_test"。
  2. 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  3. 参考如何创建FlinkServer作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

    CREATE TABLE dws_test(
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32)
    ) WITH(
      'connector' = 'dws',
      'url' = 'jdbc:postgresql://DWS连接地址/gaussdb',
      'table-name' = 'dws_test',
      'username' = 'DWS用户名',
      'password' = 'DWS用户对应密码'
     
    
    );
    CREATE TABLE KafkaSource (
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'customer_source',
      'properties.bootstrap.servers' = 'KafkaBroker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'  --FlinkServer所在集群为非安全模式去掉此参数
    );
    Insert into
      dws_test
    select
      *
    from
      KafkaSource;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
    • properties.kerberos.domain.name:为“hadoop.系统域名”。可登录FusionInsight Manager界面,选择“系统 > 域和互信”中查看集群实际域名。
    • dws sink支持配置攒批参数,满足其中一个条件,就会触发一次sink。
      • autoFlushBatchSize:自动刷库的批大小(攒批大小)。默认值:5000。
      • autoFlushMaxInterval:自动刷库的最大间隔时间(攒批时长)。默认值:5s。

  4. 查看作业管理界面,作业状态为“运行中”。
  5. 参考管理Kafka Topic中的消息,查看Topic并向Kafka中写入数据。

    ../kafka-topics.sh --list --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --command-config 客户端目录/Kafka/kafka/config/client.properties

    sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为customer_source:

    sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:

    3,zhangsan
    4,wangwu
    8,zhaosi

    输入完成后按回车发送消息。

  6. 登录数据仓库服务(DWS)客户端执行以下命令查看Sink表中是否接收到数据。

    Select * from dws_test;