Stream API Job Type
When developing Flink jobs with the Stream API, the DWS-connector provides the DwsGenericSinkFunction class, which handles GaussDB(DWS) client initialization and the Flink checkpoint APIs. You supply the service logic by implementing the DwsInvokeFunction interface.
API Description
public static <T> SinkFunction<T> sink(DwsConfig config, JSONObject context, DwsInvokeFunction<T> invoke);
- config is the GaussDB(DWS) client configuration. It accepts the same parameters as the client does when used on its own.
- context is a global context provided for operations such as caching. It can be specified when the GaussDB(DWS) client is constructed, and it is passed back on every call to the data processing interface.
- invoke is a functional interface used to process data.
/**
 * Execute data processing callback when Flink executes invoke.
 * @param value indicates the current received data.
 * @param client indicates the GaussDB(DWS) client constructed based on the configuration.
 * @param context indicates a global object specified during construction. This parameter is attached to each callback and can be used as a global cache.
 * @throws DwsClientException
 */
void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;
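The callback contract above (one client and one context object reused for every record) can be illustrated without the connector on the classpath. The following is only a minimal sketch under stated assumptions: RecordHandler, FakeClient, runSink, and writeDistinct are hypothetical stand-ins invented for this illustration, a plain Map replaces JSONObject, and none of them are part of the DWS-connector. It shows the context being used as a cache that survives across callbacks, here to skip IDs that were already written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ContextCallbackSketch {

    /** Hypothetical stand-in shaped like DwsInvokeFunction#accept; not the connector's type. */
    @FunctionalInterface
    interface RecordHandler<T> {
        void accept(T value, FakeClient client, Map<String, Object> context) throws Exception;
    }

    /** Hypothetical stand-in for DwsClient: keeps "written" rows in memory. */
    static class FakeClient {
        final List<String> written = new ArrayList<>();

        void write(String row) {
            written.add(row);
        }
    }

    /** Mimics the sink: the same client and context are handed to every callback. */
    static <T> List<String> runSink(List<T> records, Map<String, Object> context,
                                    RecordHandler<T> handler) {
        FakeClient client = new FakeClient();
        for (T record : records) {
            try {
                handler.accept(record, client, context);
            } catch (Exception e) {
                // Wrap checked exceptions so callers of the sketch stay simple.
                throw new IllegalStateException(e);
            }
        }
        return client.written;
    }

    /** Uses the shared context as a cache of IDs that were already written. */
    static List<String> writeDistinct(List<String> ids) {
        Map<String, Object> context = new HashMap<>();
        context.put("seen", new HashSet<String>());
        return runSink(ids, context, (id, client, ctx) -> {
            @SuppressWarnings("unchecked")
            Set<String> seen = (Set<String>) ctx.get("seen");
            if (seen.add(id)) { // add() returns false for an ID seen in an earlier callback
                client.write(id);
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(writeDistinct(Arrays.asList("a", "b", "a"))); // prints [a, b]
    }
}
```

In a real job the context passed to the sink plays the role of the Map here. A null context, as in the example in the Examples section, simply means the callback keeps no shared state.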
Examples
SinkFunction<EventLog> dwsSink = DwsSink.sink(
        DwsConfig.builder()
                .withUrl("")
                .withPassword("")
                .withUsername("")
                .build(),
        null,
        (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> {
            dwsClient.write("test.test")
                    .setObject("id", eventLog.getGuid())
                    .setObject("name", eventLog.getEventId(), true)
                    .commit();
        });
DataStreamSource<EventLog> source = evn.addSource(new LogSourceFunction());
source.addSink(dwsSink);
evn.execute();