文档首页/ MapReduce服务 MRS/ 组件操作指南(LTS版)/ 使用Flink/ Catalog模式创建FlinkSQL作业/ 配置FlinkServer对接其他组件示例/ FlinkServer对接Hive
更新时间:2026-06-11 GMT+08:00
FlinkServer对接Hive
操作场景
目前FlinkServer对接Hive使用对接MetaStore的方式,所以需要Hive开启MetaStore功能。Hive可以作为Source、Sink和维表。
本示例以安全模式Kafka为例。
前提条件
- 集群已安装HDFS、Yarn、Kafka、Flink和Hive(且服务名称必须为Hive)等服务。
- 包含Hive服务的客户端已安装,安装路径如:/opt/client。
- Flink支持1.12.2及以后版本,Hive支持3.1.0及以后版本。
- 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。
- 参考Catalog管理创建一个Hive Catalog,如:myhive。
- 参考创建FlinkServer集群连接中的“说明”获取访问Flink WebUI用户的客户端配置文件及用户凭据。
FlinkServer不支持跨集群访问Hive,不支持集群中存在多个Hive服务。
约束与限制
本章节适用于MRS 3.6.0-LTS.1及之后的版本。
操作步骤
- 使用flink_admin访问Flink WebUI,请参考访问FlinkServer WebUI界面。
- 参考Catalog管理创建一个Hive Catalog,如:hive_catalog。
- 新建一个Hive表,例如:hive_catalog.`default`.hive_table,具体操作可参考数据管理,完整示例如下:
create table if not exists hive_catalog.`default`.hive_table ( user_id STRING, item_id STRING, cat_id STRING, ts timestamp(9), dy STRING, ho STRING, mi STRING ) PARTITIONED BY (dy, ho, mi) with ( 'connector' = 'hive', 'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00', 'sink.partition-commit.trigger' = 'process-time', 'sink.partition-commit.delay' = '0S', 'sink.partition-commit.policy.kind' = 'metastore,success-file' ); - 新建Flink SQL流作业,如:flinktest1。
- 单击“作业管理”进入作业管理页面。
- 单击“新建作业”,在新建作业页面参考表1填写信息,单击“确定”,创建作业成功并进入作业开发界面。
- 在作业开发界面进行作业开发,输入SQL语句,可以单击上方“语义校验”对输入内容校验。
- Hive作为Sink表
- 流式写入Hive表。
CREATE TABLE test_datagen ( user_id varchar, item_id varchar, cat_id varchar, zw_test timestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT into hive_catalog.`default`.hive_table SELECT user_id, item_id, cat_id, zw_test, DATE_FORMAT(zw_test, 'yyyy-MM-dd'), DATE_FORMAT(zw_test, 'HH'), DATE_FORMAT(zw_test, 'mm') FROM test_datagen;
- 单击左上角“提交”提交作业。
- 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
- 执行以下命令查看Sink表中是否接收到数据,即Hive表是否正常写入数据。
beeline select * from hive_table;
- 流式写入Hive表。
- Hive作为Source表
- 流式读取Hive表。
CREATE TABLE print_sink( user_id varchar, item_id varchar, cat_id varchar ) WITH ( 'connector' = 'print' ); insert into print_sink select user_id, item_id, cat_id from hive_catalog.`default`.hive_table;
- 作业SQL开发完成后,请勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
- 单击左上角“提交”提交作业。
- 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
- 进入作业详情界面,单击“Task Managers > Stdout”,查看是否正常打印hive_table表中的数据。
- 流式读取Hive表。
- Hive作为维表
- Hive作为维表。
CREATE TABLE datagen_source ( user_id string, address string, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); CREATE TABLE print_sink( user_id string, address string, cat_id string ) WITH ( 'connector' = 'print' ); INSERT INTO default_catalog.default_database.print_sink select t1.user_id , t1.address, t2.cat_id from default_catalog.default_database.datagen_source as t1 join hive_catalog.`default`.hive_table FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id ;
- 作业SQL开发完成后,请勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
- 单击左上角“提交”提交作业。
- 作业运行成功后,选择“更多 > 作业详情”可查看作业运行详情。
- 进入作业详情界面,单击“Task Managers > Stdout”,即可查看打印的结果数据。
- Hive作为维表。
- Hive作为Sink表