配置FlinkServer对接Paimon
操作场景
本章节提供了如何使用FlinkServer写FlinkSQL对接Paimon的操作指导。Paimon可以作为Sink表、Source表和维表。
前提条件
- 集群中已安装HDFS、Zookeeper、Yarn、Flink和Hive组件。
- 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。
- 参考Catalog管理创建一个Paimon catalog,如:paimon_catalog。
- 仅支持Flink与Hive组件共集群,不支持Hive多服务。
FlinkSQL与Paimon数据类型对应关系
| FlinkSQL数据类型 | Paimon数据类型 |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| CHAR(n) | CHAR(n) |
| VARCHAR(n) | VARCHAR(n) |
| STRING | STRING |
| BINARY(n) | BINARY(n) |
| VARBINARY(n) | VARBINARY(n) |
| BYTES | BYTES |
| DECIMAL(p, s) | DECIMAL(p, s) |
| DATE | DATE |
| TIME(p) | TIME(p) |
| TIMESTAMP(p) | TIMESTAMP(p) |
| TIMESTAMP(p) WITH LOCAL TIME ZONE | TIMESTAMP WITH LOCAL TIME ZONE |
| ARRAY<t> | ARRAY<t> |
| MAP<kt, vt> | MAP<kt, vt> |
| MULTISET<t> | MULTISET<t> |
| ROW<n0 t0 'd0', n1 t1 'd1', ...> | ROW<n0 t0 'd0', n1 t1 'd1', ...> |
Paimon作为Sink表
- 使用flink_admin登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
- 新建一个paimon表,例如:paimon_catalog.`default`.paimon_table,具体操作可参考数据管理,完整示例如下:
CREATE TABLE if not exists paimon_catalog.`default`.paimon_table ( user_id BIGINT, item_id BIGINT, behavior STRING, dt STRING, hh STRING, PRIMARY KEY (dt, hh, user_id) NOT ENFORCED );
- 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。 需勾选“基础参数”中的“开启CheckPoint”,根据业务要求配置“CheckPoint”间隔。
CREATE TABLE datagen ( user_id BIGINT, item_id BIGINT, behavior STRING, dt STRING, hh STRING ) with ('connector' = 'datagen', 'rows-per-second' = '5'); insert into paimon_catalog.`default`.paimon_table select * from datagen;
Paimon作为Source表
- 使用flink_admin登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。
CREATE TABLE print ( user_id BIGINT, item_id BIGINT, behavior STRING, dt STRING, hh STRING ) with ('connector' = 'print'); insert into print select * from paimon_catalog.`default`.paimon_table;
Paimon作为维表
- 使用flink_admin登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。
CREATE TABLE datagen ( user_id BIGINT, item_id BIGINT, behavior STRING, dt STRING, hh STRING ) with ('connector' = 'datagen', 'rows-per-second' = '5'); CREATE TABLE print ( user_id BIGINT, item_id BIGINT, behavior STRING, dt STRING, hh STRING ) with ('connector' = 'print'); insert into print select o.user_id, c.item_id, c.behavior, c.dt, c.hh from datagen AS o left join paimon_catalog.`default`.paimon_table FOR SYSTEM_TIME AS OF PROCTIME() AS c ON o.user_id = c.user_id;
Flink Paimon连接器配置选项
| 配置键 | 默认值 | 类型 | 说明 |
|---|---|---|---|
| changelog.precommit-compact.thread-num | (无) | Integer | 从小型changelog文件复制字节的最大线程数。 默认为Java虚拟机可用的处理器数量。 |
| end-input.watermark | (无) | Long | 批处理模式或有界流情况下可选的endInput水印。 |
| lookup.async | false | Boolean | 是否启用异步lookup join。 |
| lookup.async-thread-number | 16 | Integer | lookup异步操作的线程数。 |
| lookup.bootstrap-parallelism | 4 | Integer | lookup join中单个任务的bootstrap并行度。 |
| lookup.cache | AUTO | Enum | lookup join的缓存模式。 可选值:"AUTO"、"FULL"、"MEMORY" |
| lookup.dynamic-partition.refresh-interval | 1 h | Duration | lookup的动态分区刷新间隔,扫描所有分区并获取对应分区。 |
| lookup.refresh.async | false | Boolean | 是否在异步线程中刷新lookup表。 |
| lookup.refresh.async.pending-snapshot-count | 5 | Integer | 如果待处理快照数量超过阈值,lookup算子将同步刷新表。 |
| lookup.refresh.time-periods-blacklist | (无) | String | 黑名单包含多个时间段。在这些时间段内,禁止刷新lookup表的缓存。黑名单格式为start1->end1,start2->end2,...,时间格式为yyyy-MM-dd HH:mm。仅在lookup表为FULL缓存模式时使用。 |
| partition.idle-time-to-done | (无) | Duration | 设置一个时间间隔,当分区在此时间间隔内没有新数据时,标记为done状态,表示数据已就绪。 |
| partition.mark-done-action.mode | process-time | Enum | 如何触发分区标记完成操作。可选值:"process-time":基于机器时间,在处理时间超过周期时间加延迟后标记分区完成;"watermark":基于输入的水印,在水印超过周期时间加延迟后标记分区完成。 |
| partition.mark-done.recover-from-state | true | Boolean | 从状态恢复时是否触发分区标记完成。 |
| partition.time-interval | (无) | Duration | 可以为分区指定时间间隔,例如,每日分区为'1 d',每小时分区为'1 h'。 |
| postpone.default-bucket-num | 1 | Integer | postpone bucket表中首次压缩的分区的bucket数量。 |
| precommit-compact | false | Boolean | 如果为true,将在writer算子后添加compact coordinator和worker算子,以便将同一分区的多个changelog文件(主键表)或新创建的数据文件(无感知bucket表)压缩成更大的文件,从而减少小文件数量。 |
| read.shuffle-bucket-with-partition | true | Boolean | 读取时是否按分区和bucket进行shuffle。 |
| scan.bounded | (无) | Boolean | Paimon消费者的有界模式。默认情况下,Paimon会根据Flink作业的模式自动选择有界模式。 |
| scan.dedicated-split-generation | false | Boolean | 如果为true,split生成过程将在Flink任务运行时执行,而不是在初始化阶段由JobManager执行。 |
| scan.infer-parallelism | true | Boolean | 如果为false,source的并行度由全局并行度设置。否则,source并行度根据split数量(批处理模式)或bucket数量(流处理模式)推断。 |
| scan.infer-parallelism.max | 1024 | Integer | 如果scan.infer-parallelism为true,通过此选项限制source的并行度。 |
| scan.max-snapshot.count | -1 | Integer | 每个检查点扫描的最大快照数量。为负数时表示不限制。 |
| scan.parallelism | (无) | Integer | 为扫描源定义自定义并行度。默认情况下,如果未定义此选项,规划器将根据全局配置单独为每个语句推导并行度。如果启用了 scan.infer-parallelism,规划器将根据推断的并行度推导并行度。 |
| scan.partitions | (无) | String | 指定要扫描的分区。分区应以key1=value1,key2=value2的形式给出。未指定的分区键将填充partition.default-name的值。多个分区应用分号(;)分隔。此选项支持普通源表和lookup join表。对于lookup join,还支持两个特殊值max_pt()和max_two_pt(),分别指定具有最大分区值的(两个)分区。 |
| scan.remove-normalize | false | Boolean | 是否强制在流式读取时移除normalize节点。注意:这是危险的,如果下游用于计算聚合且输入不是完整的changelog,可能会导致数据错误。 |
| scan.split-enumerator.batch-size | 10 | Integer | 在StaticFileStoreSplitEnumerator中,每批应分配给子任务的split数量,以避免超过akka.framesize限制。 |
| scan.split-enumerator.mode | fair | Enum | StaticFileStoreSplitEnumerator分配split的模式。可选值:"fair":批处理读取时均匀分配split,防止少数任务读取全部;"preemptive":根据任务的消费速度抢占式分配split。 |
| scan.watermark.alignment.group | (无) | String | 一组需要对齐水印的源。 |
| scan.watermark.alignment.max-drift | (无) | Duration | 对齐水印的最大漂移,在我们暂停从源/任务/分区消费之前。 |
| scan.watermark.alignment.update-interval | 1 s | Duration | 任务应多久通知协调器当前水印一次,以及协调器应多久宣布一次最大对齐水印。 |
| scan.watermark.emit.strategy | on-event | Enum | 水印生成的发射策略。可选值:"on-periodic":定期发射水印,间隔由Flink 'pipeline.auto-watermark-interval'控制;"on-event":每条记录发射水印。 |
| scan.watermark.idle-timeout | (无) | Duration | 如果流的某个分区在该时间段内没有记录流动,则该分区被视为"空闲",不会阻碍下游算子中水印的进度。 |
| sink.clustering.by-columns | (无) | String | 指定用于范围分区比较的列名,格式为'columnName1,columnName2'。如果未设置或设置为空字符串,表示未启用范围分区功能。此选项仅对无主键的bucket无感知表和批处理执行模式有效。 |
| sink.clustering.sample-factor | 100 | Integer | 指定采样因子。设S表示总样本数,F表示采样因子,P表示sink并行度,则S = F × P。允许的最小采样因子为20。 |
| sink.clustering.sort-in-cluster | true | Boolean | 指示在范围分区后是否对属于每个sink任务的数据进行进一步排序。 |
| sink.clustering.strategy | "auto" | String | 指定范围分区使用的比较算法,包括'zorder'、'hilbert'和'order',分别对应z-order曲线算法、hilbert曲线算法和基本类型比较算法。未配置时,将根据'sink.clustering.by-columns'中的列数自动确定算法:1列使用'order',少于5列使用'zorder',5列或以上使用'hilbert'。 |
| sink.committer-cpu | 1.0 | Double | Sink committer CPU,用于控制全局committer的CPU核心数。 |
| sink.committer-memory | (无) | MemorySize | Sink committer内存,用于控制全局committer的堆内存。 |
| sink.committer-operator-chaining | true | Boolean | 允许sink committer和writer算子链接在一起。 |
| sink.cross-partition.managed-memory | 256 mb | MemorySize | cross-partition update中RocksDB的托管内存权重,Flink将根据权重计算内存大小,实际使用的内存取决于运行环境。 |
| sink.managed.writer-buffer-memory | 256 mb | MemorySize | writer buffer在托管内存中的权重,Flink将根据权重为writer计算内存大小,实际使用的内存取决于运行环境。 |
| sink.operator-uid.suffix | (无) | String | 设置writer、dynamic bucket assigner 和 committer算子的uid后缀。uid格式为{UID_PREFIX}_ {TABLE_NAME}_ $ {USER_UID_SUFFIX}。如果未设置uid后缀,Flink将自动生成算子uid,当拓扑结构发生变化时可能会不兼容。 |
| sink.parallelism | (无) | Integer | 为sink定义自定义并行度。默认情况下,如果未定义此选项,规划器将根据全局配置单独为每个语句推导并行度。 |
| sink.savepoint.auto-tag | false | Boolean | 如果为true,将为Flink savepoint创建的快照自动创建标签。 |
| sink.use-managed-memory-allocator | false | Boolean | 如果为true,Flink sink将使用托管内存进行merge tree;否则,将创建独立的内存分配器。 |
| sink.writer-cpu | 1.0 | Double | Sink writer CPU,用于控制writer的CPU核心数。 |
| sink.writer-memory | (无) | MemorySize | Sink writer内存,用于控制writer的堆内存。 |
| source.checkpoint-align.enabled | false | Boolean | 是否将Flink检查点与Paimon表的快照对齐。如果为true,只有在消费快照时才会进行检查点。 |
| source.checkpoint-align.timeout | 30 s | Duration | 当检查点开始触发时,如果尚未生成新快照,枚举器将阻塞检查点并等待新快照。设置最大等待时间以避免无限等待,如果超时,检查点将失败。注意,它应设置得比检查点超时时间小。 |
| source.operator-uid.suffix | (无) | String | 设置source算子的uid后缀。设置后,uid格式为{UID_PREFIX}_ {TABLE_NAME}_ $ {USER_UID_SUFFIX}。如果未设置uid后缀,Flink将自动生成算子uid,当拓扑结构发生变化时可能会不兼容。 |
| unaware-bucket.compaction.parallelism | (无) | Integer | 为unaware-bucket表压缩作业定义自定义并行度。默认情况下,如果未定义此选项,规划器将根据全局配置单独为每个语句推导并行度。 |