Spark Doris Connector
Spark Doris Connector可以支持通过Spark读取Doris中存储的数据,也支持通过Spark写入数据到Doris中。
- 支持从Doris中读取数据
- 支持Spark DataFrame批量/流式写入Doris。
- 可以将Doris表映射为DataFrame或者RDD,推荐使用DataFrame。
- 支持在Doris端完成数据过滤,减少数据传输量。
前提条件
- 已创建包含Doris服务的集群,集群内各服务运行正常。
- 待连接Doris数据库的节点与MRS集群网络互通。
- 创建具有Doris管理权限的用户。
- 集群已启用Kerberos认证(安全模式)
在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。
使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。
- 集群未启用Kerberos认证(普通模式)
使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
- 集群已启用Kerberos认证(安全模式)
- 已安装MySQL客户端,相关操作可参考安装MySQL客户端。
- 已安装Spark客户端。
操作步骤
在Doris中创建表并插入数据。
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
若集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -u数据库登录用户 -p数据库登录用户密码 -PFE查询连接端口 -hDoris FE实例IP地址
- Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
- 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
- 执行以下命令创建数据库并切换:
create database if not exists sparkconnector;
use sparkconnector;
- 执行以下命令创建表:
CREATE TABLE spark_connector_test_decimal (
c1 int NOT NULL,
c2 VARCHAR(25) NOT NULL,
c3 VARCHAR(152),
c4 boolean,
c5 tinyint,
c6 smallint,
c7 bigint,
c8 float,
c9 double,
c10 date,
c11 datetime,
c12 char,
c13 largeint,
c14 varchar,
c15 decimal(15, 5)
)
DUPLICATE KEY(`c1`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`c1`) BUCKETS 1;
- 分别执行执行以下命令向表中插入数据:
insert into spark_connector_test_decimal values(10000,'aaa','abc',true, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
insert into spark_connector_test_decimal values(10001,'aaa','abc',false, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
insert into spark_connector_test_decimal values(10002,'aaa','abc',True, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
insert into spark_connector_test_decimal values(10003,'aaa','abc',False, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
spark侧操作。
- 执行以下命令登录spark-sql客户端:
cd Spark客户端安装目录
source bigdata_env
kinit 组件业务用户(若集群未启用Kerberos认证(普通模式)请跳过该操作)
spark-sql --master yarn
- 执行以下命令创建临时视图:
CREATE TEMPORARY VIEW spark_doris_decimal
USING doris
OPTIONS(
"table.identifier"="sparkconnector.spark_connector_test_decimal",
"fenodes"="FE实例IP地址:29991",
"user"="dorisuser",
"password"="用户密码",
'doris.enable.https' = 'true',
'doris.ignore.https.ca' = 'true'
);
执行以下命令查询Doris表中的数据:
select * from spark_doris_decimal;
执行以下命令向Doris表中插入数据:
insert into spark_doris_decimal values(10005,'aaa','abc',False, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
- 切换为HTTP后,需要在创建表的with子句中删除如下配置参数:
- 'doris.enable.https' = 'true'
- 'doris.ignore.https.ca' = 'true'
- 29991为FE服务的HTTPS端口,切换为HTTP后,需要修改端口号为29980。可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,在搜索框中搜索“http”查看。
- 切换为HTTP后,需要在创建表的with子句中删除如下配置参数: