Doris结果表
功能描述
Flink SQL作业写Doris结果表。
前提条件
- 该场景作业需要运行在DLI的独享队列上,因此要与Doris建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- 如果使用MRS Doris,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。
详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。
- 集群未启用Kerberos认证(普通模式)
使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
- 集群未启用Kerberos认证(普通模式)
- Doris的表名是区分大小写。
- 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030,8040,9030。
- 开启HTTPS后,需要在创建表的with子句中添加如下配置参数:
- 'doris.enable.https' = 'true'
- 'doris.ignore.https.ca' = 'true'
- 请在Flink“作业编辑”页面选择“运行参数配置”,选择“开启Checkpoint”,否则会导致Doris结果表无法写入数据,且写入Doris的延时取决于设置的Checkpoint的间隔时间。
语法格式
create table dorisSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'database.table', 'username' = 'dorisUsername', 'password' = 'dorisPassword' );
参数说明
通用配置项
参数 |
默认值 |
是否必选 |
参数类型说明 |
---|---|---|---|
fenodes |
-- |
是 |
Doris FE ip地址和port, 多实例之间使用逗号分隔。其中port可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,在搜索框中搜索“http”查看。如果开启https,则搜索“https”。 |
table.identifier |
-- |
是 |
Doris 表名,如:db.tbl。 |
username |
-- |
是 |
访问 Doris的用户名。 |
password |
-- |
是 |
访问 Doris的密码。 |
sink.label-prefix |
"" |
是 |
Stream load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。 |
sink.enable-2pc |
TRUE |
否 |
是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考这里。 |
sink.check-interval |
10000 |
否 |
加载时检查间隔异常。 |
sink.max-retries |
3 |
否 |
将记录写入数据库失败时的最大重试次数。 |
sink.buffer-size |
256 * 1024 |
否 |
缓存流加载数据的缓冲区大小。 |
sink.buffer-count |
3 |
否 |
缓存流加载数据的缓冲区计数。 |
sink.enable-delete |
TRUE |
否 |
是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。 |
sink.properties.* |
-- |
否 |
Stream Load 的导入参数。 例如: 'sink.properties.column_separator' = ', ' 定义列分隔符, 'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,'\x01'会被转换为二进制的0x01 JSON格式导入 'sink.properties.format' = 'json' 'sink.properties.read_json_by_line' = 'true' |
示例
该示例是从Datagen数据源中生成数据,并将结果写入到Doris结果表中。
- 参考增强型跨源连接,在DLI上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。
- 设置Doris的安全组,添加入向规则使其对Flink的队列网段放通。分别根据Doris的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
参考测试地址连通性。
- 参考MRS Doris使用指南,创建doris表,创建语句如下:
CREATE TABLE IF NOT EXISTS dorisdemo ( `user_id` varchar(10) NOT NULL, `city` varchar(10), `age` int, `gender` int ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10
- 创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Datagen作为数据源,将数据写入到Doris作为结果表中。
create table student_datagen_source( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '7', 'fields.city.kind' = 'random', 'fields.city.length' = '7' ); CREATE TABLE dorisDemo ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT', 'table.identifier' = 'demo.dorisdemo', 'username' = 'dorisUser', 'password' = 'dorisPassword', 'sink.label-prefix' = 'demo', 'sink.enable-2pc' = 'true', 'sink.buffer-count' = '10' ); insert into dorisDemo select * from student_datagen_source
- 查看doris结果表是否已成功写入数据。
user_id
city
age
gender
50aff04
93406c5
12
1
681a230
1f27d06
16
1
006eff4
3521ded
18
0