更新时间:2022-02-22 GMT+08:00

CloudTable HBase输出流

功能描述

DLI将作业的输出数据输出到CloudTable的HBase中。HBase是一个稳定可靠,性能卓越、可伸缩、面向列的分布式云存储系统,适用于海量数据存储以及分布式计算的场景,用户可以利用HBase搭建起TB至PB级数据规模的存储系统,对数据轻松进行过滤分析,毫秒级得到响应,快速发现数据价值。HBase支持消息数据、报表数据、推荐类数据、风控类数据、日志数据、订单数据等结构化、半结构化的KeyValue数据存储。 利用DLI,用户可方便地将海量数据高速、低时延写入HBase。

表格存储服务(CloudTable),是基于Apache HBase提供的分布式、可伸缩、全托管的KeyValue数据存储服务,为DLI提供了高性能的随机读写能力,适用于海量结构化数据、半结构化数据以及时序数据的存储和查询应用,适用于物联网IOT应用和通用海量KeyValue数据存储与查询等场景。CloudTable的更多信息,请参见《表格存储服务用户指南》。

前提条件

该场景作业需要运行在DLI的独享队列上,因此要与CloudTable HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

如何建立增强型跨源连接,请参考《数据湖探索用户指南》“增强型跨源连接”章节。

如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。

语法格式

1
2
3
4
5
6
7
8
9
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "cloudtable",
    region = "",
    cluster_id = "",
    table_name = "",
    table_columns = "",
    create_if_not_exist = ""
  )

关键字

表1 关键字说明

参数

是否必选

说明

type

输出通道类型,“cloudtable”表示输出到CloudTable(HBase)。

region

表格存储服务所在区域。

cluster_id

待读取数据表所属集群id。

table_name

待插入数据的表名,支持参数化,例如当需要某一列或者几列作为表名的一部分时,可表示为”car_pass_inspect_with_age_${car_age}“,其中car_age为列名。

table_columns

待插入的列,具体形式如:"rowKey,f1:c1,f1:c2,f2:c1",其中必须指定rowKey,当某列不需要加入数据库时,以第三列为例,可表示为"rowKey,f1:c1,,f2:c1"。

illegal_data_table

如果指定该参数,异常数据(比如:rowKey不存在)会写入该表(rowKey为时间戳加六位随机数字,schema为info:data, info:reason),否则会丢弃。

create_if_not_exist

当待写入的表或者列族不存在时,是否创建,值为true或者false,默认值为false。

batch_insert_data_num

表示一次性批量写入的数据条数,值必须为正整数,上限为100,默认值为10。

注意事项

  • 当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。
  • 通过这种方式将数据写入到CloudTable的Hbase,速度受限,推荐使用专属资源模式。

    如何创建专属资源模式,请参考《数据湖探索用户指南》中《创建队列》章节。

示例

将流qualified_cars的数据输出到表格存储服务CloudTable的HBase中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
CREATE SINK STREAM qualified_cars (
  car_id STRING,
  car_owner STRING,
  car_age INT,
  average_speed INT,
  total_miles INT
)
  WITH (
    type = "cloudtable",
    region =  "dc1-az1",
    cluster_id = "209ab1b6-de25-4c48-8e1e-29e09d02de28",
    table_name = "car_pass_inspect_with_age_${car_age}",
    table_columns = "rowKey,info:owner,,car:speed,car:miles",
    illegal_data_table = "illegal_data",
    create_if_not_exist = "true",
    batch_insert_data_num = "20"
);