Doris源表
功能描述
Flink SQL作业读取Doris源表。
前提条件
- 该场景作业需要运行在DLI的独享队列上,因此要与Doris建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- 如果使用MRS Doris,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。
详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。
- 集群未启用Kerberos认证(普通模式)
使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
- 集群未启用Kerberos认证(普通模式)
- Doris的表名是区分大小写。
- 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030, 8040,9030。
- 开启HTTPS后,需要在创建表的with子句中添加如下配置参数:
- 'doris.enable.https' = 'true'
- 'doris.ignore.https.ca' = 'true'
语法格式
create table dorisSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'database.table', 'username' = 'dorisUsername', 'password' = 'dorisPassword' );
参数说明
通用配置项
参数 |
默认值 |
是否必选 |
参数类型说明 |
---|---|---|---|
fenodes |
-- |
是 |
Doris FE ip地址和port, 多实例之间使用逗号分隔。其中port可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,在搜索框中搜索“http”查看。如果开启https,则搜索“https”。 |
table.identifier |
-- |
是 |
Doris表名,如:db.tbl |
username |
-- |
是 |
访问Doris的用户名。 |
password |
-- |
是 |
访问Doris的密码。 |
doris.request.retries |
3 |
否 |
向Doris发送请求的重试次数。 |
doris.request.connect.timeout.ms |
30000 |
否 |
向Doris发送请求的连接超时时间。 |
doris.request.read.timeout.ms |
30000 |
否 |
向Doris发送请求的读取超时时间。 |
doris.request.query.timeout.s |
3600 |
否 |
查询Doris的超时时间,默认值为1小时,-1表示无超时限制。 |
doris.request.tablet.size |
Integer. MAX_VALUE |
否 |
一个 Partition 对应的Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。 |
doris.batch.size |
1024 |
否 |
一次从 BE 读取数据的最大行数。增大此数值可减少Flink与Doris之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 |
doris.exec.mem.limit |
2147483648 |
否 |
单个查询的内存限制。默认为 2GB,单位为字节。 |
doris.deserialize.arrow.async |
FALSE |
否 |
是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch。 |
doris.deserialize.queue.size |
64 |
否 |
异步转换 Arrow 格式的内部处理队列,当doris.deserialize.arrow.async 为 true 时生效。 |
doris.read.field |
-- |
否 |
读取 Doris 表的列名列表,多列之间使用逗号分隔。 |
doris.filter.query |
-- |
否 |
过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。 |
示例
该示例是从Doris源表读取数据,并输入到 print connector。
- 参考增强型跨源连接,在DLI上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。参考“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。
- 设置Doris的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据Doris的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
- 创建Doris表,并插入10条数据。创建语句如下:
CREATE TABLE IF NOT EXISTS dorisdemo ( `user_id` varchar(10) NOT NULL, `city` varchar(10), `age` int, `gender` int ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10; INSERT INTO dorisdemo VALUES ('user1', 'city1', 20, 1); INSERT INTO dorisdemo VALUES ('user2', 'city2', 21, 0); INSERT INTO dorisdemo VALUES ('user3', 'city3', 22, 1); INSERT INTO dorisdemo VALUES ('user4', 'city4', 23, 0); INSERT INTO dorisdemo VALUES ('user5', 'city5', 24, 1); INSERT INTO dorisdemo VALUES ('user6', 'city6', 25, 0); INSERT INTO dorisdemo VALUES ('user7', 'city7', 26, 1); INSERT INTO dorisdemo VALUES ('user8', 'city8', 27, 0); INSERT INTO dorisdemo VALUES ('user9', 'city9', 28, 1); INSERT INTO dorisdemo VALUES ('user10', 'city10', 29, 0);
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本读取Doris表,并打印。
CREATE TABLE dorisDemo ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'demo.dorisdemo', 'username' = 'dorisUser', 'password' = 'dorisPassword', 'doris.request.retries'='3', 'doris.batch.size' = '100' ); CREATE TABLE print ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'print' ); insert into print select * from dorisDemo;
- 查看print结果表数据。
+I[user5, city5, 24, 1] +I[user4, city4, 23, 0] +I[user3, city3, 22, 1] +I[user10, city10, 29, 0] +I[user6, city6, 25, 0] +I[user1, city1, 20, 1] +I[user9, city9, 28, 1] +I[user7, city7, 26, 1] +I[user8, city8, 27, 0] +I[user2, city2, 21, 0]