更新时间:2024-06-29 GMT+08:00

Stream API作业类型

当Flink作业是通过API方式开发的,此时DWS-connector提供了一个DwsGenericSinkFunction类(该类实现了dws-client初始和flink checkpoint相关接口的实现),并且由DwsInvokeFunction接口实现业务逻辑的处理。详情请参见Stream API作业类型

接口说明

public static <T> SinkFunction<T> sink(DwsConfig config, JSONObject context, DwsInvokeFunction<T> invoke);
  • config为dws-client的参数,跟使用dws-client一致。
  • context是为了便于在业务中使用缓存之类的操作提供的一个全局上下文,可在构建时指定,后续每次回调处理数据的接口便会附带此对象。
  • invoke为一个函数接口,用于执行数据的处理:
        /**
         *  flink执行invoke时,执行数据处理回调
         * @param value 当前接收到的数据
         * @param client 根据配置构造的dwsClient
         * @param context  用户可以在构造时指定一个全局对象,每次回调时均会附带此参数,可作为全局缓存使用
         * @throws DwsClientException
         */
        void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;

示例

通过DwsSink接口快速构建。示例如下:
SinkFunction<EventLog> dwsSink = DwsSink.sink(DwsConfig.builder()
                .withUrl("")
                .withPassword("")
                .withUsername("").build(), null, (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> {
            dwsClient.write("test.test")
                    .setObject("id", eventLog.getGuid())
                    .setObject("name", eventLog.getEventId(), true)
                    .commit();
        });
        DataStreamSource<EventLog> source = evn.addSource(new LogSourceFunction());
        source.addSink(dwsSink);
        evn.execute();