更新时间:2026-06-11 GMT+08:00
分享

FlinkServer作业对接Iceberg

操作场景

本章节提供了如何使用FlinkServer写FlinkSQL对接Iceberg。Iceberg可以作为Sink表和Source表。

前提条件

  • 集群中已安装HDFS、Yarn、Flink服务。
  • 已部署和MRS集群环境网络相通的HTTP服务。
  • 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser

Iceberg作为Sink表

  1. 使用flinkuser登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
  2. 参考集群连接模式创建Flink SQL作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,根据业务要求配置“checkpoint”间隔,配置完成后启动作业。

    CREATE TEMPORARY TABLE datagen(id BIGINT, data STRING) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '5'
    );
    CREATE TABLE flink_table (id BIGINT, data STRING) WITH (
      'connector' = 'iceberg',
      'catalog-name' = 'hive_prod',
      'catalog-database' = 'default',
      'catalog-table' = 'hive_iceberg_table',
      'uri' = '连接MetaStore的URI配置',
      'hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' --普通模式集群不需要该参数
    );
    insert into
      flink_table
    select
      *
    from
      datagen;

Iceberg作为Source表

  1. 使用flinkuser登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
  2. 参考集群连接模式创建Flink SQL作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    CREATE TEMPORARY TABLE print(id BIGINT, data STRING) WITH ('connector' = 'print');
    CREATE TABLE flink_table (id BIGINT, data STRING) WITH (
      'connector' = 'iceberg',
      'catalog-name' = 'hive_prod',
      'catalog-database' = 'default',
      'catalog-table' = 'hive_iceberg_table',
      'uri' = '连接MetaStore的URI配置',
      'hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' --普通模式集群不需要该参数
    );
    insert into
      print
    select
      *
    from
      flink_table
      /*+ OPTIONS('streaming'='true', 'monitor-interval'='60s')*/ ;

相关文档