更新时间:2024-06-29 GMT+08:00
Stream API作业类型
当Flink作业是通过API方式开发的,此时DWS-connector提供了一个DwsGenericSinkFunction类(该类实现了dws-client初始和flink checkpoint相关接口的实现),并且由DwsInvokeFunction接口实现业务逻辑的处理。详情请参见Stream API作业类型。
接口说明
public static <T> SinkFunction<T> sink(DwsConfig config, JSONObject context, DwsInvokeFunction<T> invoke);
- config为dws-client的参数,跟使用dws-client一致。
- context是为了便于在业务中使用缓存之类的操作提供的一个全局上下文,可在构建时指定,后续每次回调处理数据的接口便会附带此对象。
- invoke为一个函数接口,用于执行数据的处理:
/** * flink执行invoke时,执行数据处理回调 * @param value 当前接收到的数据 * @param client 根据配置构造的dwsClient * @param context 用户可以在构造时指定一个全局对象,每次回调时均会附带此参数,可作为全局缓存使用 * @throws DwsClientException */ void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;
示例
通过DwsSink接口快速构建。示例如下:
SinkFunction<EventLog> dwsSink = DwsSink.sink(DwsConfig.builder() .withUrl("") .withPassword("") .withUsername("").build(), null, (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> { dwsClient.write("test.test") .setObject("id", eventLog.getGuid()) .setObject("name", eventLog.getEventId(), true) .commit(); }); DataStreamSource<EventLog> source = evn.addSource(new LogSourceFunction()); source.addSink(dwsSink); evn.execute();
父主题: dws-connector-flink