更新时间:2025-02-12 GMT+08:00

如何通过DGC方式创建Flink Hive SQL作业

问题现象

如何使用DGC方式创建Flink Hive SQL作业？

解决方法

若需通过DGC方式创建并提交Flink Hive SQL作业，以读取Kafka数据并写入Hive表的作业为例，步骤如下：

  1. 提前在Hive客户端中创建Hive表，例如：
    -- Hive target table for the Flink job in step 2, created from the Hive client.
    -- Note: despite the no_partition suffix in its name, the table IS partitioned,
    -- by the string columns dy, ho and mi.
    -- The partition.* and sink.partition-commit.* table properties are not used
    -- by Hive itself. They configure the Flink streaming sink that writes this table:
    --   timestamp-pattern rebuilds a timestamp from the dy, ho and mi partition values,
    --   trigger process-time with delay 0S commits a partition based on wall-clock time,
    --   policy kind metastore,success-file registers the partition and writes a marker file.
    CREATE TABLE user_behavior_hive_tbl_no_partition (
        user_id STRING,
        item_id STRING,
        cat_id  STRING,
        ts      TIMESTAMP
    )
    PARTITIONED BY (
        dy STRING,
        ho STRING,
        mi STRING
    )
    STORED AS TEXTFILE
    TBLPROPERTIES (
        'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00',
        'sink.partition-commit.trigger'              = 'process-time',
        'sink.partition-commit.delay'                = '0S',
        'sink.partition-commit.policy.kind'          = 'metastore,success-file'
    );
  2. 创建Flink Hive SQL作业，并在DGC中提交运行。SQL示例如下（其中'properties.bootstrap.servers'的值需替换为实际的Kafka Broker实例业务IP和Kafka端口号）：
    -- Kafka source table, registered in Flink's built-in default_catalog.
    -- The text columns are declared as STRING rather than bare VARCHAR: in Flink SQL
    -- an unsized VARCHAR means VARCHAR(1), which does not match the STRING columns
    -- of the Hive target table.
    -- Replace the bootstrap-servers placeholder with the real broker business IP and port.
    CREATE TABLE test_kafka (
      user_id STRING,
      item_id STRING,
      cat_id STRING,
      zw_test TIMESTAMP
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'zw_test_kafka',
      'format' = 'json',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'example-group1',
      'scan.startup.mode' = 'latest-offset'
    );

    -- Hive catalog through which Flink reaches the table created in step 1.
    CREATE CATALOG myhive WITH (
      'type' = 'hive',
      'hive-version' = '3.1.0',
      'default-database' = 'default'
    );

    USE CATALOG myhive;

    -- Streaming insert into the Hive table. Columns are matched by position:
    -- the first four feed user_id, item_id, cat_id and ts, and the last three
    -- feed the partition columns dy, ho and mi (aliases only document this).
    -- test_kafka lives in default_catalog, so it needs its fully qualified name
    -- after switching to myhive.
    -- NOTE(review): Flink's streaming file and Hive sinks commit files and
    -- partitions on checkpoints, so confirm checkpointing is enabled for this job.
    INSERT INTO
      user_behavior_hive_tbl_no_partition
    SELECT
      user_id,
      item_id,
      cat_id,
      zw_test AS ts,
      DATE_FORMAT(zw_test, 'yyyy-MM-dd') AS dy,
      DATE_FORMAT(zw_test, 'HH') AS ho,
      DATE_FORMAT(zw_test, 'mm') AS mi
    FROM
      default_catalog.default_database.test_kafka;