更新时间:2023-11-14 GMT+08:00
分享

Hbase源表

功能描述

创建source流从HBase中获取数据,作为作业的输入数据。HBase是一个稳定可靠,性能卓越、可伸缩、面向列的分布式云存储系统,适用于海量数据存储以及分布式计算的场景,用户可以利用HBase搭建起TB至PB级数据规模的存储系统,对数据轻松进行过滤分析,毫秒级得到响应,快速发现数据价值。DLI可以从HBase中读取数据,用于过滤分析、数据转储等场景。

前提条件

  • 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • 若使用MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机IP信息。

    详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。

  • Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。

    跨源认证简介及操作方法请参考跨源认证简介

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 创建HBase源表的列簇必须定义为ROW类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。

    用户只需在表结构中声明查询中使用的的列簇和列限定符。除了ROW类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为HBase的rowkey,一张表中只能声明一个rowkey。rowkey字段的名字可以是任意的,如果是保留关键字,需要用反引号进行转义。

语法格式

create table hbaseSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (',' watermark for rowtime_column_name as watermark-strategy_expression)
  ','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'hbase-2.2',
  'table-name' = '',
  'zookeeper.quorum' = ''
);

参数说明

表1 参数说明

参数

是否必选

默认值

数据

说明

connector

String

指定使用的连接器,需配置为:hbase-2.2。

table-name

String

连接的HBase表名。

zookeeper.quorum

String

格式为:ZookeeperAddress:ZookeeperPort

以MRS Hbase集群为例,该参数的所使用Zookeeper的ip地址和端口号获取方式如下:

  • 在MRS Manager上,选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 实例”,获取ZooKeeper角色实例的IP地址。
  • 在MRS Manager上,选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 配置 > 全部配置”,搜索参数“clientPort”,获取“clientPort”的参数值即为ZooKeeper的端口。

zookeeper.znode.parent

/hbase

String

Zookeeper中的根目录,默认是/hbase。

null-string-literal

String

当字符串值为null时的存储形式,默认存成 "null" 字符串。

HBase的source的编解码将所有数据类型(除字符串外)将null值以空字节来存储。

krb_auth_name

String

DLI侧创建的Kerberos类型的跨源认证名称。

数据类型映射

HBase以字节数组存储所有数据,在读和写过程中要序列化和反序列化数据。

Flink的HBase连接器利用HBase(Hadoop) 的工具类org.apache.hadoop.hbase.util.Bytes进行字节数组和Flink数据类型转换。

Flink的HBase连接器将所有数据类型(除字符串外)null值编码成空字节。对于字符串类型,null值的字面值由null-string-literal选项值决定。

表2 数据类型映射表

Flink数据类型

HBase转换

CHAR/VARCHAR/STRING

byte[] toBytes(String s)

String toString(byte[] b)

BOOLEAN

byte[] toBytes(boolean b)

boolean toBoolean(byte[] b)

BINARY/VARBINARY

返回 byte[]。

DECIMAL

byte[] toBytes(BigDecimal v)

BigDecimal toBigDecimal(byte[] b)

TINYINT

new byte[] { val }

bytes[0] // returns first and only byte from bytes

SMALLINT

byte[] toBytes(short val)

short toShort(byte[] bytes)

INT

byte[] toBytes(int val)

int toInt(byte[] bytes)

BIGINT

byte[] toBytes(long val)

long toLong(byte[] bytes)

FLOAT

byte[] toBytes(float val)

float toFloat(byte[] bytes)

DOUBLE

byte[] toBytes(double val)

double toDouble(byte[] bytes)

DATE

从 1970-01-01 00:00:00 UTC 开始的天数,int 值。

TIME

从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。

TIMESTAMP

从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。

ARRAY

不支持

MAP/MULTISET

不支持

ROW

不支持

示例

该示例是从HBase数据源中读取数据,并写入到Print结果表中,其具体步骤参考如下(该示例使用的HBase版本1.3.1和2.1.1和2.2.3):

  1. 参考增强型跨源连接,在DLI上根据HBase所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink作业队列。参考“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。
  2. 设置HBase集群的安全组,添加入向规则使其对Flink作业队列网段放通。参考测试地址连通性根据HBase的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 参考MRS HBase的使用,通过HBase shell在HBase中创建相应的表,表名为order,表中只有一个列簇detail。创建语句参考如下:
    create 'order', {NAME => 'detail'}
  4. 在HBase shell中执行下述命令,以插入一条数据:
    put 'order', '202103241000000001', 'detail:order_channel','webShop'
    put 'order', '202103241000000001', 'detail:order_time','2021-03-24 10:00:00'
    put 'order', '202103241000000001', 'detail:pay_amount','100.00'
    put 'order', '202103241000000001', 'detail:real_pay','100.00'
    put 'order', '202103241000000001', 'detail:pay_time','2021-03-24 10:02:03'
    put 'order', '202103241000000001', 'detail:user_id','0001'
    put 'order', '202103241000000001', 'detail:user_name','Alice'
    put 'order', '202103241000000001', 'detail:area_id','330106'
  5. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将HBase作为数据源,Print作为结果表。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    create table hbaseSource (
      order_id string,--表示唯一的rowkey
      detail Row( --detail表示列簇
        order_channel string,
        order_time string,
        pay_amount string,
        real_pay string,
        pay_time string,
        user_id string,
        user_name string,
        area_id string),
      primary key (order_id) not enforced
    ) with (
      'connector' = 'hbase-2.2',
       'table-name' = 'order',
       'zookeeper.quorum' = 'ZookeeperAddress:ZookeeperPort'
    ) ;
    
    create table printSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount string,
      real_pay string,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) with (
     'connector' = 'print'
    );
    
    insert into printSink select order_id, detail.order_channel,detail.order_time,detail.pay_amount,detail.real_pay,
    detail.pay_time,detail.user_id,detail.user_name,detail.area_id from hbaseSource;
  6. 按照如下方式查看taskmanager.out文件中的数据结果:
    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。

    数据结果参考如下:

    +I(202103241000000001,webShop,2021-03-24 10:00:00,100.00,100.00,2021-03-24 10:02:03,0001,Alice,330106)

常见问题

  • Q:Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
    java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 6

    A:如果HBase表中的数据是以其他方式导入的话,那么其存储是以String格式存储的,所以使用其他的数据格式将会报该错误。需要将Flink创建HBase源表中非string类型的字段的字段类型重新改为String即可。

  • Q:Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
    org.apache.zookeeper.ClientCnxn$SessionTimeoutException: Client session timed out, have not heard from server in 90069ms for connection id 0x0

    A:跨源未绑定或未绑定成功,或是HBase集群安全组未配置放通DLI队列的网段地址。参考增强型跨源连接重新配置跨源,或者HBase集群安全组放通DLI队列的网段地址。

相关文档