使用Stream Load方式导入数据至Doris
Stream Load是一个同步的导入方式,用户通过HTTP协议发送请求将本地文件或数据流导入到Doris中。Stream Load同步执行导入并返回导入结果,用户可直接通过请求的返回体判断本次导入是否成功。
Stream Load主要适用于导入本地文件,或通过程序导入数据流中的数据,支持导入CSV、Parquet、ORC格式的数据,默认支持导入CSV格式数据。
语法介绍
- 创建Stream Load导入任务
Stream Load通过HTTP协议提交和传输数据。该操作通过curl命令演示如何提交导入,也可以使用其他HTTP Client进行操作。
- 集群已启用Kerberos认证(安全模式):
curl -k --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT https://Doris FE实例IP地址:HTTPS端口号/api/{数据库名称}/{表名}/_stream_load
- 集群未启用Kerberos认证(普通模式)
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://Doris FE实例IP地址:HTTP端口号/api/{数据库名称}/{表名}/_stream_load
Doris FE实例IP地址可在Manager界面,选择“集群 > 服务 > Doris > 实例”查看。
HTTPS端口号可在Manager界面,选择“集群 > 服务 > Doris > 配置”,搜索“https_port”查看。
HTTP端口号可在Manager界面,选择“集群 > 服务 > Doris > 配置”,搜索“http_port”查看。
表1介绍了创建Stream Load任务的其他部分参数。
表1 Stream Load任务参数介绍 参数名称
参数描述
签名参数
user:passwd
Stream Load创建导入的协议使用的是HTTP协议,通过Basic access authentication进行签名。Doris系统会根据签名验证用户身份和导入权限。
导入任务参数(格式为:-H "key1:value1")
label
导入任务的标识。每个导入任务,都有一个在单database内部唯一的label。label是用户在导入命令中自定义的名称。通过该label,用户可以查看对应导入任务的执行情况。
column_separator
用于指定导入文件中的列分隔符,默认为\t,可以使用多个字符的组合作为列分隔符。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。如Hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"。
line_delimiter
用于指定导入文件中的换行符,默认为\n,可以使用做多个字符的组合作为换行符。
max_filter_ratio
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。
where
导入任务指定的过滤条件。Stream Load支持对原始数据指定where语句进行过滤,被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入num_rows_unselected。
Partitions
待导入表的Partition信息,如果待导入数据不属于指定的Partition,则不会被导入,这些数据将计入dpp.abnorm.ALL。
columns
待导入数据的函数变换配置,目前 Stream Load支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句一致。
format
指定导入数据格式,支持CSV、JSON、Parquet、ORC等,默认是CSV。
exec_mem_limit
导入内存限制,默认为:2GB,单位为:字节。
strict_mode
Stream Load导入可以开启strict mode模式,在HEADER中声明strict_mode=true即可开启,默认关闭strict mode。
strict mode用于对导入过程中的列类型转换进行严格过滤,策略如下:
- 对于列类型转换来说,如果strict mode为“true”,则错误的数据将被filter。错误数据是指原始数据并不为空值,在参与列类型转换后结果为空值的数据。
- 对于导入的某列由函数变换生成时,strict mode对其不产生影响。
- 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制,strict mode对其也不产生影响。例如:如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内,这种数据strict mode对其不产生影响。
merge_type
数据的合并类型,支持APPEND、DELETE和MERGE三种类型,默认为APPEND。
- APPEND表示这批数据需要全部追加到现有数据中。
- DELETE表示删除与这批数据Key相同的所有行。
- MERGE语义需要与delete条件联合使用,满足delete条件的数据按照DELETE语义处理,其余的按照APPEND语义处理。
two_phase_commit
Stream Load导入可以开启两阶段事务提交模式。在Stream Load导入数据的过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为“PRECOMMITTED”,用户手动触发commit操作之后,数据才可见。
enable_profile
当“enable_profile”为“true”时,Stream Load profile将会打印到日志中,否则不会打印。
- 集群已启用Kerberos认证(安全模式):
- 返回结果
由于Stream Load是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回,例如:
{ "TxnId": 1003, "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee", "Status": "Success", "ExistingJobStatus": "FINISHED", // optional "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, "LoadBytes": 40888898, "LoadTimeMs": 2144, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 325, "WriteDataTimeMs": 1933, "CommitAndPublishTimeMs": 106, "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005" }
导入结果参数介绍如表2所示。
表2 Stream Load导入任务结果参数介绍 参数名称
参数描述
TxnId
数据导入任务的事务ID。
Label
导入Label,由用户指定或系统自动生成。
Status
数据导入任务完成状态,包括:
- Success:表示导入成功。
- Publish Timeout:该状态也表示导入已完成,只是数据可能会延迟可见,无需重试。
- Label Already Exists:Label重复,需更换Label。
- Fail:导入失败。
ExistingJobStatus
已存在的Label对应的导入作业的状态。
该字段只有当Status为"Label Already Exists"时才显示。可以通过这个状态,查看已存在Label对应的导入作业的状态,"RUNNING" 表示作业还在执行,"FINISHED"表示作业执行成功。
Message
数据导入任务错误信息。
NumberTotalRows
数据导入任务总处理的行数。
NumberLoadedRows
成功导入的行数。
NumberFilteredRows
数据不合格的行数。
NumberUnselectedRows
被where条件过滤的行数。
LoadBytes
导入数据的字节数。
LoadTimeMs
数据导入任务完成时间,单位为:毫秒。
BeginTxnTimeMs
向FE请求开始一个事务所花费的时间,单位为:毫秒。
StreamLoadPutTimeMs
向FE请求获取导入数据执行计划所花费的时间,单位为:毫秒。
ReadDataTimeMs
读取数据所花费的时间,单位为:毫秒。
WriteDataTimeMs
执行写入数据操作所花费的时间,单位为:毫秒。
CommitAndPublishTimeMs
向FE请求提交并且发布事务所花费的时间,单位为:毫秒。
ErrorURL
如果有数据问题,可通过访问该URL查看具体错误行。
由于Stream Load是同步的导入方式,所以不会在Doris系统中记录导入信息,用户无法异步通过查看导入命令看到Stream Load,需查看创建导入请求的返回值获取导入结果。
- 取消数据导入
- 查看Stream Load任务
用户可以通过show stream load;查看已经完成的Stream Load任务。
默认BE不记录Stream Load的导入信息,如果需要查看需配置enable_stream_load_record=true参数启用记录。
前提条件
- 已创建包含Doris服务的集群,集群内各服务运行正常。
- 待连接Doris数据库的节点与MRS集群网络互通。
- 创建具有Doris管理权限的用户。
- 集群已启用Kerberos认证(安全模式)
在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。
使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。
- 集群未启用Kerberos认证(普通模式)
使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
- 集群已启用Kerberos认证(安全模式)
- 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris。
Stream Load任务示例
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
如果集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
- 数据库连接端口为Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
- 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
- 执行以下命令创建数据库:
create database if not exists example_db;
- 执行以下命令创建表:
CREATE TABLE example_db.test_stream_tbl (
`c1` int NOT NULL,
`c2` int NOT NULL,
`c3` string NOT NULL,
`c4` date NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`c1`, `c2`)
DISTRIBUTED BY HASH(`c1`) BUCKETS 1;
- 创建数据文件“data.csv”,内容为:
1,1,1,2020-02-21 2,2,2,2020-03-21 3,3,3,2020-04-21
- 使用Stream Load导入“data.csv”中的数据到3创建的表中:
- 集群已启用Kerberos认证(安全模式)
curl -k --location-trusted -u user:passwd -H "label:table1_20230217" -H "column_separator:," -T data.csv https://Doris FE实例IP地址:HTTPS端口/api/example_db/test_stream_tbl/_stream_load
- 集群未启用Kerberos认证(普通模式)
curl --location-trusted -u user:passwd -H "label:table1_20230217" -H "column_separator:," -T data.csv http://Doris FE实例IP地址:HTTP端口/api/example_db/test_stream_tbl/_stream_load
- 集群已启用Kerberos认证(安全模式)
- 执行以下命令查看表数据:
select * from example_db.test_stream_tbl;
相关参数配置
登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置 > 全部配置”,新增如下参数:
- 选择“FE(角色) > 自定义”,在自定义参数“fe.conf.customized.configs”中新增参数:
stream_load_default_timeout_second:表示导入任务的超时时间(单位为秒),如果导入任务在设定的时间内未完成则会被系统取消,状态变为“CANCELLED”。默认超时时间为600秒,如果导入的源文件无法在规定时间内完成导入,可以在Stream Load请求中设置单独的超时时间,或调整“stream_load_default_timeout_second”参数值设置全局的默认超时时间。
- 选择“BE(角色) > 自定义”,在自定义参数“be.conf.customized.configs”中新增参数:
streaming_load_max_mb:表示Stream Load的最大导入文件大小,默认值为10G,单位为MB。如果原始文件超过该值,则需要适当调整该参数值。