更新时间:2024-09-27 GMT+08:00

Hive结果表

功能描述

本节介绍利用Flink写Hive的表。Hive结果表的定义,以及创建结果表时使用的参数和示例代码。详情可参考:Apache Flink Hive Read & Write

Flink 支持在 BATCH 和 STREAMING 模式下从Hive写入数据。

  • 当作为BATCH应用程序运行时,Flink将写 Hive表,仅在作业完成时使这些记录可见。BATCH 写入支持追加和覆盖现有表。
  • STREAMING 不断写入,向Hive添加新数据,以增量方式提交记录使其可见。用户控制何时/如何触发具有多个属性的提交。流式写入不支持插入覆盖。有关可用配置的完整列表,请参阅流式处理接收器。Streaming sink

前提条件

该场景作业需要建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 数据类型的使用,请参考Format章节。
  • Hive 方言支持的 DDL 语句,Flink 1.15 当前仅支持使用Hive语法创建OBS表和使用hive语法的DLI Lakehouse表。
    • 使用Hive语法创建OBS表
      • defalut方言: with 属性中需要设置hive.is-external为true。
      • 使用hive 方言:建表语句需要使用EXTERNAL关键字。
    • 使用hive语法的DLI Lakehouse表
      • 使用hive 方言:表属性中需要添加'is_lakehouse'='true'。
  • 创建Flink OpenSource SQL作业时,在作业编辑界面配置开启checkpoint功能。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
CREATE EXTERNAL TABLE [IF NOT EXISTS] table_name
  [(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
  [
    [ROW FORMAT row_format]
    [STORED AS file_format]
  ]
  [LOCATION obs_path]
  [TBLPROPERTIES (property_name=property_value, ...)]

row_format:
  : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char]
      [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
      [NULL DEFINED AS char]
  | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, ...)]

file_format:
  : SEQUENCEFILE
  | TEXTFILE
  | RCFILE
  | ORC
  | PARQUET
  | AVRO
  | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname

column_constraint:
  : NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]

table_constraint:
  : [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]

参数说明

请参考使用Hive语法创建OBS表,和Hive 文档了解每个DDL语句的语义。

有关可用配置的完整列表,请参阅流式处理接收器。Streaming sink

示例

以下示例演示如何使用 Datagen 写入具有分区提交功能的Hive表。
CREATE CATALOG myhive WITH (
    'type' = 'hive' ,
    'default-database' = 'demo',
    'hive-conf-dir' = '/opt/flink/conf'
);

USE CATALOG myhive;

SET table.sql-dialect=hive;

-- drop table demo.student_hive_sink;
CREATE EXTERNAL TABLE IF NOT EXISTS demo.student_hive_sink(
  name STRING, 
 score DOUBLE) 
PARTITIONED BY (classNo INT) 
STORED AS PARQUET 
LOCATION 'obs://demo/spark.db/student_hive_sink'
TBLPROPERTIES (
   'sink.partition-commit.policy.kind'='metastore,success-file'
 );

SET table.sql-dialect=default;
create table if not exists student_datagen_source(
  name STRING, 
  score DOUBLE, 
  classNo INT
) with (
  'connector' = 'datagen',
  'rows-per-second' = '1', --每秒生成一条数据
  'fields.name.kind' = 'random', --为字段user_id指定random生成器
  'fields.name.length' = '7', --限制user_id长度为7
  'fields.classNo.kind' ='random',
  'fields.classNo.min' = '1',
  'fields.classNo.max' = '10'
);

insert into student_hive_sink select * from student_datagen_source;

使用spark sql进行查询结果表:

select * from  demo.student_hive_sink where classNo > 0 limit 10
图1 查询结果表