更新时间:2024-09-27 GMT+08:00
Hive结果表
功能描述
本节介绍利用Flink写Hive的表。Hive结果表的定义,以及创建结果表时使用的参数和示例代码。详情可参考:Apache Flink Hive Read & Write
Flink 支持在 BATCH 和 STREAMING 模式下从Hive写入数据。
- 当作为BATCH应用程序运行时,Flink将写 Hive表,仅在作业完成时使这些记录可见。BATCH 写入支持追加和覆盖现有表。
- STREAMING 不断写入,向Hive添加新数据,以增量方式提交记录使其可见。用户控制何时/如何触发具有多个属性的提交。流式写入不支持插入覆盖。有关可用配置的完整列表,请参阅流式处理接收器。Streaming sink
前提条件
该场景作业需要建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 数据类型的使用,请参考Format章节。
- Hive 方言支持的 DDL 语句,Flink 1.15 当前仅支持使用Hive语法创建OBS表和使用hive语法的DLI Lakehouse表。
- 使用Hive语法创建OBS表
- defalut方言: with 属性中需要设置hive.is-external为true。
- 使用hive 方言:建表语句需要使用EXTERNAL关键字。
- 使用hive语法的DLI Lakehouse表
- 使用hive 方言:表属性中需要添加'is_lakehouse'='true'。
- 使用Hive语法创建OBS表
- 创建Flink OpenSource SQL作业时,在作业编辑界面配置开启checkpoint功能。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
CREATE EXTERNAL TABLE [IF NOT EXISTS] table_name [(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] [ [ROW FORMAT row_format] [STORED AS file_format] ] [LOCATION obs_path] [TBLPROPERTIES (property_name=property_value, ...)] row_format: : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char] [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] [NULL DEFINED AS char] | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, ...)] file_format: : SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname column_constraint: : NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]] table_constraint: : [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]] |
示例
以下示例演示如何使用 Datagen 写入具有分区提交功能的Hive表。
CREATE CATALOG myhive WITH ( 'type' = 'hive' , 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; SET table.sql-dialect=hive; -- drop table demo.student_hive_sink; CREATE EXTERNAL TABLE IF NOT EXISTS demo.student_hive_sink( name STRING, score DOUBLE) PARTITIONED BY (classNo INT) STORED AS PARQUET LOCATION 'obs://demo/spark.db/student_hive_sink' TBLPROPERTIES ( 'sink.partition-commit.policy.kind'='metastore,success-file' ); SET table.sql-dialect=default; create table if not exists student_datagen_source( name STRING, score DOUBLE, classNo INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', --每秒生成一条数据 'fields.name.kind' = 'random', --为字段user_id指定random生成器 'fields.name.length' = '7', --限制user_id长度为7 'fields.classNo.kind' ='random', 'fields.classNo.min' = '1', 'fields.classNo.max' = '10' ); insert into student_hive_sink select * from student_datagen_source;
使用spark sql进行查询结果表:
select * from demo.student_hive_sink where classNo > 0 limit 10
图1 查询结果表
父主题: Hive