更新时间:2025-08-29 GMT+08:00
Stream API作业类型
当Flink作业是通过API方式开发的,此时DWS-connector提供了一个DwsGenericSinkFunction类(该类实现了dws-client初始和flink checkpoint相关接口的实现),并且由DwsInvokeFunction接口实现业务逻辑的处理。详情请参见Stream API作业类型。
接口说明
public static <T> SinkFunction<T> sink(DwsConfig config, Map<String,Object> context, DwsInvokeFunction<T> invoke);
- config为dws-client的参数,跟使用dws-client一致。
- context是为了便于在业务中使用缓存之类的操作提供的一个全局上下文,可在构建时指定,后续每次回调处理数据的接口便会附带此对象。
- invoke为一个函数接口,用于执行数据的处理:
/** * flink执行invoke时,执行数据处理回调 * @param value 当前接收到的数据 * @param client 根据配置构造的dwsClient * @param context 用户可以在构造时指定一个全局对象,每次回调时均会附带此参数,可作为全局缓存使用 * @throws DwsClientException */ void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;
示例
通过DwsSink接口快速构建。示例如下:
SinkFunction<EventLog> dwsSink = DwsSink.sink(DwsConfig.builder() .withUrl("") .withPassword("") .withUsername("").build(), null, (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> { dwsClient.write("test.test") .setObject("id", eventLog.getGuid()) .setObject("name", eventLog.getEventId(), true) .commit(); }); DataStreamSource<EventLog> source = evn.addSource(new LogSourceFunction()); source.addSink(dwsSink); evn.execute();
使用直连DN入库
为提升整体入库性能,connector提供一个自定义Partitioner用于将数据按照DN分发给下游,满足下游算子数据集中在同一个DN上,以配合client的dn入库提升整体性能,api为com.huaweicloud.dws.connectors.flink.partition.DnPartitioner,使用示例如下:

使用功能时必须满足源端并行度不高于sink端并行度,因为内部会通过dws client实例获取表的元数据信息,以确保每个上游并发均能获取client实例。
public static void main(String[] args) throws Exception { StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("") .setGroupId("") .setTopicPattern(Pattern.compile("")) .setDeserializer(new KafkaRecordDeserializationSchema<String>() { @Override public TypeInformation<String> getProducedType() { return TypeInformation.of(String.class); } @Override public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector) throws IOException { collector.collect(new String(consumerRecord.value())); } }) .build(); DataStreamSource<String> dataStreamSource = evn.fromSource(source, WatermarkStrategy.noWatermarks(), "Source"); DnPartitioner<String> partitioner = new DnPartitioner<>((IKeySelector<String>) (data, column, columnIndex) -> { // 系统自动获取分布列的值,需要返回当前字段的值 data 上游数据 column 当前字段的列信息 columnIndex当前字段在数据库的下标 return JSON.parseObject(data).getString(column.getName()); }, (ITableNameSelector<String>) data -> { // 根据当前数据返回该数据对应的dws表名称,如果是单表直接硬编码即可 return JSON.parseObject(data).getString("table"); }); dataStreamSource.partitionCustom(partitioner, partitioner).addSink(DwsSink.sink(DwsConfig.of(), null, (DwsInvokeFunction<String>) (value, client, context) -> { JSONObject jsonData = JSON.parseObject(value); Operate write = client.write(jsonData.getString("table")); for (Map.Entry<String, Object> entry : jsonData.entrySet()) { write.setObject(entry.getKey(), entry.getValue()); } write.commit(); })); evn.execute(); }
父主题: dws-connector-flink