文档首页/ MapReduce服务 MRS/ 组件操作指南(LTS版)/ 使用Flink/ 集群连接模式创建FlinkServer作业/ 配置FlinkServer对接其他组件/ FlinkServer作业对接Iceberg
更新时间:2026-06-11 GMT+08:00
FlinkServer作业对接Iceberg
操作场景
本章节提供了如何使用FlinkServer写FlinkSQL对接Iceberg。Iceberg可以作为Sink表和Source表。
前提条件
- 集群中已安装HDFS、Yarn、Flink服务。
- 已部署和MRS集群环境网络相通的HTTP服务。
- 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser。
Iceberg作为Sink表
- 使用flinkuser登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
- 参考集群连接模式创建Flink SQL作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,根据业务要求配置“checkpoint”间隔,配置完成后启动作业。
CREATE TEMPORARY TABLE datagen(id BIGINT, data STRING) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5' ); CREATE TABLE flink_table (id BIGINT, data STRING) WITH ( 'connector' = 'iceberg', 'catalog-name' = 'hive_prod', 'catalog-database' = 'default', 'catalog-table' = 'hive_iceberg_table', 'uri' = '连接MetaStore的URI配置', 'hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' --普通模式集群不需要该参数 ); insert into flink_table select * from datagen;
Iceberg作为Source表
- 使用flinkuser登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
- 参考集群连接模式创建Flink SQL作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。
CREATE TEMPORARY TABLE print(id BIGINT, data STRING) WITH ('connector' = 'print'); CREATE TABLE flink_table (id BIGINT, data STRING) WITH ( 'connector' = 'iceberg', 'catalog-name' = 'hive_prod', 'catalog-database' = 'default', 'catalog-table' = 'hive_iceberg_table', 'uri' = '连接MetaStore的URI配置', 'hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' --普通模式集群不需要该参数 ); insert into print select * from flink_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='60s')*/ ;
父主题: 配置FlinkServer对接其他组件