通过Spark读取Hive外表数据并写入Doris
应用场景
通过Spark视图读取Hive外表数据写入到Doris,并按照Unique指定字段自动去重。
方案架构
离线数据可以从数据湖加载,也可以直接加载本地文件。从数据湖加载可以使用工具CDM,在没有CDM工具时,可以直接使用外表加载、BrokerLoad的方式。
本地文件加载则使用Doris自带的StreamLoad工具。

- 场景特点:
- 数据加载方式:
- 在存在DataArts Studio工具的场景下,数据加载统一使用CDM进行(DataArts Studio需要2.10及以上版本)。
- 在不存在DataArts Studio工具的场景下,可以使用Spark或者外表方式加载。由数据加工方进行数据推送加载时,推荐使用BrokerLoad加载方式,方便数据加工方统一调度;如果有数据应用方负责数据加载,则使用外表方式进行数据加载较为方便应用层另外配置加载数据。
- 在没有数据湖的场景下,轻量化数仓场景中,数据直接由源端文件加载到Doris中,可以使用Streamload方式加载本地文件。
- 注意事项:
步骤1:创建MRS Hive集群并配置
- 创建MRS集群。
详细操作请参考自定义购买MRS集群,例如相关参数如下:
- 集群名称:自定义,例如“mrs_test”
- 版本类型:LTS版
- 集群版本:MRS 3.3.0-LTS
- 集群组件包含:Hive、Spark等
- Kerberos认证:开启
其他参数根据实际需要进行配置。
- 集群购买完成后安装集群全量客户端,详细操作请参考安装客户端(3.x版本)。
例如客户端安装路径为“/opt/client”。
- 已存在Hive外表(数据格式仅限ORC格式),且存在一定量数据。
建表语句参考:
CREATE TABLE wxk.`lineorder_orc02`( `lo_orderkey` bigint, `lo_linenumber` int, `lo_custkey` bigint, `lo_partkey` bigint, `lo_suppkey` bigint, `lo_orderdate` date, `lo_orderpriority` varchar(300), `lo_shippriority` int, `lo_quantity` int, `lo_extendedprice` bigint, `lo_ordtotalprice` bigint, `lo_discount` int, `lo_revenue` bigint, `lo_supplycost` int, `lo_tax` int, `lo_commitdate` date, `lo_shipmode` varchar(300) ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' LOCATION 'obs://test/doristest' TBLPROPERTIES ( 'bucketing_version' = '2', 'orc.compress' = 'ZLIB', 'transient_lastDdlTime' = '1705923234' ) ;
准备数据如下:
步骤2:创建MRS Doris集群并配置
- 创建MRS Doris集群。
详细操作请参考自定义购买MRS集群,例如相关参数如下:
- 集群名称:自定义,例如“mrs_doris”
- 集群版本:MRS 3.3.0-LTS
- 组件选择:Doris集群。
- Kerberos认证:开启
其他参数根据实际需要进行配置。
- 集群购买完成后安装集群客户端。
详细操作请参考安装客户端(3.x版本)。
例如客户端安装路径为“/opt/dorisclient”。
- 在Doris集群安装MySQL客户端。
详细操作请参考使用MySQL客户端连接Doris。
- 创建拥有Doris管理权限的人机用户(例如用户名为dorisuser),并修改初始密码。
- 登录Doris集群Manager页面。
- 选择“系统 > 权限 > 角色 > 添加角色”,填写角色名称,如“dorisrole”,在“配置资源权限”选择“待操作的集群 > Doris”,勾选“Doris管理员权限”,单击“确定”。
- 选择“用户 > 添加用户”,填写用户名称,如“dorisuser”,“用户类型”选择“人机”,“密码策略”默认,输入用户密码并确认密码,关联“dorisrole”角色,单击“确定”。
- 使用新建的dorisuser用户重新登录FusionInsight Manager,修改该用户初始密码。
步骤3:创建Doris Catalog并将Hive外表数据写入Doris
- 登录Doris集群中已安装好MySQL客户端的节点,执行如下命令进入Doris数据库。
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1 //若集群已启用Kerberos认证(安全模式)需执行该命令。
mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
- 数据库连接端口为Doris FE的查询连接端口,可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的业务IP地址获取。
- Doris FE实例IP地址可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 配置 > 全部配置”,搜索并查看“query_port”参数值获取。
- 执行以下命令创建数据库(例如名称为“sparkconnector”)并切换。
create database if not exists sparkconnector;
use sparkconnector;
- 执行以下命令创建表(注意表结构要和Hive表结构一致)。
CREATE TABLE lineorder_view_u( `LO_ORDERKEY` bigint(20) NOT NULL COMMENT '订单id', `LO_LINENUMBER` int(11) NOT NULL COMMENT '订单码', `LO_CUSTKEY` bigint(20) NOT NULL COMMENT '客户id', `LO_PARTKEY` bigint(20) NOT NULL COMMENT '商品id', `LO_SUPPKEY` bigint(20) NOT NULL COMMENT '供应商id', `LO_ORDERDATE` date NULL COMMENT '订单日期', `LO_ORDERPRIORITY` varchar(100) NULL COMMENT '订单优先级', `LO_SHIPPRIORITY` int(11) NULL COMMENT '物流优先级', `LO_QUANTITY` int(11) NULL COMMENT '数量', `LO_EXTENDEDPRICE` bigint(20) NULL COMMENT '扩展价格', `LO_ORDTOTALPRICE` bigint(20) NULL COMMENT '订单总价', `LO_DISCOUNT` int(11) NULL COMMENT '折扣', `LO_REVENUE` bigint(20) NULL COMMENT '收入', `LO_SUPPLYCOST` int(11) NULL COMMENT '成本', `LO_TAX` int(11) NULL COMMENT '税款', `LO_COMMITDATE` date NULL COMMENT '承诺日期', `LO_SHIPMODE` varchar(100) NOT NULL COMMENT '运输方式' ) ENGINE = OLAP UNIQUE KEY(`LO_ORDERKEY`, `LO_LINENUMBER`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`LO_ORDERKEY`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 3", "in_memory" = "false", "storage_format" = "V2", "disable_auto_compaction" = "false" );
例如执行show tables;后结果如下,表示建表成功:
- 以root用户登录MRS Hive集群主节点,执行以下命令登录Spark客户端。
cd /opt/client
source bigdata_env
kinit 组件业务用户
spark-beeline
- 执行以下命令创建临时视图。
CREATE TEMPORARY VIEW spark_doris_decimal USING doris OPTIONS( "table.identifier" = "doris数据库名.doris数据表名", "fenodes" = "FE实例IP地址:FE配置https_port", "user" = "用户名", "password" = "用户密码", 'doris.enable.https' = 'true', 'doris.ignore.https.ca' = 'true' );
- 执行以下命令读取Hive外表数据写入Doris。
insert into spark_doris_decimal select * from wxk.lineorder_orc02;
例如执行后结果如下,数据写入Doris成功,且数据符合预期结果。