DWS输出流(通过OBS转储方式)
功能描述
创建sink流将Flink作业数据通过OBS转储方式输出到数据仓库服务(DWS),即Flink作业数据先输出到OBS,然后再从OBS导入到DWS。如何导入OBS数据到DWS具体可参考中“从OBS并行导入数据到集群”章节。
数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见。
注意事项
- 通过OBS转储支持两种中间文件方式:
- ORC: ORC格式不支持Array数据类型,如果使用ORC格式,需先在DWS中创建外部服务器,具体可参考中“创建外部服务器”章节。
- CSV: CSV格式默认记录分隔符为换行符,若属性内容中有换行符,建议配置quote,具体参见表1。
- 如果要写入的表不存在,则会自动创建表。由于DLI SQL类型不支持text,如果存在长文本,建议先在数据库中创建表。
- encode使用orc格式时,创建DWS表时,如果SQL流字段属性定义为String类型,DWS表字段属性不能使用varchar类型,需使用特定的text类型;如果是SQL流字段属性定义为Integer类型,DWS表字段需要使用Integer类型。
前提条件
- 确保已创建OBS桶和文件夹。
- 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dws", region = "", ak = "", sk = "", encode = "", field_delimiter = "", quote = "", db_obs_server = "", obs_dir = "", username = "", password = "", db_url = "", table_name = "", max_record_num_per_file = "", dump_interval = "" ); |
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,dws表示输出到数据仓库服务中。 |
region |
是 |
数据仓库服务所在区域。 |
ak |
是 |
访问密钥ID(Access Key ID)。 |
sk |
是 |
Secret Access Key,与访问密钥ID结合使用的密钥。 |
encode |
是 |
编码方式。当前支持csv和orc两种方式。 |
field_delimiter |
否 |
属性分隔符。当编码方式为csv时需要配置,建议尽量用不可见字符作为分隔符,如\u0006\u0002。 |
quote |
否 |
单字节,建议使用不可见字符,如\u0007。 |
db_obs_server |
否 |
已在数据库中创建的外部服务器,如obs_server。 如果编码方式为orc格式时需指定该参数。 |
obs_dir |
是 |
中间文件存储目录。格式为{桶名}/{目录名}, 如obs-a1/dir1/subdir。 |
username |
是 |
数据库连接用户名。 |
password |
是 |
数据库连接密码。 |
db_url |
是 |
数据库连接地址。格式为/ip:port/database,如 “192.168.1.21:8000/test1”。 |
table_name |
是 |
数据表名,若表不存在,则自动创建。 |
max_record_num_per_file |
是 |
每个文件最多存储多少条记录。当文件记录数少于最大值时,该文件会延迟一个转储周期输出。 |
dump_interval |
是 |
转储周期,单位为秒。 |
delete_obs_temp_file |
否 |
是否要删除obs上的临时文件,默认为“true”,若设置为“false”,则不会删除obs上的文件,需用户自己清理。 |
max_dump_file_num |
否 |
执行一次转储操作时最多转储多少文件。 当本次转储操作发现文件数小于最大值,则会延迟一个转储周期输出。 |
示例
- CSV格式转储。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "dws", region = "xxx", ak = "", sk = "", encode = "csv", field_delimiter = "\u0006\u0006\u0002", quote = "\u0007", obs_dir = "dli-append-2/dws", username = "", password = "", db_url = "192.168.1.12:8000/test1", table_name = "table1", max_record_num_per_file = "100", dump_interval = "10" );
- ORC格式转储。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "dws", region = "xxx", ak = "", sk = "", encode = "orc", db_obs_server = "obs_server", obs_dir = "dli-append-2/dws", username = "", password = "", db_url = "192.168.1.12:8000/test1", table_name = "table1", max_record_num_per_file = "100", dump_interval = "10" );