Updated: 2025-02-12 GMT+08:00
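How Do I Create a Flink Hive SQL Job Through DGC?
Symptom
How do I create a Flink Hive SQL job through DGC?
Solution
To create and submit a Flink Hive job through DGC, perform the following steps. The example job reads data from Kafka and writes it to Hive.
- Create the target Hive table in the Hive client in advance. For example: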
CREATE TABLE user_behavior_hive_tbl_no_partition (
  user_id STRING,
  item_id STRING,
  cat_id STRING,
  ts TIMESTAMP
)
PARTITIONED BY (dy STRING, ho STRING, mi STRING)
STORED AS TEXTFILE
TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '0S',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
- Create a Flink Hive SQL job, then submit and run it in DGC. The following is an SQL example:
-- Kafka source table; it is registered in the default in-memory catalog.
CREATE TABLE test_kafka (
  user_id VARCHAR,
  item_id VARCHAR,
  cat_id VARCHAR,
  zw_test TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'zw_test_kafka',
  'format' = 'json',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'properties.group.id' = 'example-group1',
  'scan.startup.mode' = 'latest-offset'
);

-- Hive catalog that exposes the table created in the Hive client.
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-version' = '3.1.0',
  'default-database' = 'default'
);

USE CATALOG myhive;

-- The last three expressions fill the partition columns dy, ho, and mi.
INSERT INTO user_behavior_hive_tbl_no_partition
SELECT
  user_id,
  item_id,
  cat_id,
  zw_test,
  DATE_FORMAT(zw_test, 'yyyy-MM-dd'),
  DATE_FORMAT(zw_test, 'HH'),
  DATE_FORMAT(zw_test, 'mm')
FROM default_catalog.default_database.test_kafka;
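After the job is running and messages have arrived on the Kafka topic, the result can be checked from the Hive client. The statements below are a minimal sketch and not part of the original procedure. The partition values '2025-02-12', '10', and '30' are hypothetical and depend on the zw_test timestamps in your messages.

```sql
-- List the partitions that the Flink job has committed to the Hive metastore.
SHOW PARTITIONS user_behavior_hive_tbl_no_partition;

-- Read back a few rows from one partition.
-- The partition values are hypothetical; replace them with values returned by SHOW PARTITIONS.
SELECT user_id, item_id, cat_id, ts
FROM user_behavior_hive_tbl_no_partition
WHERE dy = '2025-02-12' AND ho = '10' AND mi = '30'
LIMIT 10;
```

If no partitions appear, one thing worth checking is whether checkpointing is enabled for the job: in streaming mode, Flink's file-based sinks finalize files when a checkpoint completes, so written data may stay invisible until then.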