更新时间:2026-06-11 GMT+08:00
分享

FlinkServer对接Iceberg

操作场景

本章节提供了如何使用FlinkServer写FlinkSQL对接Iceberg的操作指导。Iceberg可以作为Sink表和Source表。

前提条件

  • 集群中已安装HDFS、Zookeeper、Yarn、Flink、Hive组件。
  • 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin
  • 参考Catalog管理创建一个Iceberg Catalog,如:iceberg_catalog。
  • 仅支持Flink与Hive组件共集群,不支持Hive多服务。

约束与限制

本章节适用于MRS 3.6.0-LTS.1及之后的版本。

Iceberg作为Sink表

  1. 使用flink_admin登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
  2. 新建一个Iceberg表,例如:iceberg_catalog.`default`.iceberg_table,具体操作可参考数据管理,完整示例如下:

    CREATE TABLE if not exists iceberg_catalog.`default`.iceberg_table (
      id BIGINT COMMENT 'unique id',
      data STRING NOT NULL
    ) PARTITIONED BY (data)
    WITH
      ('format-version' = '2');

  3. 参考新建作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,根据业务要求配置“checkpoint”间隔,配置完成后启动作业。

    CREATE TABLE datagen(id BIGINT, data STRING) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '5'
    );
    insert into
      iceberg_catalog.`default`.iceberg_table
    select
      *
    from
      datagen;

Iceberg作为Source表

  1. 使用flink_admin登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    CREATE TABLE print(id BIGINT, data STRING) WITH ('connector' = 'print');
    insert into
      print
    select
      *
    from
      iceberg_catalog.`default`.iceberg_table
      /*+ OPTIONS('streaming'='true', 'monitor-interval'='60s')*/ ;

相关文档