更新时间:2024-07-27 GMT+08:00

MRS HBase输出流

功能描述

DLI将Flink作业的输出数据输出到MRS的HBase中。

前提条件

  • 确保您的账户下已在MapReduce服务(MRS)里创建了您配置的集群。DLI支持与开启kerberos的hbase集群对接。
  • 该场景作业需要运行在DLI的独享队列上,请确保已创建DLI独享队列。

    关于如何创建DLI独享队列,在创建队列时,选择“按需计费”,勾选“专属资源模式”即可。具体操作请参见《数据湖探索用户指南》创建队列章节。

  • 确保DLI独享队列与MRS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

    如何建立增强型跨源连接,请参见《数据湖探索用户指南》增强型跨源连接章节。

    如何设置安全组规则,请参见《虚拟私有云用户指南》“安全组”章节。

  • 若使用MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。

    如何添加IP域名映射,请参见《数据湖探索用户指南》修改主机信息章节。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "mrs_hbase",
    region = "",
    cluster_address = "",
    table_name = "",
    table_columns = "",
    illegal_data_table = "",
    batch_insert_data_num = "",
    action = ""
)

关键字

表1 关键字说明

参数

是否必选

说明

type

输出通道类型,"mrs_hbase"表示输出到MRS的HBase中。

region

MRS服务所在区域。

cluster_address

待插入数据表所属集群zookeeper地址,形如:ip1,ip2:port。

table_name

待插入数据的表名。

支持参数化,例如当需要某一列或者几列作为表名的一部分时,可表示为”car_pass_inspect_with_age_${car_age}“,其中car_age为列名。

table_columns

待插入的列,具体形式如:"rowKey,f1:c1,f1:c2,f2:c1",其中必须指定rowKey,当某列不需要加入数据库时,以第三列为例,可表示为"rowKey,f1:c1,,f2:c1"。

illegal_data_table

如果指定该参数,异常数据(比如:rowKey不存在)会写入该表(rowKey为taskNo加下划线加时间戳加六位随机数字,schema为info:data, info:reason),否则会丢弃。

batch_insert_data_num

表示一次性批量写入的数据条数,值必须为正整数,上限为1000,默认值为10。

action

表示数据是插入还是删除,可选值为add和delete,默认值为add。

krb_auth

创建跨源认证的认证名。开启kerberos认证时,需配置该参数,填写对应的跨源认证名称。跨源认证创建详见创建跨源认证

说明:

请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。

注意事项

无。

示例

将数据输出到MRS的HBase中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
CREATE SINK STREAM qualified_cars (
  car_id STRING,
  car_owner STRING,
  car_age INT,
  average_speed INT,
  total_miles INT
)
  WITH (
    type = "mrs_hbase",
    region = "xxx",
    cluster_address = "192.16.0.88,192.87.3.88:2181",
    table_name = "car_pass_inspect_with_age_${car_age}",
    table_columns = "rowKey,info:owner,,car:speed,car:miles",
    illegal_data_table = "illegal_data",
    batch_insert_data_num = "20",
    action = "add",
    krb_auth = "KRB_AUTH_NAME"
  );