更新时间:2024-11-26 GMT+08:00
分享

配置Spark读写Doris数据

Spark Doris Connector可以支持通过Spark读取Doris中存储的数据,也支持通过Spark写入数据到Doris中。

  • 支持从Doris中读取数据
  • 支持Spark DataFrame批量/流式写入Doris。
  • 可以将Doris表映射为DataFrame或者RDD,推荐使用DataFrame。
  • 支持在Doris端完成数据过滤,减少数据传输量。
  • 该章节仅适用于MRS 3.5.0及之后版本。

前提条件

  • 已创建包含Doris服务的集群,集群内各服务运行正常。
  • 待连接Doris数据库的节点与MRS集群网络互通。
  • 创建具有Doris管理权限的用户。
    • 集群已启用Kerberos认证(安全模式)

      在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。

      使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。

    • 集群未启用Kerberos认证(普通模式)

      使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。

  • 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris
  • 已安装Spark客户端。

操作步骤

在Doris中创建表并插入数据。

  1. 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。

    如果集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址

    • 数据库连接端口为Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
    • Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
    • 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。

  2. 执行以下命令创建数据库并切换:

    create database if not exists sparkconnector;

    use sparkconnector;

  3. 执行以下命令创建表:

    CREATE TABLE spark_connector_test_decimal (

    c1 int NOT NULL,

    c2 VARCHAR(25) NOT NULL,

    c3 VARCHAR(152),

    c4 boolean,

    c5 tinyint,

    c6 smallint,

    c7 bigint,

    c8 float,

    c9 double,

    c10 date,

    c11 datetime,

    c12 char,

    c13 largeint,

    c14 varchar,

    c15 decimal(15, 5)

    )

    DUPLICATE KEY(`c1`)

    COMMENT "OLAP"

    DISTRIBUTED BY HASH(`c1`) BUCKETS 1;

  4. 分别执行执行以下命令向表中插入数据:

    insert into spark_connector_test_decimal values(10000,'aaa','abc',true, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);

    insert into spark_connector_test_decimal values(10001,'aaa','abc',false, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);

    insert into spark_connector_test_decimal values(10002,'aaa','abc',True, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);

    insert into spark_connector_test_decimal values(10003,'aaa','abc',False, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);

spark侧操作。

  1. 执行以下命令登录spark-sql客户端:

    cd Spark客户端安装目录

    source bigdata_env

    kinit 组件业务用户(如果集群未启用Kerberos认证(普通模式)请跳过该操作)

    spark-sql --master yarn

  2. 执行以下命令创建临时视图:

    CREATE TEMPORARY VIEW spark_doris_decimal

    USING doris

    OPTIONS(

    "table.identifier"="sparkconnector.spark_connector_test_decimal",

    "fenodes"="FE实例IP地址:端口号",

    "user"="dorisuser",

    "password"="用户密码",

    'doris.enable.https' = 'true',

    'doris.ignore.https.ca' = 'true'

    );

    执行以下命令查询Doris表中的数据:

    select * from spark_doris_decimal;

    执行以下命令向Doris表中插入数据:

    insert into spark_doris_decimal values(10005,'aaa','abc',False, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);

    • 切换为HTTP后,需要在创建表的with子句中删除如下配置参数:
      • 'doris.enable.https' = 'true'
      • 'doris.ignore.https.ca' = 'true'
    • 端口号为FE服务的HTTPS端口(开启Kerberos认证的集群)或HTTP端口(未开启Kerberos认证的集群)。端口号可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,在搜索框中搜索“https_http”或“http_port”查看。
    • 当导入的数据量过大时,可通过调节以下参数提高性能:
      • sink.batch.size:单次写BE的最大行数,默认值为:10000。
      • doris.sink.batch.interval.ms:每个批次sink的间隔时间,单位为ms,默认值为:50。

相关文档