Hbase维表
功能描述
创建Hbase维表用于与输入流连接生成宽表。
前提条件
- 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- 如果使用MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。
详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
- 所有 HBase 表的列簇必须定义为ROW类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。用户只需在表结构中声明查询中使用的的列簇和列限定符。除了 ROW 类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为 HBase 的 rowkey,一张表中只能声明一个 rowkey。rowkey 字段的名字可以是任意的,如果是保留关键字,需要用反引号。
语法格式
create table hbaseSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'hbase-2.2', 'table-name' = '', 'zookeeper.quorum' = '' );
参数说明
参数 |
是否必选 |
默认值 |
参数类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
connector的类型,需配置为:hbase-2.2。 |
table-name |
是 |
无 |
String |
连接的HBase表名。 |
zookeeper.quorum |
是 |
无 |
String |
HBase Zookeeper quorum 信息。格式为:ZookeeperAddress:ZookeeperPort。 以MRS Hbase集群为例,该参数的所使用Zookeeper的ip地址和端口号获取方式如下:
|
zookeeper.znode.parent |
否 |
/hbase |
String |
HBase集群的Zookeeper根目录。 |
lookup.async |
否 |
false |
Boolean |
是否设置异步维表。 |
lookup.cache.max-rows |
否 |
-1 |
Long |
维表配置,缓存的最大行数,超过该值时,缓存中最先添加的条目将被标记为过期。 默认表示不使用该配置。 |
lookup.cache.ttl |
否 |
-1 |
Long |
维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 默认表示不使用该配置。 |
lookup.max-retries |
否 |
3 |
Integer |
维表配置,数据拉取最大重试次数。 |
krb_auth_name |
否 |
无 |
String |
DLI侧创建的Kerberos类型的跨源认证名称。 |
数据类型映射
HBase以字节数组存储所有数据。在读和写过程中要序列化和反序列化数据。
Flink的HBase连接器利用HBase(Hadoop) 的工具类 org.apache.hadoop.hbase.util.Bytes 进行字节数组和 Flink 数据类型转换。
Flink的HBase连接器将所有数据类型(除字符串外)null 值编码成空字节。对于字符串类型,null 值的字面值由null-string-literal选项值决定。
Flink 数据类型 |
HBase 转换 |
---|---|
CHAR / VARCHAR / STRING |
byte[] toBytes(String s) String toString(byte[] b) |
BOOLEAN |
byte[] toBytes(boolean b) boolean toBoolean(byte[] b) |
BINARY / VARBINARY |
返回 byte[]。 |
DECIMAL |
byte[] toBytes(BigDecimal v) BigDecimal toBigDecimal(byte[] b) |
TINYINT |
new byte[] { val } bytes[0] // returns first and only byte from bytes |
SMALLINT |
byte[] toBytes(short val) short toShort(byte[] bytes) |
INT |
byte[] toBytes(int val) int toInt(byte[] bytes) |
BIGINT |
byte[] toBytes(long val) long toLong(byte[] bytes) |
FLOAT |
byte[] toBytes(float val) float toFloat(byte[] bytes) |
DOUBLE |
byte[] toBytes(double val) double toDouble(byte[] bytes) |
DATE |
从 1970-01-01 00:00:00 UTC 开始的天数,int 值。 |
TIME |
从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。 |
TIMESTAMP |
从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。 |
ARRAY |
不支持 |
MAP / MULTISET |
不支持 |
ROW |
不支持 |
示例
该示例是从DMS Kafka数据源中读取数据,将HBase表作为维表,从而生成宽表,并将结果写入到Kafka结果表中,其具体步骤如下(该示例中HBase的版本2.2.3):
- 参考增强型跨源连接,在DLI上根据HBase和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。
- 设置HBase和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据HBase和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
- 参考MRS HBase使用,通过HBase shell在HBase中创建相应的表,表名为area_info,表中只有一个列族detail,创建语句如下:
create 'area_info', {NAME => 'detail'}
- 在HBase shell中执行下述语句,插入相应的维表数据:
put 'area_info', '330106', 'detail:area_province_name', 'a1' put 'area_info', '330106', 'detail:area_city_name', 'b1' put 'area_info', '330106', 'detail:area_county_name', 'c2' put 'area_info', '330106', 'detail:area_street_name', 'd2' put 'area_info', '330106', 'detail:region_name', 'e1' put 'area_info', '330110', 'detail:area_province_name', 'a1' put 'area_info', '330110', 'detail:area_city_name', 'b1' put 'area_info', '330110', 'detail:area_county_name', 'c4' put 'area_info', '330110', 'detail:area_street_name', 'd4' put 'area_info', '330110', 'detail:region_name', 'e1'
- 创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Kafka作为数据源,HBase作为维表,将数据写入到Kafka作为结果表中。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, detail row( area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string) ) WITH ( 'connector' = 'hbase-2.2', 'table-name' = 'area_info', 'zookeeper.quorum' = 'ZookeeperAddress:ZookeeperPort', 'lookup.async' = 'true', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '2h' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = '<yourSinkTopic>', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
- 连接Kafka集群,向Kafka的source topic中插入如下测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
- 连接Kafka集群,在Kafka的sink topic读取数据,结果数据参考如下:
{"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"}
常见问题
Q:Flink作业日志中有如下报错信息应该怎么解决?
org.apache.zookeeper.ClientCnxn$SessionTimeoutException: Client session timed out, have not heard from server in 90069ms for connection id 0x0
A:可能是跨源连接未绑定或跨源绑定失败。参考增强型跨源连接重新配置跨源,Kafka集群安全组放通DLI队列的网段地址。