使用Temporal join关联维表的最新分区
功能描述
对于随时间变化的分区表,我们可以将其读取为无界流,如果每个分区包含某个版本的完整数据,则该分区可以被视为时间表的一个版本,时间表的版本保留了分区的数据。Flink支持在处理时间关联中自动跟踪时间表的最新分区(版本)。
最新分区(版本)由 'streaming-source.partition-order' 选项定义。
这是在Flink 流应用作业中将 Hive 表用作维度表的最常见用例。
注意事项
使用Temporal join关联维表的最新分区,仅在Flink STREAMING模式下支持。
示例
下面的示例展示了一个经典的业务流水线,维度表来自 Hive,每天通过批处理流水线作业或 Flink 作业更新一次,kafka流来自实时在线业务数据或日志,需要与维度表连接以扩充流。
- 使用spark sql 创建 hive obs 外表,并插入数据。
CREATE TABLE if not exists dimension_hive_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING ) STORED AS PARQUET LOCATION 'obs://demo/spark.db/dimension_hive_table' PARTITIONED BY ( create_time STRING );
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业模拟从kafka读取数据,并关联hive维表对数据进行打宽,并输出到print。
如下脚本中的加粗参数请根据实际环境修改。
CREATE CATALOG myhive WITH ( 'type' = 'hive' , 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; CREATE TABLE if not exists ordersSource ( product_id STRING, user_name string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'TOPIC', 'properties.bootstrap.servers' = 'KafkaIP:PROT,KafkaIP:PROT,KafkaIP:PROT', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table if not exists print ( product_id STRING, user_name string, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING, create_time STRING ) with ( 'connector' = 'print' ); insert into print select orders.product_id, orders.user_name, dim.product_name, dim.unit_price, dim.pv_count, dim.like_count, dim.comment_count, dim.update_time, dim.update_user, dim.create_time from ordersSource orders left join dimension_hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '10 m') */ for system_time as of orders.proctime as dim on orders.product_id = dim.product_id;
- 连接Kafka集群,向Kafka的source topic中插入如下测试数据:
{"product_id": "product_id_11", "user_name": "name11"} {"product_id": "product_id_12", "user_name": "name12"}
- 查看print结果表数据。
+I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1] +I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]
- 模拟向hive 维表,插入新的分区数据
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_21', 'product_name_21', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_22', 'product_name_22', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_23', 'product_name_23', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_24', 'product_name_24', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_25', 'product_name_25', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_26', 'product_name_26', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_27', 'product_name_27', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_28', 'product_name_28', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_29', 'product_name_29', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_20', 'product_name_20', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
- 连接Kafka集群,向Kafka的source topic中插入如下测试数据。关联上一个分区create_time='create_time_1'数据:
{"product_id": "product_id_13", "user_name": "name13"}
- 查看print结果表数据。可观察到hive维表中的前一个分区create_time='create_time_1'数据已经被清除
+I[product_id_13, name13, null, null, null, null, null, null, null, null]
- 连接Kafka集群,向Kafka的source topic中插入如下测试数据。关联最新分区create_time='create_time_2'数据:
{"product_id": "product_id_21", "user_name": "name21"}
- 查看print结果表数据。可观察到hive维表中保存了最新分区create_time='create_time_2'的数据
+I[product_id_21, name21, product_name_21, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_2]