更新时间:2024-05-16 GMT+08:00
分享

dws-connector-flink

简介

dws-connector-flink是在dws-client的基础上对接flink的一个工具,工具为对dwsClient的包装,整体入库能力跟dwsClient一致。目前内部只实现了DynamicTableSourceFactory、DynamicTableSinkFactory两个接口,并未实现CatalogFactory,所以不支持使用Catalog的场景。

dws-flink-connector的dws connector只支持单并发查询存量数据,暂不支持并行读取。

引入依赖

  • jar包方式引入
    dws-connector-flink已发布至maven仓库,可选择最新版本使用。链接请参见:https://mvnrepository.com/artifact/com.huaweicloud.dws
    <dependency>
        <groupId>com.huaweicloud.dws</groupId>
        <artifactId>dws-connector-flink_${scala.version}_${flink.version}</artifactId>
        <version>1.0</version>
    </dependency>
  • Flink SQL方式引入

    在使用Flink SQL时需要将dws-connector-flink包及其依赖放入flink类加载目录,从1.0.3版本开始已经将带有依赖的包发布至maven仓库,使用时可直接在仓库下载使用:

    1. 选择匹配flink环境的包。

    2. 进入软件包详情页面。

    3. 选择最新版本。

    4. 选择查看所有文件。
      • 其中结尾为“jar-with-dependencies ”或 “all”的文件名为带依赖的包,用户均可使用。
      • 如果无对应flink版本,可用当前版本相近的版本尝试。
    5. 单击对应文件即可下载。

场景使用

  • sink 模式
    为便捷使用,提供了一个DwsSink接口快速构建一个sink。示例如下:
    SinkFunction<EventLog> dwsSink = DwsSink.sink(DwsConfig.builder()
                    .withUrl("")
                    .withPassword("")
                    .withUsername("").build(), null, (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> {
                dwsClient.write("test.test")
                        .setObject("id", eventLog.getGuid())
                        .setObject("name", eventLog.getEventId(), true)
                        .commit();
            });
            DataStreamSource<EventLog> source = evn.addSource(new LogSourceFunction());
            source.addSink(dwsSink);
            evn.execute();
    接口说明:
    public static <T> SinkFunction<T> sink(DwsConfig config, JSONObject context, DwsInvokeFunction<T> invoke);
    • config为dwsClient的参数,跟使用dwsClient一致。
    • context是为了便于在业务中使用缓存之类的操作提供的一个全局上下文,可在构建时指定,后续每次回调处理数据的接口便会附带此对象。
    • invoke 为一个函数接口,用于执行数据的处理:
          /**
           *  flink执行invoke时,执行数据处理回调
           * @param value 当前接收到的数据
           * @param client 根据配置构造的dwsClient
           * @param context  用户可以在构造时指定一个全局对象,每次回调时均会附带此参数,可作为全局缓存使用
           * @throws DwsClientException
           */
          void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;
  • SQL 模式
    1. 创建flink table:
      tableEnvironment.executeSql("create table dws_test (" +
          "   id int,\n" +
          "   name STRING,\n" +
          "   PRIMARY KEY (id) NOT ENFORCED" +
          ") WITH (\n" +
          "   'connector' = 'dws',\n" +
          "   'url' = 'jdbc:postgresql://****/gaussdb',\n" +
          "   'tablename' = 'test.test'," +
          "   'username'='dbadmin'," +
          "   'password'='***'" +
           ")");
    2. 将数据源中数据写入test表中:
      tableEnvironment.executeSql("insert into dws_test select guid as id,eventId as name from kafka_event_log")
    3. 从test表中查询数据:
      tableEnvironment.executeSql("select name from dws_test where id = 100 limit 10").print();

Flink SQL配置参数

Flink SQL中设置的PRIMARY KEY将自动映射到dws-client中的uniqueKeys。参数跟随client版本发布,参数功能与client一致,以下参数说明表示为最新参数。

表1 数据库配置

参数

说明

默认值

connector

flink框架区分connector参数,固定为dws。

-

url

数据库连接地址。

-

username

配置连接用户。

-

password

配置密码。

-

tableName

对应dws表。

-

表2 连接配置

参数

说明

默认值

connectionSize

初始dwsClient时的并发数量。

1

connectionMaxUseTimeSeconds

连接创建多少秒后强制释放(单位秒)。

3600(一小时)

connectionMaxIdleMs

连接最大空闲时间,超过后将释放,(单位毫秒)。

60000 (一分钟)

表3 写入参数

参数

说明

默认值

conflictStrategy

有主键表数据写入时主键冲突策略:

  • ignore:保持原数据,忽略更新数据。
  • update:用新数据中非主键列更新原数据中对应列。
  • replace:用新数据替换原数据。
    说明:

    update和replace在全字段upsert时等效,在部分字段upsert时,replace相当于将数据中不包含的列设置为nul。

update

writeMode

入库方式:

  • auto:系统自动选择。
  • copy_merge:当存在主键时使用copy方式入临时表,从临时表merge至目标表;无主键时直接copy至目标表。
  • copy_upsert:当存在主键时使用copy方式入临时表,从临时表upsert至目标表;无主键时直接copy至目标表。
  • upsert: 有主键用upsert sql入库;无主键用insert into 入库。
  • UPDATE:使用update where 语法更新数据,若原表无主键可选择指定uniqueKeys,指定字段不要求必须时唯一索引,但非唯一索引可能会影响性能。
  • COPY_UPDATE:数据先通过copy方式入库到临时表,通过临时表加速使用update from where方式更新目标数据。
  • UPDATE_AUTO:批量小于copyWriteBatchSize使用UPDATE,否则使用COPY_UPDATE。

auto

maxFlushRetryTimes

在入库时最大尝试次数,次数内执行成功则不抛出异常,每次重试间隔为 1秒 * 次数。

3

autoFlushBatchSize

自动刷库的批大小(攒批大小)。

5000

autoFlushMaxInterval

自动刷库的最大间隔时间(攒批时长)。

5s

copyWriteBatchSize

在writeMode == auto下,使用copy的批大小。

5000

ignoreDelete

忽略flink任务中的delete。

false (1.0.10前默认true)

ignoreNullWhenUpdate

是否忽略flink中字段值为null的更新, 只有在conflictStrategy == update时有效。

false

metadataCacheSeconds

系统中对元数据的最大缓存时间,例如表定义信息(单位秒)。

180

copyMode

copy入库格式:

  • CSV:将数据拼接为CSV格式入库,该方式稳定,但性能略低。
  • DELIMITER:用分隔符将数据拼接,然后入库,该方式需要数据中不包含分隔符。

CSV

createTempTableMode

创建临时表方式:

AS、LIKE

AS

numberAsEpochMsForDatetime

如果数据库为时间类型数据源为数字类型是否将数据当成时间戳转换为对应时间类型。

false

stringToDatetimeFormat

如果数据库为时间类型数据源为字符串类型,按该格式转换为时间类型,该参数配置即开启。

null

sink.parallelism

flink系统参数用于设置sink并发数量。

跟随上游算子

printDataPk

是否在connector接收到数据时打印数据主键,用于排查问题。

false

ignoreUpdateBefore

忽略flink任务中的update_before,在大表局部更新时该参数一定打开,否则有update时会导致数据的其它列被设置为null,因为会先删除再写入数据。

true

表4 查询参数

参数

是否必填

说明

默认值

fetchSize

jdbc statement中fetchSize参数,用于控制查询数据库返回条数。

1000

分享:

    相关文档

    相关产品