更新时间:2026-05-20 GMT+08:00
创建Flink CDC实时同步任务
- 上传StarRocks connector连接器到已创建的对象存储OBS桶创建:flink-connector-starrocks-1.2.14_flink-1.15.jar,连接器下载地址
- 进入DLI控制台->作业管理->Flink作业,单击右上角“
” 图1 创建Flink SQL作业
图2 作业配置说明
1.所属队列,选择“poc_demo”队列。
2.UDF Jar单击“
”选择上传至OBS的flink-connector-starrocks-1.2.14_flink-1.15.jar包。3.委托选择dli_dew_agency_access。
4.OBS桶选择已创建的桶。
5.勾选“Checkpoint”。
- Flink CDC同步sql编写
CREATE TABLE mysql_orders_source ( order_id BIGINT, customer_id INT, order_time TIMESTAMP, amount DECIMAL(10,2), channel STRING, product_category STRING, payment_method STRING, city STRING, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'xxx.xxx.xxx.xxx', ---填写RDS for MySQL数据库地址 'port' = '3306', 'username' = 'root', 'password' = 'XXXXX', ---填写RDS for MySQL root用户密码 'database-name' = 'business_db', 'table-name' = 'orders', 'server-time-zone' = 'Asia/Shanghai' ); -- StarRocks ODS 目标表 CREATE TABLE sr_ods_orders_sink ( order_id BIGINT, customer_id INT, order_time TIMESTAMP, amount DECIMAL(10,2), channel STRING, product_category STRING, payment_method STRING, city STRING, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://XXX.XXX.XXX.XXX:9030', ---根据实际情况填写StarRocks地址 'load-url' = 'XXX.XXX.XXX.XXX:8030', ---根据实际情况填写StarRocks地址 'database-name' = 'realtime_dw', 'table-name' = 'ods_orders', 'username' = 'admin', 'password' = 'XXXXXX', ---根据实际情况填写StarRocks admin用户密码 'sink.properties.format' = 'json', 'sink.properties.strip_outer_array' = 'true', 'sink.buffer-flush.max-rows' = '64000', 'sink.buffer-flush.interval-ms' = '1000' ); -- 执行同步 INSERT INTO sr_ods_orders_sink SELECT * FROM mysql_orders_source;