配置Doris对接Hudi数据源
在MRS集群元数据存储在Hive MetaStore的场景中,Doris 2.0.13(MRS 3.5.1及之后版本为2.1.7)版本支持通过Catalog的方式对接Hudi 0.15.0版本数据源,支持对接Hudi所有的数据字段类型。
约束与限制
该操作仅适用于MRS 3.5.0及之后版本。
Doris支持查询的Hudi表类型
Doris支持的Hudi表类型和对应的查询类型如下:
MRS 3.5.0-LTS版本:
- COW表:支持Snapshot Query和TimeTravel Query。
- MOR表:支持Snapshot Query、TimeTravel Query和Read Optimized Query。
MRS 3.6.0-LTS及之后版本:
- COW表:支持Snapshot Query、TimeTravel Query和Incremental Query。
- MOR表:支持Snapshot Query、TimeTravel Query、Read Optimized Query和Incremental Query。
以下操作以使用TimeTravel Query和Incremental Query(MRS 3.6.0-LTS及之后版本)查询Hudi COW表为例进行演示。
- MRS 3.6.0-LTS及之后版本,如果Doris通过Catalog访问存储在OBS上的Hudi数据,需要在Doris所在集群的Manager页面,选择“集群 > 服务 > Doris > 配置”,选择“全部配置”,在左侧导航栏选择“Doris(服务) > 自定义”,在自定义参数“core.site.customized.configs”中添加表1中的参数。
表1 配置访问OBS上的Hudi数据的自定义参数 参数
参数描述
参数值获取方法
fs.obs.endpoint
Hudi数据所在的OBS并行文件系统的Endpoint信息。
可在NameNode所在节点的“${BIGDATA_HOME}/FusionInsight_HD_*/*_*_NameNode/etc”目录下的“core-site.xml”文件中搜索对应配置项名称获取。
remote.obs.credentials.provider.enable
安全认证和凭证管理配置。
token.server.rpc.address
TokenServer服务对应的IP和RPC服务端口。
token.server.kerberos.principal
TokenServer服务对应用户的Principal。
fs.obs.security.provider
安全认证和凭证管理配置。
- MRS 3.6.0-LTS及之后版本,如果Doris通过Catalog跨集群访问Hudi数据,需要在Doris所在集群的Manager页面,选择“集群 > 服务 > Doris > 配置”,选择“全部配置”,在左侧导航栏选择“Doris(服务) > 自定义”,在自定义参数“hdfs.site.customized.configs”中添加表2中的参数。
表2 配置跨集群访问Hudi数据的自定义参数 参数
参数描述
参数值获取方法
dfs.nameservices.mappings
用于设置集群NameService名称映射关系。
可在NameNode所在节点的“${BIGDATA_HOME}/FusionInsight_HD_*/*_*_NameNode/etc”目录下的“hdfs-site.xml”文件中查找对应配置项名称获取。
dfs.nameservices
用于设置集群NameService名称。
dfs.ha.namenodes.hacluster
用于设置集群NameService前缀,包含两个值。
dfs.namenode.rpc-address.hacluster.xx
用于设置主NameNode的RPC通信地址。xx为“dfs.ha.namenodes.hacluster”参数的其中一个值。
dfs.namenode.rpc-address.hacluster.xx
用于设置备NameNode的RPC通信地址。xx为“dfs.ha.namenodes.hacluster”参数的另一个值。
dfs.client.failover.proxy.provider.hacluster
用于指定HDFS客户端连接集群中Active状态节点的Java类。
参数值为“org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider”。
图1 添加自定义参数
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
若集群已启用Kerberos认证(安全模式),需先执行以下命令:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
连接Doris数据库:
mysql -u数据库登录用户 -p -PFE查询连接端口 -hDoris FE实例IP地址
执行命令后输入数据库登录用户密码。
其中:
- Doris FE的查询连接端口,可以通过登录Manager,选择“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris FE实例IP地址可通过登录MRS集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
- 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
- 参考3创建Hudi Catalog,例如:hudi_catalog。
- 参考3在MySQL客户端连接Doris,执行以下命令切换到4创建的Hudi Catalog:
switch hudi_catalog; - 执行以下命令查看创建的Hudi表:
刷新Hudi Catalog:
refresh catalog hudi_catalog;切换至对应数据库:
use 数据库名称;查看表:
show tables;
- 执行以下命令使用TimeTravel Query查看某个时间之前插入的表数据:
select * from hudi_cow for time as of '20240409162842365' where id = 1;
- MRS 3.6.0及之后版本,执行以下命令使用Incremental Query查看增量表数据:
例如,查看表“20240409162842365”到“20240509162842365”之间的增量数据:
select * from hudi_test3@incr ('beginTime'='20240409162842365','endTime'='20240509162842365');
例如,查看表“20240409162842365”之后的增量数据:
select * from hudi_test3@incr ('beginTime'='20240409162842365');
其中,beginTime为增量数据的开始时间,endTime为增量数据的结束时间。
Doris on Hudi查询加速
Doris on Hudi支持以下方式的查询加速:
- Doris支持Hudi Parquet表的Bucket Shuffle Join操作
Doris支持使用Bucket Shuffle Join对Hudi数据源查询进行加速,当前支持对Hudi所有字段类型及存储在OBS上的Hudi表进行该操作。该功能由session级别变量“enable_hudi_bucket_shuffle” 控制是否启用,默认关闭,可通过连接Doris后执行set enable_hudi_bucket_shuffle=true;命令开启。
- 在等值Join条件之中包含两张表的分桶列,当左表的分桶列为等值的Join条件时,很大概率会被规划为Bucket Shuffle Join。
- 左表的分桶列的类型与右表等值Join列的类型需要保持一致。
- Bucket Shuffle Join功能只生效于等值Join的场景。
- Bucket Shuffle Join功能只能在左表为单分区时生效。
- 支持参与Join的两张表,一张是Hudi表,一张是Doris内部表。
- Doris支持Hudi的隐式分区功能
Doris支持Hudi的隐式分区功能以进行查询加速。该功能由变量“enable_hudi_hidden_partition”控制是否启用,默认关闭,可通过连接Doris后执行ADMIN SET FRONTEND CONFIG ("enable_hudi_hidden_partition" = "true");命令开启。
- Doris支持对Hudi表进行桶裁剪
MRS 3.5.0-LTS版本,Doris支持对Hudi表进行桶裁剪以进行查询加速。该功能由变量“enable_hudi_bucket_pruning”控制是否启用,默认关闭,可通过连接Doris后执行ADMIN SET FRONTEND CONFIG ("enable_hudi_bucket_pruning" = "true");命令开启。
MRS 3.6.0-LTS及之后版本,Doris支持对Hudi表进行桶裁剪以进行查询加速。该功能由session变量“enable_hudi_bucket_pruning”控制是否启用,默认关闭,可通过连接Doris后执行set enable_hudi_bucket_pruning=true;命令开启。
- Doris支持Hudi的MDT能力
MRS 3.6.0-LTS及之后版本,Doris支持基于Hudi的列统计信息和记录级索引实现对文件的裁剪。该功能由session变量“enable_hudi_mdt”控制是否启用,默认关闭,可通过连接Doris后执行set enable_hudi_mdt=true;命令开启。
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
若集群已启用Kerberos认证(安全模式),需先执行以下命令:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
连接Doris数据库:
mysql -u数据库登录用户 -p -PFE查询连接端口 -hDoris FE实例IP地址
执行命令后输入数据库登录用户密码。
其中:
- Doris FE的查询连接端口,可以通过登录Manager,选择“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris FE实例IP地址可通过登录MRS集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
- 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
- 参考3创建Hudi Catalog,例如:hudi_catalog。
- 参考1在MySQL客户端连接Doris,执行以下命令切换到Hudi Catalog:
switch hudi_catalog; - 执行以下命令查看创建的Hudi表:
刷新Hudi Catalog:
refresh catalog hudi_catalog;切换至对应数据库:
use 数据库名称;查看表:
show tables;
- 执行以下命令开启Doris on Hudi查询加速,并进行查询:
- 开启Bucket Shuffle Join并进行查询。
仅支持Hudi的Bucket索引表,即待查询的Hudi表属性包含hoodie.index.type='BUCKET'、hoodie.bucket.index.num.buckets='xxx'和hoodie.bucket.index.hash.field='xxx'。
开启Bucket Shuffle Join:
set enable_hudi_bucket_shuffle = true;
执行查询命令:
select t1.name, t2.age from hudi_testt1 t1, hudi_testt2 t2 where t1.id=t2.id;
还可执行以下命令查看是否命中Bucket Shuffle Join:
explain select t1.name, t2.age from hudi_testt1 t1, hudi_testt2 t2 where t1.id=t2.id;
在执行结果中查看是否存在BUCKET_SHUFFLE关键字,如果存在,则表示本次查询命中了Bucket Shuffle Join。
- 开启Hudi表的隐式分区功能,并进行查询。
仅支持Hudi隐式分区表,即待查询的Hudi表属性包含hoodie.hidden.partitioning.rule = 'xxx'和hoodie.hidden.partitioning.enabled = 'true'。
开启Hudi表的隐式分区功能:
ADMIN SET FRONTEND CONFIG ("enable_hudi_hidden_partition" = "true");执行查询命令:
select * from hudi1 where col0 like '%2';
- 开启Hudi表的桶裁剪功能,并进行查询。
仅支持Hudi的Simple Bucket索引表,即待查询的Hudi表属性包含hoodie.index.type='BUCKET'、hoodie.bucket.index.num.buckets='xxx'、hoodie.bucket.index.hash.field='xxx'、hoodie.index.bucket.engine = 'SIMPLE'和hoodie.metadata.index.column.stats.enable = 'true'。
开启Hudi表的桶裁剪功能:
ADMIN SET FRONTEND CONFIG ("enable_hudi_bucket_pruning" = "true");执行查询命令:
select * from hudi_cow_tbl where id > 1 and id < 3;
- 开启Bucket Shuffle Join并进行查询。