FlinkServer对接HBase
操作场景
FlinkServer支持对接HBase,详情如下:
- 支持对接维表、Sink表。
- 当HBase与Flink为同一集群或互信的集群,支持FlinkServer对接HBase。
- 当HBase与Flink不在同一集群或不互信的集群,则只支持Flink和HBase均为普通模式集群的对接。
前提条件
- 集群已安装,包括HDFS、Yarn、Flink和HBase。
- 包含HBase服务的客户端已安装,安装路径如:/opt/client。
- 参考使用HBase客户端,登录HBase客户端,使用create 'dim_province', "f1"创建dim_province表。
操作步骤
- 以客户端安装用户登录安装客户端的节点,拷贝HBase的“/opt/client/HBase/hbase/conf/”目录下的所有配置文件至部署FlinkServer的所有节点的一个空目录,如“/tmp/client/HBase/hbase/conf/”。
修改FlinkServer节点上面配置文件目录及其上层目录属主为omm。
- 登录Manager,选择“集群 > 服务 > Flink > 配置 > 全部配置”,搜索“HBASE_CONF_DIR”参数,在该参数的“值”中填写1中拷贝了HBase配置文件的FlinkServer的目录,如“/tmp/client/HBase/hbase/conf/”。
若FlinkServer实例所在节点与包含HBase服务客户端的安装节点相同,则在HBASE_CONF_DIR”参数的“值”填写HBase的“/opt/client/HBase/hbase/conf/”目录。
- 填写完成后单击“保存”,确认修改配置后单击“确定”。
- 单击“实例”,勾选所有FlinkServer实例,选择“更多 > 重启实例”,输入密码,单击“确定”重启实例。
- 登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL作业,作业类型选择“流作业”。在作业开发界面进行如下作业配置并启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
安全集群且HBase的认证模式为hbase.rpc.protection=authentication时参考如下样例,建立Flink SQL作业。CREATE TABLE ksource1 ( user_id STRING, item_id STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'ksource1', 'properties.group.id' = 'group1', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP1:Kafka端口号,Kafka的Broker实例业务IP2:Kafka端口号', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); CREATE TABLE hsink1 ( rowkey STRING, f1 ROW < item_id STRING >, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector' = 'hbase-2.2', 'table-name' = 'dim_province', 'zookeeper.quorum' = 'ZooKeeper的quorumpeer实例业务IP1:ZooKeeper客户端端口号,ZooKeeper的quorumpeer实例业务IP2:ZooKeeper客户端端口号' ); INSERT INTO hsink1 SELECT user_id as rowkey, ROW(item_id) as f1 FROM ksource1;
- Kafka Broker实例IP地址及端口号说明:
- 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- ZooKeeper的quorumpeer实例业务IP:
ZooKeeper服务所有quorumpeer实例业务IP。登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,可查看所有quorumpeer实例所在主机业务IP地址。
- ZooKeeper客户端端口号:
登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。
- HBase认证模式:
登录FusionInsight Manager,选择“集群 > 服务 > HBase > 配置 > 全部配置”,搜索“hbase.rpc.protection”,查看HBase认证模式,当认证模式为“integrity”和“privacy”时添加如下参数:
'properties.hbase.rpc.protection' = 'HBase认证模式'
'properties.zookeeper.znode.parent' = '/hbase'
'properties.hbase.security.authorization' = 'true'
'properties.hbase.security.authentication' = 'kerberos'
- 复合RowKey的分隔符:
如果数据是通过HBase的bulkload导入或者业务通过(多字段+分隔符)拼接写入,Flink作为Source表或者维表读HBase时,需要添加如下参数:
'rowkey.delimiter'='...'
如果数据是通过Flink写入的数据,业务应用通过HBase服务读取数据,则添加以上参数的同时需确保所有的字段类型为String。
- Kafka Broker实例IP地址及端口号说明:
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka主题中的消息,向kafka中写入数据。
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为ksource1:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic ksource1 --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:{"user_id": "3","item_id":"333333"} {"user_id": "4","item_id":"44444444"}
输入完成后按回车发送消息。
- 参考使用HBase客户端,登录HBase客户端,查看表数据信息。
hbase shell
scan 'dim_province'
应用端提交作业
- 若使用Flink run模式,推荐使用export HBASE_CONF_DIR=hbase的配置目录,例如:export HBASE_CONF_DIR=/opt/hbaseconf。
- 若使用Flink run-application模式,则有如下两种方式。
- 在建表语句中添加如下配置(推荐)
配置
说明
'properties.hbase.rpc.protection' = 'authentication'
需和HBase服务端的配置一致
'properties.zookeeper.znode.parent' = '/hbase'
多服务场景中,会存在hbase1,hbase2,需明确要访问的集群
'properties.hbase.security.authorization' = 'true'
开启鉴权
'properties.hbase.security.authentication' = 'kerberos'
开启kerberos认证
示例:CREATE TABLE hsink1 ( rowkey STRING, f1 ROW < q1 STRING >, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector' = 'hbase-2.2', 'table-name' = 'cc', 'zookeeper.quorum' = 'x.x.x.x:clientPort', 'properties.hbase.rpc.protection' = 'authentication', 'properties.zookeeper.znode.parent' = '/hbase', 'properties.hbase.security.authorization' = 'true', 'properties.hbase.security.authentication' = 'kerberos' );
- 提交作业时将HBase的配置添加到yarnShip中。
- 在建表语句中添加如下配置(推荐)