更新时间:2025-08-29 GMT+08:00

Stream API作业类型

当Flink作业是通过API方式开发的,此时DWS-connector提供了一个DwsGenericSinkFunction类(该类实现了dws-client初始和flink checkpoint相关接口的实现),并且由DwsInvokeFunction接口实现业务逻辑的处理。详情请参见Stream API作业类型

接口说明

public static <T> SinkFunction<T> sink(DwsConfig config, Map<String,Object> context, DwsInvokeFunction<T> invoke);
  • config为dws-client的参数,跟使用dws-client一致。
  • context是为了便于在业务中使用缓存之类的操作提供的一个全局上下文,可在构建时指定,后续每次回调处理数据的接口便会附带此对象。
  • invoke为一个函数接口,用于执行数据的处理:
        /**
         *  flink执行invoke时,执行数据处理回调
         * @param value 当前接收到的数据
         * @param client 根据配置构造的dwsClient
         * @param context  用户可以在构造时指定一个全局对象,每次回调时均会附带此参数,可作为全局缓存使用
         * @throws DwsClientException
         */
        void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;

示例

通过DwsSink接口快速构建。示例如下:
SinkFunction<EventLog> dwsSink = DwsSink.sink(DwsConfig.builder()
                .withUrl("")
                .withPassword("")
                .withUsername("").build(), null, (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> {
            dwsClient.write("test.test")
                    .setObject("id", eventLog.getGuid())
                    .setObject("name", eventLog.getEventId(), true)
                    .commit();
        });
        DataStreamSource<EventLog> source = evn.addSource(new LogSourceFunction());
        source.addSink(dwsSink);
        evn.execute();

使用直连DN入库

为提升整体入库性能,connector提供一个自定义Partitioner用于将数据按照DN分发给下游,满足下游算子数据集中在同一个DN上,以配合client的dn入库提升整体性能,api为com.huaweicloud.dws.connectors.flink.partition.DnPartitioner,使用示例如下:

使用功能时必须满足源端并行度不高于sink端并行度,因为内部会通过dws client实例获取表的元数据信息,以确保每个上游并发均能获取client实例。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment();
    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("")
        .setGroupId("")
        .setTopicPattern(Pattern.compile(""))
        .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
            @Override
            public TypeInformation<String> getProducedType() {
                return TypeInformation.of(String.class);
            }
            @Override
            public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector)
                throws IOException {
                collector.collect(new String(consumerRecord.value()));
            }
        })
        .build();
    DataStreamSource<String> dataStreamSource = evn.fromSource(source, WatermarkStrategy.noWatermarks(), "Source");
    DnPartitioner<String> partitioner = new DnPartitioner<>((IKeySelector<String>) (data, column, columnIndex) -> {
        // 系统自动获取分布列的值,需要返回当前字段的值 data 上游数据  column 当前字段的列信息 columnIndex当前字段在数据库的下标
        return JSON.parseObject(data).getString(column.getName());
    }, (ITableNameSelector<String>) data -> {
        // 根据当前数据返回该数据对应的dws表名称,如果是单表直接硬编码即可
        return JSON.parseObject(data).getString("table");
    });
    dataStreamSource.partitionCustom(partitioner, partitioner).addSink(DwsSink.sink(DwsConfig.of(), null,
        (DwsInvokeFunction<String>) (value, client, context) -> {
            JSONObject jsonData = JSON.parseObject(value);
            Operate write = client.write(jsonData.getString("table"));
            for (Map.Entry<String, Object> entry : jsonData.entrySet()) {
                write.setObject(entry.getKey(), entry.getValue());
            }
            write.commit();
        }));
    evn.execute();
}