dws-connector-flink
简介
dws-connector-flink是在dws-client的基础上对接flink的一个工具,工具为对dwsClient的包装,整体入库能力跟dwsClient一致。目前内部只实现了DynamicTableSourceFactory、DynamicTableSinkFactory两个接口,并未实现CatalogFactory,所以不支持使用Catalog的场景。
dws-flink-connector的dws connector只支持单并发查询存量数据,暂不支持并行读取。
引入依赖
- jar包方式引入
dws-connector-flink已发布至maven仓库,可选择最新版本使用。链接请参见:https://mvnrepository.com/artifact/com.huaweicloud.dws。
<dependency> <groupId>com.huaweicloud.dws</groupId> <artifactId>dws-connector-flink_${scala.version}_${flink.version}</artifactId> <version>1.0</version> </dependency>
- Flink SQL方式引入
在使用Flink SQL时需要将dws-connector-flink包及其依赖放入flink类加载目录,从1.0.3版本开始已经将带有依赖的包发布至maven仓库,使用时可直接在仓库下载使用:
场景使用
- sink 模式
为便捷使用,提供了一个DwsSink接口快速构建一个sink。示例如下:
SinkFunction<EventLog> dwsSink = DwsSink.sink(DwsConfig.builder() .withUrl("") .withPassword("") .withUsername("").build(), null, (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> { dwsClient.write("test.test") .setObject("id", eventLog.getGuid()) .setObject("name", eventLog.getEventId(), true) .commit(); }); DataStreamSource<EventLog> source = evn.addSource(new LogSourceFunction()); source.addSink(dwsSink); evn.execute();
接口说明:public static <T> SinkFunction<T> sink(DwsConfig config, JSONObject context, DwsInvokeFunction<T> invoke);
- config为dwsClient的参数,跟使用dwsClient一致。
- context是为了便于在业务中使用缓存之类的操作提供的一个全局上下文,可在构建时指定,后续每次回调处理数据的接口便会附带此对象。
- invoke 为一个函数接口,用于执行数据的处理:
/** * flink执行invoke时,执行数据处理回调 * @param value 当前接收到的数据 * @param client 根据配置构造的dwsClient * @param context 用户可以在构造时指定一个全局对象,每次回调时均会附带此参数,可作为全局缓存使用 * @throws DwsClientException */ void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;
- SQL 模式
- 创建flink table:
tableEnvironment.executeSql("create table dws_test (" + " id int,\n" + " name STRING,\n" + " PRIMARY KEY (id) NOT ENFORCED" + ") WITH (\n" + " 'connector' = 'dws',\n" + " 'url' = 'jdbc:postgresql://****/gaussdb',\n" + " 'tablename' = 'test.test'," + " 'username'='dbadmin'," + " 'password'='***'" + ")");
- 将数据源中数据写入test表中:
tableEnvironment.executeSql("insert into dws_test select guid as id,eventId as name from kafka_event_log")
- 从test表中查询数据:
tableEnvironment.executeSql("select name from dws_test where id = 100 limit 10").print();
- 创建flink table:
Flink SQL配置参数
Flink SQL中设置的PRIMARY KEY将自动映射到dws-client中的uniqueKeys。参数跟随client版本发布,参数功能与client一致,以下参数说明表示为最新参数。
参数 |
说明 |
默认值 |
---|---|---|
connector |
flink框架区分connector参数,固定为dws。 |
- |
url |
数据库连接地址。 |
- |
username |
配置连接用户。 |
- |
password |
配置密码。 |
- |
tableName |
对应dws表。 |
- |
参数 |
说明 |
默认值 |
---|---|---|
connectionSize |
初始dwsClient时的并发数量。 |
1 |
connectionMaxUseTimeSeconds |
连接创建多少秒后强制释放(单位秒)。 |
3600(一小时) |
connectionMaxIdleMs |
连接最大空闲时间,超过后将释放,(单位毫秒)。 |
60000 (一分钟) |
参数 |
说明 |
默认值 |
---|---|---|
conflictStrategy |
有主键表数据写入时主键冲突策略:
|
update |
writeMode |
入库方式:
|
auto |
maxFlushRetryTimes |
在入库时最大尝试次数,次数内执行成功则不抛出异常,每次重试间隔为 1秒 * 次数。 |
3 |
autoFlushBatchSize |
自动刷库的批大小(攒批大小)。 |
5000 |
autoFlushMaxInterval |
自动刷库的最大间隔时间(攒批时长)。 |
5s |
copyWriteBatchSize |
在writeMode == auto下,使用copy的批大小。 |
5000 |
ignoreDelete |
忽略flink任务中的delete。 |
false (1.0.10前默认true) |
ignoreNullWhenUpdate |
是否忽略flink中字段值为null的更新, 只有在conflictStrategy == update时有效。 |
false |
metadataCacheSeconds |
系统中对元数据的最大缓存时间,例如表定义信息(单位秒)。 |
180 |
copyMode |
copy入库格式:
|
CSV |
createTempTableMode |
创建临时表方式: AS、LIKE |
AS |
numberAsEpochMsForDatetime |
如果数据库为时间类型数据源为数字类型是否将数据当成时间戳转换为对应时间类型。 |
false |
stringToDatetimeFormat |
如果数据库为时间类型数据源为字符串类型,按该格式转换为时间类型,该参数配置即开启。 |
null |
sink.parallelism |
flink系统参数用于设置sink并发数量。 |
跟随上游算子 |
printDataPk |
是否在connector接收到数据时打印数据主键,用于排查问题。 |
false |
ignoreUpdateBefore |
忽略flink任务中的update_before,在大表局部更新时该参数一定打开,否则有update时会导致数据的其它列被设置为null,因为会先删除再写入数据。 |
true |
参数 |
是否必填 |
说明 |
默认值 |
---|---|---|---|
fetchSize |
否 |
jdbc statement中fetchSize参数,用于控制查询数据库返回条数。 |
1000 |