更新时间:2026-06-11 GMT+08:00
分享

配置FlinkServer对接Paimon

操作场景

本章节提供了如何使用FlinkServer写FlinkSQL对接Paimon的操作指导。Paimon可以作为Sink表、Source表和维表。

前提条件

  • 集群中已安装HDFS、Zookeeper、Yarn、Flink和Hive组件。
  • 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin
  • 参考Catalog管理创建一个Paimon catalog,如:paimon_catalog。
  • 仅支持Flink与Hive组件共集群,不支持Hive多服务。

FlinkSQL与Paimon数据类型对应关系

表1 FlinkSQL与Paimon数据类型对应关系

FlinkSQL数据类型

Paimon数据类型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY(n)

BINARY(n)

VARBINARY(n)

VARBINARY(n)

BYTES

BYTES

DECIMAL(p, s)

DECIMAL(p, s)

DATE

DATE

TIME(p)

TIME(p)

TIMESTAMP(p)

TIMESTAMP(p)

TIMESTAMP(p) WITH LOCAL TIME ZONE

TIMESTAMP WITH LOCAL TIME ZONE

ARRAY<t>

ARRAY<t>

MAP<kt, vt>

MAP<kt, vt>

MULTISET<t>

MULTISET<t>

ROW<n0 t0 'd0', n1 t1 'd1', ...>

ROW<n0 t0 'd0', n1 t1 'd1', ...>

Paimon作为Sink表

  1. 使用flink_admin登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
  2. 新建一个paimon表,例如:paimon_catalog.`default`.paimon_table,具体操作可参考数据管理,完整示例如下:

    CREATE TABLE if not exists paimon_catalog.`default`.paimon_table (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING,
      PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
    );

  3. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,根据业务要求配置“CheckPoint”间隔。
    CREATE TABLE datagen (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
    )
    with
      ('connector' = 'datagen', 'rows-per-second' = '5');
    
    insert into
      paimon_catalog.`default`.paimon_table
    select
      *
    from
      datagen;

Paimon作为Source表

  1. 使用flink_admin登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。

    CREATE TABLE print (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
    )
    with
      ('connector' = 'print');
    
    insert into
      print
    select
      *
    from
      paimon_catalog.`default`.paimon_table;

Paimon作为维表

  1. 使用flink_admin登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。

    CREATE TABLE datagen (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
    )
    with
      ('connector' = 'datagen', 'rows-per-second' = '5');
    
    CREATE TABLE print (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
    )
    with
      ('connector' = 'print');
    
    insert into
      print
    select
      o.user_id,
      c.item_id,
      c.behavior,
      c.dt,
      c.hh
    from
      datagen AS o
      left join paimon_catalog.`default`.paimon_table FOR SYSTEM_TIME AS OF PROCTIME() AS c ON o.user_id = c.user_id;

Flink Paimon连接器配置选项

表2 Flink Paimon连接器配置选项

配置键

默认值

类型

说明

changelog.precommit-compact.thread-num

(无)

Integer

从小型changelog文件复制字节的最大线程数。

默认为Java虚拟机可用的处理器数量。

end-input.watermark

(无)

Long

批处理模式或有界流情况下可选的endInput水印。

lookup.async

false

Boolean

是否启用异步lookup join。

lookup.async-thread-number

16

Integer

lookup异步操作的线程数。

lookup.bootstrap-parallelism

4

Integer

lookup join中单个任务的bootstrap并行度。

lookup.cache

AUTO

Enum

lookup join的缓存模式。

可选值:"AUTO"、"FULL"、"MEMORY"

lookup.dynamic-partition.refresh-interval

1 h

Duration

lookup的动态分区刷新间隔,扫描所有分区并获取对应分区。

lookup.refresh.async

false

Boolean

是否在异步线程中刷新lookup表。

lookup.refresh.async.pending-snapshot-count

5

Integer

如果待处理快照数量超过阈值,lookup算子将同步刷新表。

lookup.refresh.time-periods-blacklist

(无)

String

黑名单包含多个时间段。在这些时间段内,禁止刷新lookup表的缓存。黑名单格式为start1->end1,start2->end2,...,时间格式为yyyy-MM-dd HH:mm。仅在lookup表为FULL缓存模式时使用。

partition.idle-time-to-done

(无)

Duration

设置一个时间间隔,当分区在此时间间隔内没有新数据时,标记为done状态,表示数据已就绪。

partition.mark-done-action.mode

process-time

Enum

如何触发分区标记完成操作。可选值:"process-time":基于机器时间,在处理时间超过周期时间加延迟后标记分区完成;"watermark":基于输入的水印,在水印超过周期时间加延迟后标记分区完成。

partition.mark-done.recover-from-state

true

Boolean

从状态恢复时是否触发分区标记完成。

partition.time-interval

(无)

Duration

可以为分区指定时间间隔,例如,每日分区为'1 d',每小时分区为'1 h'。

postpone.default-bucket-num

1

Integer

postpone bucket表中首次压缩的分区的bucket数量。

precommit-compact

false

Boolean

如果为true,将在writer算子后添加compact coordinator和worker算子,以便将同一分区的多个changelog文件(主键表)或新创建的数据文件(无感知bucket表)压缩成更大的文件,从而减少小文件数量。

read.shuffle-bucket-with-partition

true

Boolean

读取时是否按分区和bucket进行shuffle。

scan.bounded

(无)

Boolean

Paimon消费者的有界模式。默认情况下,Paimon会根据Flink作业的模式自动选择有界模式。

scan.dedicated-split-generation

false

Boolean

如果为true,split生成过程将在Flink任务运行时执行,而不是在初始化阶段由JobManager执行。

scan.infer-parallelism

true

Boolean

如果为false,source的并行度由全局并行度设置。否则,source并行度根据split数量(批处理模式)或bucket数量(流处理模式)推断。

scan.infer-parallelism.max

1024

Integer

如果scan.infer-parallelism为true,通过此选项限制source的并行度。

scan.max-snapshot.count

-1

Integer

每个检查点扫描的最大快照数量。为负数时表示不限制。

scan.parallelism

(无)

Integer

为扫描源定义自定义并行度。默认情况下,如果未定义此选项,规划器将根据全局配置单独为每个语句推导并行度。如果启用了 scan.infer-parallelism,规划器将根据推断的并行度推导并行度。

scan.partitions

(无)

String

指定要扫描的分区。分区应以key1=value1,key2=value2的形式给出。未指定的分区键将填充partition.default-name的值。多个分区应用分号(;)分隔。此选项支持普通源表和lookup join表。对于lookup join,还支持两个特殊值max_pt()和max_two_pt(),分别指定具有最大分区值的(两个)分区。

scan.remove-normalize

false

Boolean

是否强制在流式读取时移除normalize节点。注意:这是危险的,如果下游用于计算聚合且输入不是完整的changelog,可能会导致数据错误。

scan.split-enumerator.batch-size

10

Integer

在StaticFileStoreSplitEnumerator中,每批应分配给子任务的split数量,以避免超过akka.framesize限制。

scan.split-enumerator.mode

fair

Enum

StaticFileStoreSplitEnumerator分配split的模式。可选值:"fair":批处理读取时均匀分配split,防止少数任务读取全部;"preemptive":根据任务的消费速度抢占式分配split。

scan.watermark.alignment.group

(无)

String

一组需要对齐水印的源。

scan.watermark.alignment.max-drift

(无)

Duration

对齐水印的最大漂移,在我们暂停从源/任务/分区消费之前。

scan.watermark.alignment.update-interval

1 s

Duration

任务应多久通知协调器当前水印一次,以及协调器应多久宣布一次最大对齐水印。

scan.watermark.emit.strategy

on-event

Enum

水印生成的发射策略。可选值:"on-periodic":定期发射水印,间隔由Flink 'pipeline.auto-watermark-interval'控制;"on-event":每条记录发射水印。

scan.watermark.idle-timeout

(无)

Duration

如果流的某个分区在该时间段内没有记录流动,则该分区被视为"空闲",不会阻碍下游算子中水印的进度。

sink.clustering.by-columns

(无)

String

指定用于范围分区比较的列名,格式为'columnName1,columnName2'。如果未设置或设置为空字符串,表示未启用范围分区功能。此选项仅对无主键的bucket无感知表和批处理执行模式有效。

sink.clustering.sample-factor

100

Integer

指定采样因子。设S表示总样本数,F表示采样因子,P表示sink并行度,则S = F × P。允许的最小采样因子为20。

sink.clustering.sort-in-cluster

true

Boolean

指示在范围分区后是否对属于每个sink任务的数据进行进一步排序。

sink.clustering.strategy

"auto"

String

指定范围分区使用的比较算法,包括'zorder'、'hilbert'和'order',分别对应z-order曲线算法、hilbert曲线算法和基本类型比较算法。未配置时,将根据'sink.clustering.by-columns'中的列数自动确定算法:1列使用'order',少于5列使用'zorder',5列或以上使用'hilbert'。

sink.committer-cpu

1.0

Double

Sink committer CPU,用于控制全局committer的CPU核心数。

sink.committer-memory

(无)

MemorySize

Sink committer内存,用于控制全局committer的堆内存。

sink.committer-operator-chaining

true

Boolean

允许sink committer和writer算子链接在一起。

sink.cross-partition.managed-memory

256 mb

MemorySize

cross-partition update中RocksDB的托管内存权重,Flink将根据权重计算内存大小,实际使用的内存取决于运行环境。

sink.managed.writer-buffer-memory

256 mb

MemorySize

writer buffer在托管内存中的权重,Flink将根据权重为writer计算内存大小,实际使用的内存取决于运行环境。

sink.operator-uid.suffix

(无)

String

设置writer、dynamic bucket assigner 和 committer算子的uid后缀。uid格式为{UID_PREFIX}_ {TABLE_NAME}_ $ {USER_UID_SUFFIX}。如果未设置uid后缀,Flink将自动生成算子uid,当拓扑结构发生变化时可能会不兼容。

sink.parallelism

(无)

Integer

为sink定义自定义并行度。默认情况下,如果未定义此选项,规划器将根据全局配置单独为每个语句推导并行度。

sink.savepoint.auto-tag

false

Boolean

如果为true,将为Flink savepoint创建的快照自动创建标签。

sink.use-managed-memory-allocator

false

Boolean

如果为true,Flink sink将使用托管内存进行merge tree;否则,将创建独立的内存分配器。

sink.writer-cpu

1.0

Double

Sink writer CPU,用于控制writer的CPU核心数。

sink.writer-memory

(无)

MemorySize

Sink writer内存,用于控制writer的堆内存。

source.checkpoint-align.enabled

false

Boolean

是否将Flink检查点与Paimon表的快照对齐。如果为true,只有在消费快照时才会进行检查点。

source.checkpoint-align.timeout

30 s

Duration

当检查点开始触发时,如果尚未生成新快照,枚举器将阻塞检查点并等待新快照。设置最大等待时间以避免无限等待,如果超时,检查点将失败。注意,它应设置得比检查点超时时间小。

source.operator-uid.suffix

(无)

String

设置source算子的uid后缀。设置后,uid格式为{UID_PREFIX}_ {TABLE_NAME}_ $ {USER_UID_SUFFIX}。如果未设置uid后缀,Flink将自动生成算子uid,当拓扑结构发生变化时可能会不兼容。

unaware-bucket.compaction.parallelism

(无)

Integer

为unaware-bucket表压缩作业定义自定义并行度。默认情况下,如果未定义此选项,规划器将根据全局配置单独为每个语句推导并行度。

相关文档