更新时间:2024-04-19 GMT+08:00

Hbase结果表

功能描述

DLI将作业的输出数据输出到HBase中。HBase是一个稳定可靠,性能卓越、可伸缩、面向列的分布式云存储系统,适用于海量数据存储以及分布式计算的场景,用户可以利用HBase搭建起TB至PB级数据规模的存储系统,对数据轻松进行过滤分析,毫秒级得到响应,快速发现数据价值。HBase支持消息数据、报表数据、推荐类数据、风控类数据、日志数据、订单数据等结构化、半结构化的KeyValue数据存储。 利用DLI,用户可方便地将海量数据高速、低时延写入HBase。

前提条件

  • 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • 如果使用MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。

    详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 创建的HBase结果表的列簇必须定义为ROW类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。用户只需在表结构中声明查询中使用的的列簇和列限定符。除了ROW类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为 HBase的rowkey,一张表中只能声明一个rowkey。rowkey字段的名字可以是任意的,如果是保留关键字,需要用反引号。

语法格式

create table hbaseSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  ','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
) with (
  'connector' = 'hbase-2.2',
  'table-name' = '',
  'zookeeper.quorum' = ''
);

参数说明

表1 参数说明

参数

是否必选

默认值

类型

说明

connector

String

指定使用的连接器,固定为:hbase-2.2。

table-name

String

连接的HBase表名。

zookeeper.quorum

String

HBase Zookeeper实例信息,格式为:ZookeeperAddress:ZookeeperPort

以MRS Hbase集群为例,该参数的所使用Zookeeper的ip地址和端口号获取方式如下:

  • 在MRS Manager上,选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 实例”,获取ZooKeeper角色实例的IP地址。
  • 在MRS Manager上,选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 配置 > 全部配置”,搜索参数“clientPort”,获取“clientPort”的参数值即为ZooKeeper的端口。

zookeeper.znode.parent

/hbase

String

Zookeeper中的根目录,默认是/hbase。

null-string-literal

null

String

当字符串值为null时的存储形式,默认存成 "null" 字符串。

HBase sink的编解码将所有数据类型(除字符串外)为null值时以空字节来存储。

sink.buffer-flush.max-size

2mb

MemorySize

每次写入请求缓存行的最大值。

它能提升写入HBase数据库的性能,但是也可能增加延迟。

设置为 "0" 关闭此选项。

sink.buffer-flush.max-rows

1000

Integer

每次写入请求缓存的最大行数。

它能提升写入HBase数据库的性能,但是也可能增加延迟。

设置为 "0" 关闭此选项。

sink.buffer-flush.interval

1s

Duration

刷新缓存的间隔,在这段时间内以异步线程刷新数据。

它能提升写入HBase数据库的性能,但是也可能增加延迟。

设置为 "0" 关闭此选项。

注意:"sink.buffer-flush.max-size" 和 "sink.buffer-flush.max-rows" 同时设置为 "0",并设置刷新缓存的间隔,则以完整的异步处理方式刷新缓存。

格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。

sink.parallelism

Integer

为 HBase sink operator 定义并行度。

默认情况下,并行度由框架决定,和链在一起的上游operator一样。

properties.connector.auth.open

Boolean

true 代表hbase集群开启了kerberos认证。如果开启了kerberos认证,则必须设置。

properties.connector.kerberos.principal

String

安全集群的登录用户名。如果开启了kerberos认证,则必须设置。

properties.connector.kerberos.keytab

String

上传“user.keytab”文件的OBS路径。如果开启了kerberos认证,则必须设置。

properties.connector.kerberos.krb5

String

上传“krb5.conf”文件的OBS路径。如果开启了kerberos认证,则必须设置。

注:“krb5.conf”中需移除[libdefaults]下的“renew_lifetime”配置项,否则可能会遇到“Message stream modified (41)”问题。

数据类型映射

HBase以字节数组存储所有数据。在读和写过程中要序列化和反序列化数据。

Flink 的 HBase 连接器利用 HBase(Hadoop) 的工具类org.apache.hadoop.hbase.util.Bytes进行字节数组和Flink 数据类型转换。

Flink 的 HBase 连接器将所有数据类型(除字符串外)null值编码成空字节。对于字符串类型,null值的字面值由null-string-literal选项值决定。

表2 数据类型映射表

Flink 数据类型

HBase 转换

CHAR / VARCHAR / STRING

byte[] toBytes(String s)

String toString(byte[] b)

BOOLEAN

byte[] toBytes(boolean b)

boolean toBoolean(byte[] b)

BINARY / VARBINARY

返回 byte[]。

DECIMAL

byte[] toBytes(BigDecimal v)

BigDecimal toBigDecimal(byte[] b)

TINYINT

new byte[] { val }

bytes[0] // returns first and only byte from bytes

SMALLINT

byte[] toBytes(short val)

short toShort(byte[] bytes)

INT

byte[] toBytes(int val)

int toInt(byte[] bytes)

BIGINT

byte[] toBytes(long val)

long toLong(byte[] bytes)

FLOAT

byte[] toBytes(float val)

float toFloat(byte[] bytes)

DOUBLE

byte[] toBytes(double val)

double toDouble(byte[] bytes)

DATE

从 1970-01-01 00:00:00 UTC 开始的天数,int 值。

TIME

从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。

TIMESTAMP

从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。

ARRAY

不支持

MAP / MULTISET

不支持

ROW

不支持

示例

该示例是从Kafka数据源中读取数据,并写入到HBase结果表中,其具体步骤如下(该示例中hbase的版本为2.2.3):

  1. 参考增强型跨源连接,在DLI上根据HBase和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。
  2. 设置HBase和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据HBase和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 参考MRS HBase使用,通过HBase shell在HBase中创建相应的表,表名为order,表中只有一个列族detail,创建语句如下:
    create 'order', {NAME => 'detail'}
  4. 创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Kafka作为数据源,HBase作为结果表(Rowkey为order_id,列簇名为detail)
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE orders (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    create table hbaseSink(
      order_id string,
      detail Row(
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string)
    ) with (
      'connector' = 'hbase-2.2',
      'table-name' = 'order',
      'zookeeper.quorum' = 'ZookeeperAddress:ZookeeperPort',
      'sink.buffer-flush.max-rows' = '1'
    );
    
    insert into hbaseSink select order_id, Row(order_channel,order_time,pay_amount,real_pay,pay_time,user_id,user_name,area_id) from orders;
  5. 连接Kafka集群,向Kafka中输入数据:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
  6. 通过HBase shell使用下述语句查看数据结果:
     scan 'order'
    数据结果参考如下:
    202103241000000001   column=detail:area_id, timestamp=2021-12-16T21:30:37.954, value=330106
    
    202103241000000001   column=detail:order_channel, timestamp=2021-12-16T21:30:37.954, value=webShop
    
    202103241000000001   column=detail:order_time, timestamp=2021-12-16T21:30:37.954, value=2021-03-24 10:00:00
    
    202103241000000001   column=detail:pay_amount, timestamp=2021-12-16T21:30:37.954, value=@Y\x00\x00\x00\x00\x00\x00
    
    202103241000000001   column=detail:pay_time, timestamp=2021-12-16T21:30:37.954, value=2021-03-24 10:02:03
    
    202103241000000001   column=detail:real_pay, timestamp=2021-12-16T21:30:37.954, value=@Y\x00\x00\x00\x00\x00\x00
    
    202103241000000001   column=detail:user_id, timestamp=2021-12-16T21:30:37.954, value=0001
    
    202103241000000001   column=detail:user_name, timestamp=2021-12-16T21:30:37.954, value=Alice
    
    202103241606060001   column=detail:area_id, timestamp=2021-12-16T21:30:44.842, value=330106
    
    202103241606060001   column=detail:order_channel, timestamp=2021-12-16T21:30:44.842, value=appShop
    
    202103241606060001   column=detail:order_time, timestamp=2021-12-16T21:30:44.842, value=2021-03-24 16:06:06
    
    202103241606060001   column=detail:pay_amount, timestamp=2021-12-16T21:30:44.842, value=@i\x00\x00\x00\x00\x00\x00
    
    202103241606060001   column=detail:pay_time, timestamp=2021-12-16T21:30:44.842, value=2021-03-24 16:10:06
    
    202103241606060001   column=detail:real_pay, timestamp=2021-12-16T21:30:44.842, value=@f\x80\x00\x00\x00\x00\x00
    
    202103241606060001   column=detail:user_id, timestamp=2021-12-16T21:30:44.842, value=0001
    
    202103241606060001   column=detail:user_name, timestamp=2021-12-16T21:30:44.842, value=Alice
    
    202103251202020001   column=detail:area_id, timestamp=2021-12-16T21:30:52.181, value=330110
    
    202103251202020001   column=detail:order_channel, timestamp=2021-12-16T21:30:52.181, value=miniAppShop
    
    202103251202020001   column=detail:order_time, timestamp=2021-12-16T21:30:52.181, value=2021-03-25 12:02:02
    
    202103251202020001   column=detail:pay_amount, timestamp=2021-12-16T21:30:52.181, value=@N\x00\x00\x00\x00\x00\x00
    
    202103251202020001   column=detail:pay_time, timestamp=2021-12-16T21:30:52.181, value=2021-03-25 12:03:00
    
    202103251202020001   column=detail:real_pay, timestamp=2021-12-16T21:30:52.181, value=@N\x00\x00\x00\x00\x00\x00
    
    202103251202020001   column=detail:user_id, timestamp=2021-12-16T21:30:52.181, value=0002
    
    202103251202020001   column=detail:user_name, timestamp=2021-12-16T21:30:52.181, value=Bob

常见问题

Q:Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?

org.apache.zookeeper.ClientCnxn$SessionTimeoutException: Client session timed out, have not heard from server in 90069ms for connection id 0x0

A:可能是跨源连接未绑定或跨源绑定失败。参考增强型跨源连接重新配置跨源,Kafka集群安全组放通DLI队列的网段地址。