创建FlinkServer作业对接DWS表
操作场景
FlinkServer支持对接8.1.x及之后版本的GaussDB(DWS)数据库,本章节介绍GaussDB(DWS)作为Source表、Sink表以及维表的DDL定义,以及创建表时使用的WITH参数和代码示例,并指导如何在FlinkServer作业管理页面操作。FlinkSQL与GaussDB(DWS)数据类型对应关系如下表所示。
本示例以安全模式FlinkServer、Kafka为例,对接安全模式GaussDB(DWS)。
- 根据安全需求,FlinkServer界面回显FlinkSQL时,SQL中的“password”字段将显示为空,在回显状态下需要将密码信息补齐后再提交作业。
- 本章节仅适用于MRS 3.2.0至MRS 3.3.1版本及集群,MRS 3.3.1及之后的版本请参考创建FlinkServer作业写入数据至数据仓库服务(DWS)。
FlinkSQL数据类型 |
GaussDB(DWS)数据类型 |
---|---|
BOOLEAN |
BOOLEAN |
TINYINT |
- |
SMALLINT |
SMALLINT(INT2) |
SMALLSERIAL(SERIAL2) |
|
INTEGER |
INTEGER |
SERIAL |
|
BIGINT |
BIGINT |
BIGSERIAL |
|
FLOAT |
REAL |
FLOAT4 |
|
DOUBLE |
DOUBLE |
FLOAT8 |
|
CHAR |
CHAR(n) |
VARCHAR |
VARCHAR(n) |
DATE |
DATE |
TIMESTAMP |
TIMESTAMP[(p)] [WITHOUT TIME ZONE] |
DECIMAL |
NUMERIC[(p[,s])] |
DECIMAL[(p[,s])] |
前提条件
- 需确保FlinkServer所在集群和GaussDB(DWS)所在集群网络互通,确保“可用区”、“虚拟私有云”、“安全组”配置相同。
- FlinkServer所在集群(安全模式):
- 集群中已安装HDFS、Yarn、Kafka、ZooKeeper和Flink服务。
- 包含Kafka服务的客户端已安装,安装路径如:/opt/client。
- 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser。
- 待对接的GaussDB(DWS)所在集群(安全模式):
gsql -d postgres -h IP -U username –p port -W password –r
- postgres:需要连接的数据库名称。
- IP:GaussDB(DWS) 集群地址。如果通过公网地址连接,请指定为集群“公网访问域名”,如果通过内网地址连接,请指定为集群“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。
- username和password:连接数据库的用户名及密码。命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。
- port :Coordinator的端口号,请根据实际情况替换,可使用gs_om -t status --detail查询Coordinator数据路径,在该路径下的“postgresql.conf”文件中查看端口号信息。
创建用于接受数据的空表,如表“customer_t1”:
CREATE TABLE customer_t1 ( c_customer_sk INTEGER, c_customer_name VARCHAR(32) ) with (orientation = column,compression=middle) distribute by hash (c_customer_name);
GaussDB作为Sink表
- 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考如何创建FlinkServer作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
CREATE TABLE MyUserTable( c_customer_sk INTEGER, c_customer_name VARCHAR(32) ) WITH( 'connector' = 'jdbc', 'url' = 'jdbc:gaussdb://GaussDB的服务器IP:数据库端口/postgres', 'table-name' = 'customer_t1',--如果在schema(名为“base”)下创建表“customer_t1”时,配置规则为'table-name' = 'base"."customer_t1’ 'username' = 'username',--连接GaussDB(DWS)数据库的用户名 'password' = 'password',--连接GaussDB(DWS)数据库的密码,注意提交作业需补齐密码 'write.mode' = 'upsert',--数据写入模式为upsert时可设置是否忽略空值(适用于MRS 3.3.0及以后版本) 'ignoreNullWhenUpsert' = 'false'--true表示忽略null值,false表示不忽略空值,将空值写到数据库中 ); CREATE TABLE KafkaSource ( c_customer_sk INTEGER, c_customer_name VARCHAR(32) ) WITH ( 'connector' = 'kafka', 'topic' = 'customer_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', --FlinkServer所在集群为非安全模式去掉此参数 'properties.security.protocol' = 'SASL_PLAINTEXT', --FlinkServer所在集群为非安全模式去掉此参数 'properties.kerberos.domain.name' = 'hadoop.系统域名' --FlinkServer所在集群为非安全模式去掉此参数 ); Insert into MyUserTable select * from KafkaSource;
- Kafka端口号:
- properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka Topic中的消息,查看Topic并向Kafka中写入数据。
./kafka-topics.sh --list --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为customer_source:
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:
3,zhangsan 4,wangwu 8,zhaosi
输入完成后按回车发送消息。
- 登录GaussDB客户端执行以下命令查看Sink表中是否接收到数据,如下图所示。
Select * from customer_t1;
GaussDB作为Source表
- 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考如何创建FlinkServer作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“5000”,“模式”可使用默认值。
CREATE TABLE MyUserTable( --GaussDB作为source表 c_customer_sk INTEGER, c_customer_name VARCHAR(32) ) WITH( 'connector' = 'jdbc', 'url' = 'jdbc:gaussdb://GaussDB的服务器IP:数据库端口/postgres', 'table-name' = 'customer_t1', 'username' = 'username ', 'password' = 'password ' ); CREATE TABLE KafkaSink ( -- Kafka作为sink表 c_customer_sk INTEGER, c_customer_name VARCHAR(32) ) WITH ( 'connector' = 'kafka', 'topic' = 'customer_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', --FlinkServer所在集群为非安全模式去掉此参数 'properties.security.protocol' = 'SASL_PLAINTEXT', --FlinkServer所在集群为非安全模式去掉此参数 'properties.kerberos.domain.name' = 'hadoop.系统域名' --FlinkServer所在集群为非安全模式去掉此参数 ); Insert into KafkaSink select * from MyUserTable;
- Kafka端口号:
- properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka Topic中的消息,执行以下命令查看Sink表中是否接收到数据,即查看Kafka topic是否正常写入数据,如下图所示。
sh kafka-console-consumer.sh --topic customer_sink --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/ consumer.properties
GaussDB作为维表
kafkaSource作为事实表,“customer_t2”作为维度表,结果写入kafkaSink。
- 在GaussDB客户端创建维度表“customer_t2”,建表语句示例如下:
CREATE TABLE customer_t2( c_customer_sk INTEGER PRIMARY KEY, c_customer_age INTEGER, c_customer_address VARCHAR(32) )DISTRIBUTE BY HASH(c_customer_sk); INSERT INTO customer_t2 VALUES(1,18,'city a'); INSERT INTO customer_t2 VALUES(2,14,'city b'); INSERT INTO customer_t2 VALUES(3,16,'city c'); INSERT INTO customer_t2 VALUES(4,24,'city d'); INSERT INTO customer_t2 VALUES(5,32,'city e'); INSERT INTO customer_t2 VALUES(6,27,'city f'); INSERT INTO customer_t2 VALUES(7,41,'city a'); INSERT INTO customer_t2 VALUES(8,35,'city h'); INSERT INTO customer_t2 VALUES(9,16,'city j');
- 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考如何创建FlinkServer作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“5000”,“模式”可使用默认值。
CREATE TABLE KafkaSource ( -- Kafka作为source表 c_customer_sk INTEGER, c_customer_name VARCHAR(32), proctime as proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'customer_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', --FlinkServer所在集群为非安全模式去掉此参数 'properties.security.protocol' = 'SASL_PLAINTEXT', --FlinkServer所在集群为非安全模式去掉此参数 'properties.kerberos.domain.name' = 'hadoop.系统域名' --FlinkServer所在集群为非安全模式去掉此参数 ); CREATE TABLE KafkaSink ( -- Kafka作为sink表 c_customer_sk INTEGER, c_customer_name VARCHAR(32), c_customer_age INTEGER, c_customer_address VARCHAR(32) ) WITH ( 'connector' = 'kafka', 'topic' = 'customer_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', --FlinkServer所在集群为非安全模式去掉此参数 'properties.security.protocol' = 'SASL_PLAINTEXT', --FlinkServer所在集群为非安全模式去掉此参数 'properties.kerberos.domain.name' = 'hadoop.系统域名' --FlinkServer所在集群为非安全模式去掉此参数 ); CREATE TABLE MyUserTable ( -- GaussDB作为维表 c_customer_sk INTEGER PRIMARY KEY, c_customer_age INTEGER, c_customer_address VARCHAR(32) ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:gaussdb://GaussDB的服务器IP:数据库端口/postgres', 'table-name' = 'customer_t2', 'username' = 'username ', 'password' = 'password ' ); INSERT INTO KafkaSink SELECT t.c_customer_sk, t.c_customer_name, d.c_customer_age, d.c_customer_address FROM KafkaSource as t JOIN MyUserTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.c_customer_sk = d.c_customer_sk;
- Kafka端口号:
- properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- 参考管理Kafka Topic中的消息,执行以下命令查看Sink表中是否接收到数据,即5执行完成后查看Kafka topic是否正常写入数据。
sh kafka-console-consumer.sh --topic customer_sink --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/ consumer.properties
- 参考管理Kafka Topic中的消息,查看Topic并向Kafka中写入数据,输入完成后可在4中的窗口查看执行结果。
./kafka-topics.sh --list --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为customer_source:
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:
3,zhangsan 5,zhaosi 1,xiaoming 2,liuyang 7,liubei 10,guanyu 20,zhaoyun
输入完成后按回车发送消息,4中的kafka-console-consumer窗口打印结果如下:
3,zhangsan,16,city c 5,zhaosi,32,city e 1,xiaoming,18,city a 2,liuyang,14,city b 7,liubei,41,city a