创建FlinkServer作业写入数据至Hive表
本章节适用于MRS 3.1.2及之后的版本。
操作场景
目前FlinkServer对接Hive使用对接metaStore的方式,所以需要Hive开启MetaStore功能。Hive可以作为sink和维表。
本示例以安全模式Kafka为例。
前提条件
- 集群已安装HDFS、Yarn、Kafka、Flink和Hive(且服务名称必须为Hive)等服务。
- 包含Hive服务的客户端已安装,安装路径如:/opt/client。
- Flink支持1.12.2及以后版本,Hive支持3.1.0及以后版本。
- 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。
- 参考创建FlinkServer集群连接步骤中的“说明”获取访问Flink WebUI用户的客户端配置文件及用户凭据。
创建作业步骤
场景一:Hive作为Sink表。
以映射表类型为Kafka对接Hive流程为例。
- 使用flink_admin访问Flink WebUI,请参考访问FlinkServer WebUI界面。
- 新建集群连接,如:flink_hive。
- 选择“系统管理 > 集群连接管理”,进入集群连接管理页面。
- 单击“创建集群连接”,在弹出的页面中参考表1填写信息,单击“测试”,测试连接成功后单击“确定”,完成集群连接创建。
表1 创建集群连接信息 参数名称
参数描述
取值样例
集群连接名称
集群连接的名称,只能包含英文字母、数字和下划线,且不能多于100个字符。
flink_hive
描述
集群连接名称描述信息。
-
版本
选择集群版本。
MRS 3
是否安全版本
- 是,安全集群选择是。需要输入访问用户名和上传用户凭证;
- 否,非安全集群选择否。
是
访问用户名
访问用户需要包含访问集群中服务所需要的最小权限。只能包含英文字母、数字和下划线,且不能多于100个字符。
“是否安全版本”选择“是”时存在此参数。
flink_admin
客户端配置文件
集群客户端配置文件,格式为tar。
-
用户凭据
FusionInsight Manager中用户的认证凭据,格式为tar。
“是否安全版本”选择“是”时存在此参数。
输入访问用户名后才可上传文件。
flink_admin的用户凭据
- 新建Flink SQL流作业,如:flinktest1。
- 单击“作业管理”进入作业管理页面。
- 单击“新建作业”,在新建作业页面参考表2填写信息,单击“确定”,创建作业成功并进入作业开发界面。
- 在作业开发界面进行作业开发,输入如下语句,可以单击上方“语义校验”对输入内容校验。
CREATE TABLE test_kafka ( user_id varchar, item_id varchar, cat_id varchar, zw_test timestamp ) WITH ( 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'format' = 'json', 'topic' = 'zw_tset_kafka', 'connector' = 'kafka', 'scan.startup.mode' = 'latest-offset', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-version' = '3.1.0', 'default-database' = 'default', 'cluster.name' = 'flink_hive' ); use catalog myhive; set table.sql-dialect = hive;create table user_behavior_hive_tbl ( user_id STRING, item_id STRING, cat_id STRING, ts timestamp ) PARTITIONED BY (dy STRING, ho STRING, mi STRING) stored as textfile TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00', 'sink.partition-commit.trigger' = 'process-time', 'sink.partition-commit.delay' = '0S', 'sink.partition-commit.policy.kind' = 'metastore,success-file' ); INSERT into user_behavior_hive_tbl SELECT user_id, item_id, cat_id, zw_test, DATE_FORMAT(zw_test, 'yyyy-MM-dd'), DATE_FORMAT(zw_test, 'HH'), DATE_FORMAT(zw_test, 'mm') FROM default_catalog.default_database.test_kafka;
- Kafka端口号:
- 'cluster.name' = 'flink_hive'的值为2新建的集群连接名称。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- 作业SQL开发完成后,请勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
- 单击左上角“提交”提交作业。
- 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
- 参考管理Kafka Topic中的消息,查看Topic并向Kafka中写入数据。
./kafka-topics.sh --list --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --command-config 客户端目录/Kafka/kafka/config/client.properties
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为zw_tset_kafka:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic zw_tset_kafka --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:{"user_id": "3","item_id":"333333","cat_id":"cat333","zw_test":"2021-09-08 09:08:01"} {"user_id": "4","item_id":"444444","cat_id":"cat444","zw_test":"2021-09-08 09:08:01"}
输入完成后按回车发送消息。
- 执行以下命令查看Sink表中是否接收到数据,即Hive表是否正常写入数据。
beeline
select * from user_behavior_hive_tbl;
场景二:Hive作为维表。
- 参考Hive客户端使用实践进入Hive客户端,创建Hive表并插入数据 。
CREATE TABLE hive3 ( id int, name string ) PARTITIONED BY (dy STRING,ho STRING,mi STRING) STORED AS textfile TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dy $ho:$mi:00', 'streaming-source.partition.include'='all', 'streaming-source.enable'='true', 'streaming-source.partition.include'='all', 'streaming-source.monitor-interval'='2m'); insert into table hive3 values('1','aname','sss','name1','company1'); insert into table hive3 values('2','bname','sss','name1','company1');
- 使用flink_admin访问Flink WebUI,请参考访问FlinkServer WebUI界面。
- 新建集群连接,如:flink_hive1。
- 选择“系统管理 > 集群连接管理”,进入集群连接管理页面。
- 单击“创建集群连接”,在弹出的页面中参考表3填写信息,单击“测试”,测试连接成功后单击“确定”,完成集群连接创建。
表3 创建集群连接信息 参数名称
参数描述
取值样例
集群连接名称
集群连接的名称,只能包含英文字母、数字和下划线,且不能多于100个字符。
flink_hive1
描述
集群连接名称描述信息。
-
版本
选择集群版本。
MRS 3
是否安全版本
- 是,安全集群选择是。需要输入访问用户名和上传用户凭证;
- 否,非安全集群选择否。
是
访问用户名
访问用户需要包含访问集群中服务所需要的最小权限。只能包含英文字母、数字和下划线,且不能多于100个字符。
“是否安全版本”选择“是”时存在此参数。
flink_admin
客户端配置文件
集群客户端配置文件,格式为tar。
-
用户凭据
FusionInsight Manager中用户的认证凭据,格式为tar。
“是否安全版本”选择“是”时存在此参数。
输入访问用户名后才可上传文件。
flink_admin的用户凭据
- 新建Flink SQL流作业,如:flinktest2。
- 单击“作业管理”进入作业管理页面。
- 单击“新建作业”,在新建作业页面参考表4填写信息,单击“确定”,创建作业成功并进入作业开发界面。
- 在作业开发界面进行作业开发,输入如下语句,可以单击上方“语义校验”对输入内容校验。
CREATE TABLE kafka_source ( id int, address string, proctime as PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); CREATE TABLE kafka_sink( id int, address string, name string ) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-version' = '3.1.0', 'default-database' = 'default', 'cluster.name' = 'flink_hive1' ); use catalog myhive; set table.sql-dialect = hive; create table if not exists hive3 (id int, name string) stored as textfile TBLPROPERTIES ( 'streaming-source.enable' = 'false', 'streaming-source.partition.include' = 'all', 'lookup.join.cache.ttl' = '5 min' ); INSERT INTO default_catalog.default_database.kafka_sink select t1.id, t1.address, t2.name from default_catalog.default_database.kafka_source as t1 join hive3 FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.id = t2.id;
- Kafka Broker实例IP地址及端口号说明:
- 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。
- 'cluster.name' = 'flink_hive'的值为3新建的集群连接名称。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- Kafka Broker实例IP地址及端口号说明:
- 作业SQL开发完成后,请勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
- 单击左上角“提交”提交作业。
- 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
- 参考管理Kafka Topic中的消息,查看Topic并向Kafka中写入数据。
./kafka-topics.sh --list --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --command-config 客户端目录/Kafka/kafka/config/client.properties
sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
输入消息内容:1,city1 2,city2
输入完成后按回车发送消息。
- 参考管理Kafka Topic中的消息,执行以下命令查看Sink表中是否接收到数据,即9执行完成后查看Kafka topic是否正常写入数据。
sh kafka-console-consumer.sh --topic test_sink --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
结果如下:
1,city1,aname 2,city2,bname