
Stream API Job Type

When developing Flink jobs using APIs, the DWS-connector provides the DwsGenericSinkFunction class, which encapsulates GaussDB(DWS) client initialization and the Flink checkpoint APIs, and the DwsInvokeFunction interface, which carries the service logic.

API Description

public static <T> SinkFunction<T> sink(DwsConfig config, Map<String,Object> context, DwsInvokeFunction<T> invoke);
  • config is the GaussDB(DWS) client configuration. It is used in the same way as when the client is constructed directly.
  • context is a global context for operations such as caching. It is specified when the GaussDB(DWS) client is constructed and is passed back on every call to the data processing interface (a usage sketch follows the interface definition below).
  • invoke is a function interface used to process data.
        /**
         * Data processing callback, executed when Flink calls invoke.
         * @param value the data record currently received.
         * @param client the GaussDB(DWS) client constructed from the configuration.
         * @param context the global object specified during construction. It is attached to every callback and can be used as a global cache.
         * @throws DwsClientException if the write to GaussDB(DWS) fails.
         */
        void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;
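
Because the same context object is attached to every callback, it can serve as a simple shared cache. The following sketch is illustrative only: the "cache" key, the counting logic, and the target table are assumptions, not part of the connector API.

Map<String, Object> context = new HashMap<>();
context.put("cache", new ConcurrentHashMap<String, Long>());
SinkFunction<String> sink = DwsSink.sink(DwsConfig.builder()
        .withUrl("")
        .withPassword("")
        .withUsername("")
        .build(), context, (DwsInvokeFunction<String>) (value, client, ctx) -> {
    // Illustrative use of the context: count how often each record value has been seen.
    Map<String, Long> cache = (Map<String, Long>) ctx.get("cache");
    cache.merge(value, 1L, Long::sum);
    client.write("test.test")
            .setObject("id", value)
            .commit();
});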

Examples

Use the DwsSink interface to construct a sink quickly. The following is an example:
StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment();
SinkFunction<EventLog> dwsSink = DwsSink.sink(DwsConfig.builder()
        .withUrl("")
        .withPassword("")
        .withUsername("")
        .build(), null, (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> {
    // Write the received event to table test.test and commit.
    dwsClient.write("test.test")
            .setObject("id", eventLog.getGuid())
            .setObject("name", eventLog.getEventId(), true)
            .commit();
});
DataStreamSource<EventLog> source = evn.addSource(new LogSourceFunction());
source.addSink(dwsSink);
evn.execute();
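
The example assumes an EventLog POJO and a LogSourceFunction source, neither of which is part of the connector. A minimal sketch of what they might look like (the field names come from the example; everything else is an illustrative assumption):

// Hypothetical POJO matching the getters used in the example.
public class EventLog {
    private String guid;
    private String eventId;

    public String getGuid() { return guid; }
    public void setGuid(String guid) { this.guid = guid; }
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
}

// Hypothetical source that emits one event per second until cancelled.
public class LogSourceFunction implements SourceFunction<EventLog> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        while (running) {
            EventLog log = new EventLog();
            log.setGuid(UUID.randomUUID().toString());
            log.setEventId("click");
            ctx.collect(log);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}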

Using Connected DNs for Data Import

The connector provides a custom partitioner that improves import performance by distributing data to downstream operators by DN, so that records belonging to the same DN are written together. The API is com.huaweicloud.dws.connectors.flink.partition.DnPartitioner. The following is an example:

When using this function, ensure that the parallelism of the source operator does not exceed that of the sink operator. The partitioner relies on the GaussDB(DWS) client instance to fetch table metadata, so every upstream subtask must be able to obtain a client instance.
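
Applied to the example below, this constraint can be made explicit when wiring the job. The parallelism values here are illustrative assumptions:

// Source parallelism (2) must not exceed sink parallelism (4).
dataStreamSource.setParallelism(2);
dataStreamSource.partitionCustom(partitioner, partitioner)
    .addSink(dwsSink)   // dwsSink stands for the DWS sink built in the example
    .setParallelism(4);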

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment();
    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("")
        .setGroupId("")
        .setTopicPattern(Pattern.compile(""))
        .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
            @Override
            public TypeInformation<String> getProducedType() {
                return TypeInformation.of(String.class);
            }
            @Override
            public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector)
                throws IOException {
                collector.collect(new String(consumerRecord.value()));
            }
        })
        .build();
    DataStreamSource<String> dataStreamSource = evn.fromSource(source, WatermarkStrategy.noWatermarks(), "Source");
    DnPartitioner<String> partitioner = new DnPartitioner<>((IKeySelector<String>) (data, column, columnIndex) -> {
        // Callback for the distribution column value: return the value of the current column, given the
        // upstream record (data), the column metadata (column), and the column's index in the table (columnIndex).
        return JSON.parseObject(data).getString(column.getName());
    }, (ITableNameSelector<String>) data -> {
        // Return the GaussDB(DWS) table name for the current record. For a single table, return a fixed value.
        return JSON.parseObject(data).getString("table");
    });
    dataStreamSource.partitionCustom(partitioner, partitioner).addSink(DwsSink.sink(DwsConfig.of(), null,
        (DwsInvokeFunction<String>) (value, client, context) -> {
            JSONObject jsonData = JSON.parseObject(value);
            Operate write = client.write(jsonData.getString("table"));
            for (Map.Entry<String, Object> entry : jsonData.entrySet()) {
                write.setObject(entry.getKey(), entry.getValue());
            }
            write.commit();
        }));
    evn.execute();
}
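
Note that the same DnPartitioner instance is passed to partitionCustom() twice. Flink's partitionCustom(Partitioner, KeySelector) expects a partitioner and a key selector, and DnPartitioner fills both roles: it extracts the distribution key through the IKeySelector and ITableNameSelector callbacks and then maps that key to a DN.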