Stream API Job Type
When developing Flink jobs using APIs, the DWS-connector offers the DwsGenericSinkFunction class, which handles GaussDB(DWS) client initialization and Flink checkpoint APIs. Additionally, the DwsInvokeFunction interface manages the service logic. For details, see Stream API Job Type.
API Description
public static <T> SinkFunction<T> sink(DwsConfig config, Map<String,Object> context, DwsInvokeFunction<T> invoke);
- The config parameter in GaussDB(DWS) client functions in the same way as the client itself.
- context is a global context provided for operations such as cache. It can be specified during GaussDB(DWS) client construction, and is called back each time with the data processing interface.
- invoke is a function interface used to process data.
/** * Execute data processing callback when Flink executes invoke. * @param value indicates the current received data. * @param client indicates the GaussDB(DWS) client constructed based on the configuration. * @param context indicates a global object specified during construction. This parameter is attached to each callback and can be used as a global cache. * @throws DwsClientException */ void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;
Examples
SinkFunction<EventLog> dwsSink = DwsSink.sink(DwsConfig.builder() .withUrl("") .withPassword("") .withUsername("").build(), null, (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> { dwsClient.write("test.test") .setObject("id", eventLog.getGuid()) .setObject("name", eventLog.getEventId(), true) .commit(); }); DataStreamSource<EventLog> source = evn.addSource(new LogSourceFunction()); source.addSink(dwsSink); evn.execute();
Using Connected DNs for Data Import

When using this function, ensure that the parallelism level at the source end does not exceed that at the sink end. This is because the GaussDB(DWS) client instance is utilized to fetch table metadata, ensuring each upstream concurrency can access the client instance.
public static void main(String[] args) throws Exception { StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("") .setGroupId("") .setTopicPattern(Pattern.compile("")) .setDeserializer(new KafkaRecordDeserializationSchema<String>() { @Override public TypeInformation<String> getProducedType() { return TypeInformation.of(String.class); } @Override public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector) throws IOException { collector.collect(new String(consumerRecord.value())); } }) .build(); DataStreamSource<String> dataStreamSource = evn.fromSource(source, WatermarkStrategy.noWatermarks(), "Source"); DnPartitioner<String> partitioner = new DnPartitioner<>((IKeySelector<String>) (data, column, columnIndex) -> { // The system automatically retrieves the distribution column value, requiring the value of the current column to be returned, upstream data (data), information about the current column (column), and index of the current column in the database (columnIndex). return JSON.parseObject(data).getString(column.getName()); }, (ITableNameSelector<String>) data -> { // Return the corresponding GaussDB(DWS) table name for the current data, with a fixed value if it is a single table. return JSON.parseObject(data).getString("table"); }); dataStreamSource.partitionCustom(partitioner, partitioner).addSink(DwsSink.sink(DwsConfig.of(), null, (DwsInvokeFunction<String>) (value, client, context) -> { JSONObject jsonData = JSON.parseObject(value); Operate write = client.write(jsonData.getString("table")); for (Map.Entry<String, Object> entry : jsonData.entrySet()) { write.setObject(entry.getKey(), entry.getValue()); } write.commit(); })); evn.execute(); }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot