Updated on 2024-07-19 GMT+08:00

Stream API Job Type

When developing Flink jobs using APIs, the DWS-connector offers the DwsGenericSinkFunction class, which handles GaussDB(DWS) client initialization and Flink checkpoint APIs. Additionally, the DwsInvokeFunction interface manages the service logic. For details, see Stream API Job Type.

API Description

public static <T> SinkFunction<T> sink(DwsConfig config, JSONObject context, DwsInvokeFunction<T> invoke);
  • The config parameter in GaussDB(DWS) client functions in the same way as the client itself.
  • context is a global context provided for operations such as cache. It can be specified during GaussDB(DWS) client construction, and is called back each time with the data processing interface.
  • invoke is a function interface used to process data.
        /**
         *  Execute data processing callback when Flink executes invoke.
         * @param value indicates the current received data.
         * @param client indicates the GaussDB(DWS) client constructed based on the configuration.
         * @param context indicates a global object specified during construction. This parameter is attached to each callback and can be used as a global cache.
         * @throws DwsClientException
         */
        void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;

Examples

The DwsSink interface is used for quick construction. The following is an example:
SinkFunction<EventLog> dwsSink = DwsSink.sink(DwsConfig.builder()
                .withUrl("")
                .withPassword("")
                .withUsername("").build(), null, (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> {
            dwsClient.write("test.test")
                    .setObject("id", eventLog.getGuid())
                    .setObject("name", eventLog.getEventId(), true)
                    .commit();
        });
        DataStreamSource<EventLog> source = evn.addSource(new LogSourceFunction());
        source.addSink(dwsSink);
        evn.execute();