更新时间:2025-07-14 GMT+08:00
DGC方式如何创建Flink Hive Sql作业
问题现象
使用DGC方式如何创建Flink Hive Sql作业。
解决方法
若通过DGC方式创建提交Flink Hive作业,以读Kafka写Hive作业为例,步骤如下:
- 提前在Hive客户端中创建Hive表。例如:
create table user_behavior_hive_tbl_no_partition( user_id STRING, item_id STRING, cat_id STRING, ts timestamp ) PARTITIONED BY (dy STRING, ho STRING, mi STRING) stored as textfile TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00', 'sink.partition-commit.trigger' ='process-time', 'sink.partition-commit.delay' = '0S', 'sink.partition-commit.policy.kind' = 'metastore,success-file' );
- 创建Flink Hive Sql作业,在DGC提交运行。Sql示例如下:
CREATE TABLE test_kafka ( user_id varchar, item_id varchar, cat_id varchar, zw_test timestamp ) WITH ( 'connector' = 'kafka', 'topic' = 'zw_test_kafka', 'format' = 'json', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'example-group1', 'scan.startup.mode' = 'latest-offset' ); CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-version' = '3.1.0', 'default-database' = 'default' ); use catalog myhive; INSERT into user_behavior_hive_tbl_no_partition SELECT user_id, item_id, cat_id, zw_test, DATE_FORMAT(zw_test, 'yyyy-MM-dd'), DATE_FORMAT(zw_test, 'HH'), DATE_FORMAT(zw_test, 'mm') FROM default_catalog.default_database.test_kafka;
MRS 3.2.0及之前版本创建Catalog时需要在WITH参数中指定hive配置文件路径,例如:'hive-conf-dir'='obs://test-bucket/tmp/hive/config'。
获取hive配置文件:
- 登录FusionInsight Manager界面,在“主页”右上方选择“下载客户端 > 仅配置文件”,选择平台类型和下载位置后单击“确定”。
- 解压客户端文件FusionInsight_Cluster_*_Client.tar,拷贝Hive/config下所有配置文件。
父主题: Flink常见问题