更新时间:2024-11-29 GMT+08:00

FlinkServer对接GaussDB(DWS)

操作场景

FlinkServer支持对接GaussDB(DWS)8.1.x及之后版本,本章节介绍GaussDB(DWS)作为Source表、Sink表以及维表的DDL定义,以及创建表时使用的WITH参数和代码示例,并指导如何在FlinkServer作业管理页面操作。

本示例以安全模式FlinkServer、Kafka为例,对接安全模式GaussDB(DWS)。

FlinkServer界面回显FlinkSQL时,SQL中的“password”字段将显示为空,在回显状态下需要将密码信息补齐后再提交作业。

FlinkSQL与GaussDB(DWS)数据类型对应关系

FlinkSQL数据类型

GaussDB(DWS)数据类型

BOOLEAN

BOOLEAN

TINYINT

-

SMALLINT

SMALLINT(INT2)

SMALLSERIAL(SERIAL2)

INTEGER

INTEGER

SERIAL

BIGINT

BIGINT

BIGSERIAL

FLOAT

REAL

FLOAT4

DOUBLE

DOUBLE

FLOAT8

CHAR

CHAR(n)

VARCHAR

VARCHAR(n)

DATE

DATE

TIMESTAMP

TIMESTAMP[(p)] [WITHOUT TIME ZONE]

DECIMAL

NUMERIC[(p[,s])]

DECIMAL[(p[,s])]

前提条件

  • FlinkServer所在集群(安全模式):
    • 集群中已安装HDFS、Yarn、Kafka、ZooKeeper和Flink服务。
    • 包含Kafka服务的客户端已安装,安装路径如:/opt/client
    • 参考创建FlinkServer角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser
  • 待对接的GaussDB(DWS)所在集群(安全模式):
    • 需将FlinkServer所在节点的所有业务IP添加到白名单。

      添加IP至白名单步骤:

      1. root用户登录GaussDB(DWS)所在节点,执行以下命令切换至omm用户。

        su - omm

      2. 执行以下命令加载环境变量。

        source ${BIGDATA_HOME}/mppdb/.mppdbgs_profile

      3. 执行以下命令设置白名单,多个业务IP需多次执行该命令。

        gs_guc set -Z coordinator -N all -I all -h "host all all FlinkServer节点IP/32 sha256"

    • 已创建用于接受数据的空表,如表“customer_t1”。

      创建GaussDB(DWS)数据表步骤:

      1. root用户登录GaussDB(DWS)所在节点,执行以下命令切换至omm用户。

        su - omm

      2. 执行以下命令加载环境变量。

        source ${BIGDATA_HOME}/mppdb/.mppdbgs_profile

      3. 执行以下命令连接数据库。
        gsql -d postgres -U username –p 25308-W password –r
        • postgres:需要连接的数据库名称。
        • username和password:连接数据库的用户名及密码。命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。
        • 25308:Coordinator的端口号,请根据实际情况替换,可使用gs_om -t status --detail查询Coordinator数据路径,在该路径下的“postgresql.conf”文件中查看端口号信息。
      4. 执行以下命令创建数据表。

        CREATE TABLE customer_t1(c_customer_sk INTEGER, c_customer_name VARCHAR(32));

GaussDB作为Sink表

  1. 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

    CREATE TABLE MyUserTable(
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32)
    ) WITH(
      'connector' = 'jdbc',
      'url' = 'jdbc:gaussdb://GaussDB的服务器IP:25308/postgres',
      'table-name' = 'customer_t1',--若在schema(名为“base”)下创建表“customer_t1”时,配置规则为'table-name' = 'base"."customer_t1’
      'username' = 'username',--连接GaussDB(DWS)数据库的用户名
      'password' = 'password',--连接GaussDB(DWS)数据库的密码
      'write.mode' = 'upsert',--数据写入模式为upsert时可设置是否忽略空值
      'ignoreNullWhenUpsert' = 'false'--true表示忽略null值,false表示不忽略空值,将空值写到数据库中
    );
    CREATE TABLE KafkaSource (
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'customer_source',
      'properties.bootstrap.servers' = 'KafkaBroker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'  --FlinkServer所在集群为非安全模式去掉此参数
    );
    Insert into
      MyUserTable
    select
      *
    from
      KafkaSource;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
    • properties.kerberos.domain.name:为“hadoop.系统域名”。可登录FusionInsight Manager界面,选择“系统 > 域和互信”中查看集群实际域名。

  3. 查看作业管理界面,作业状态为“运行中”。
  4. 参考管理Kafka主题中的消息,查看Topic并向Kafka中写入数据。

    ./kafka-topics.sh --list --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为customer_source:

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:

    3,zhangsan
    4,wangwu
    8,zhaosi

    输入完成后按回车发送消息。

    • ZooKeeper的quorumpeer实例业务IP:

      ZooKeeper服务所有quorumpeer实例业务IP。登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,可查看所有quorumpeer实例所在主机业务IP地址。

    • ZooKeeper客户端端口号:

      登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。

  5. 登录GaussDB客户端执行以下命令查看Sink表中是否接收到数据,如下图所示。

    Select * from customer_t1;

GaussDB作为Source表

  1. 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“5000”,“模式”可使用默认值。

    CREATE TABLE MyUserTable(
      --GuassDB作为source表
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32)
    ) WITH(
      'connector' = 'jdbc',
      'url' = 'jdbc:gaussdb://GaussDB的服务器IP:25308/postgres ',
      'table-name' = 'customer_t1',
      'username' = 'username ',
      'password' = 'password '
    );
    CREATE TABLE KafkaSink (
      -- Kafka作为sink表
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'customer_sink',
      'properties.bootstrap.servers' = 'KafkaBroker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'  --FlinkServer所在集群为非安全模式去掉此参数
    );
    Insert into
      KafkaSink
    select
      *
    from
      MyUserTable;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
    • properties.kerberos.domain.name:为“hadoop.系统域名”。可登录FusionInsight Manager界面,选择“系统 > 域和互信”中查看集群实际域名。

  3. 查看作业管理界面,作业状态为“运行中”。
  4. 参考管理Kafka主题中的消息,执行以下命令查看Sink表中是否接收到数据,即查看Kafka topic是否正常写入数据,如下图所示。

    sh kafka-console-consumer.sh --topic customer_sink --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/ consumer.properties

GaussDB作为维表

kafkaSource作为事实表,“customer_t2”作为维度表,结果写入kafkaSink。

  1. 参考•在安全模式的GaussDB(DWS)集群中已创...在GaussDB客户端创建维度表“customer_t2”,建表语句示例如下:

    CREATE TABLE customer_t2(
    c_customer_sk INTEGER PRIMARY KEY,
    c_customer_age INTEGER,
    c_customer_address VARCHAR(32)
    )DISTRIBUTE BY HASH(c_customer_sk);
     
    INSERT INTO customer_t2 VALUES(1,18,'city a');
    INSERT INTO customer_t2 VALUES(2,14,'city b');
    INSERT INTO customer_t2 VALUES(3,16,'city c');
    INSERT INTO customer_t2 VALUES(4,24,'city d');
    INSERT INTO customer_t2 VALUES(5,32,'city e');
    INSERT INTO customer_t2 VALUES(6,27,'city f');
    INSERT INTO customer_t2 VALUES(7,41,'city a');
    INSERT INTO customer_t2 VALUES(8,35,'city h');
    INSERT INTO customer_t2 VALUES(9,16,'city j');

  2. 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  3. 参考新建作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“5000”,“模式”可使用默认值。

    CREATE TABLE KafkaSource (
      -- Kafka作为source表
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32),
      proctime as proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'customer_source',
      'properties.bootstrap.servers' = 'KafkaBroker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'  --FlinkServer所在集群为非安全模式去掉此参数
    );
    CREATE TABLE KafkaSink (
      -- Kafka作为sink表
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32),
      c_customer_age INTEGER,
      c_customer_address VARCHAR(32)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'customer_sink',
      'properties.bootstrap.servers' = 'KafkaBroker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'  --FlinkServer所在集群为非安全模式去掉此参数
    );
    CREATE TABLE MyUserTable (
      -- GaussDB作为维表
      c_customer_sk INTEGER PRIMARY KEY,
      c_customer_age INTEGER,
      c_customer_address VARCHAR(32)
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:gaussdb://GaussDB的服务器IP:25308/postgres ',
      'table-name' = 'customer_t2',
      'username' = 'username ',
      'password' = 'password '
    );
    INSERT INTO
      KafkaSink
    SELECT
      t.c_customer_sk,
      t.c_customer_name,
      d.c_customer_age,
      d.c_customer_address
    FROM
      KafkaSource as t
      JOIN MyUserTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.c_customer_sk = d.c_customer_sk;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
    • properties.kerberos.domain.name:为“hadoop.系统域名”。可登录FusionInsight Manager界面,选择“系统 > 域和互信”中查看集群实际域名。

  4. 参考管理Kafka主题中的消息,执行以下命令查看Sink表中是否接收到数据,即5执行完成后查看Kafka topic是否正常写入数据。

    sh kafka-console-consumer.sh --topic customer_sink --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/ consumer.properties

  5. 参考管理Kafka主题中的消息,查看Topic并向Kafka中写入数据,输入完成后可在4中的窗口查看执行结果。

    ./kafka-topics.sh --list --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    • ZooKeeper的quorumpeer实例业务IP:

      ZooKeeper服务所有quorumpeer实例业务IP。登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,可查看所有quorumpeer实例所在主机业务IP地址。

    • ZooKeeper客户端端口号:

      登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。

    例如本示例使用主题名称为customer_source:

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:

    3,zhangsan
    5,zhaosi
    1,xiaoming
    2,liuyang
    7,liubei
    10,guanyu
    20,zhaoyun

    输入完成后按回车发送消息,4中的kafka-console-consumer窗口打印结果如下:

    3,zhangsan,16,city c
    5,zhaosi,32,city e
    1,xiaoming,18,city a
    2,liuyang,14,city b
    7,liubei,41,city a