Updated on 2023-04-03 GMT+08:00

dws-connector-flink

Description

dws-connector-flink is a tool that connects dwsClient to Flink. It encapsulates dwsClient, so its overall import capability is the same as that of dwsClient. Currently, only the DynamicTableSourceFactory and DynamicTableSinkFactory interfaces are implemented. The CatalogFactory interface is not implemented, so catalogs are not supported.

How to Use

Scenarios & Usage

  • Sink Mode
    A DwsSink API is provided to quickly build a sink. An example is as follows:
    // Build the sink from the connection settings, a null global context, and a per-record callback.
    SinkFunction<EventLog> dwsSink = DwsSink.sink(
            DwsConfig.builder()
                    .withUrl("")
                    .withUsername("")
                    .withPassword("")
                    .build(),
            null,
            (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> {
                // Upsert one row into test.test for each received event.
                dwsClient.upsert("test.test")
                        .setObject("id", eventLog.getGuid())
                        .setObject("name", eventLog.getEventId(), true)
                        .commit();
            });
    // env is the job's StreamExecutionEnvironment.
    DataStreamSource<EventLog> source = env.addSource(new LogSourceFunction());
    source.addSink(dwsSink);
    env.execute();
    API description:
    public static <T> SinkFunction<T> sink(DwsConfig config, JSONObject context, DwsInvokeFunction<T> invoke);
    • config holds the dwsClient parameters. It is the same configuration that dwsClient itself uses.
    • context is a global context object provided for purposes such as caching. It is specified during construction and is passed back to the data processing interface on every call.
    • invoke is a function interface used to process data.
      /**
       * Data processing callback, executed when Flink calls invoke.
       * @param value the data currently received.
       * @param client the dwsClient constructed based on the configuration.
       * @param context a global object specified during construction. It is passed to each callback and can be used as a global cache.
       * @throws DwsClientException
       */
      void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;
  • SQL Mode
    1. Create a Flink table:
      tableEnvironment.executeSql("create table dws_test (" +
          "   id int,\n" +
          "   name STRING,\n" +
          "   PRIMARY KEY (id) NOT ENFORCED" +
          ") WITH (\n" +
          "   'connector' = 'dws',\n" +
          "   'url' = 'jdbc:postgresql://****/gaussdb',\n" +
          "   'tablename' = 'test.test'," +
          "   'username'='dbadmin'," +
          "   'password'='***'" +
           ")");
    2. Write data from the data source to the test table.
      tableEnvironment.executeSql("insert into dws_test select guid as id,eventId as name from kafka_event_log");
    3. Query data from the test table.
      tableEnvironment.executeSql("select name from dws_test where id = 100 limit 10").print();
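
For Sink Mode, the role of the context argument can be illustrated without the connector itself. The following is a minimal sketch using stand-in types only: InvokeSketch, run, and writeDistinct are hypothetical names, a Map stands in for the JSONObject context, and a List stands in for the database client. It only shows that the same context object is passed to every callback, so it can serve as a cache.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in types only; none of these classes belong to dws-connector-flink. */
final class ContextCallbackSketch {

    /** Mirrors the shape of accept(value, client, context) with stand-in argument types. */
    interface InvokeSketch<T> {
        void accept(T value, List<String> client, Map<String, Object> context);
    }

    /** Calls the callback once per record, passing the same context object every time. */
    static <T> void run(List<T> records, List<String> client,
                        Map<String, Object> context, InvokeSketch<T> invoke) {
        for (T record : records) {
            invoke.accept(record, client, context);
        }
    }

    /** Writes each distinct record once, using the context as a "seen" cache. */
    static List<String> writeDistinct(List<String> records) {
        List<String> written = new ArrayList<>();      // stands in for the database client
        Map<String, Object> context = new HashMap<>(); // stands in for the JSONObject context
        run(records, written, context, (value, client, ctx) -> {
            // putIfAbsent returns null only the first time a value is seen.
            if (ctx.putIfAbsent(value, Boolean.TRUE) == null) {
                client.add(value);
            }
        });
        return written;
    }

    public static void main(String[] args) {
        System.out.println(writeDistinct(Arrays.asList("a", "b", "a")));
    }
}
```

Because the context outlives individual callbacks, state placed in it during one call is visible in the next. In a real job the cache would be kept in the JSONObject passed as the second argument of DwsSink.sink.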

Flink SQL Configuration Parameters

Table 1 Database Configuration

| Parameter | Description | Default Value |
|---|---|---|
| connector | Used by the Flink framework to distinguish connectors. This parameter is fixed to dws. | - |
| url | Database connection address | - |
| username | Username used for the connection | - |
| password | Password used for the connection | - |
| tableName | Name of the GaussDB(DWS) table | - |

Table 2 Connection configuration

| Parameter | Description | Default Value |
|---|---|---|
| connectionSize | Number of concurrent requests at dwsClient initialization | 1 |
| connectionMaxUseTimeSeconds | Time after which a connection is forcibly released, in seconds. | 3600 (one hour) |
| connectionMaxIdleMs | Maximum idle time of a connection, in milliseconds. If the idle time of a connection exceeds this value, the connection is released. | 60000 (one minute) |

Table 3 Writing parameters

| Parameter | Description | Default Value |
|---|---|---|
| conflictStrategy | Primary key conflict policy when data is written to a table with a primary key. Options: ignore: Retain the original data and ignore the new data. update: Use the non-primary-key columns in the new data to update the corresponding columns in the original data. replace: Replace the original data with the new data. update and replace are equivalent when all columns are upserted. When only some columns are upserted, replace sets the columns that are not upserted to null. | update |
| writeMode | Import mode. Options: auto: The system automatically selects a mode. copy_merge: If there is a primary key, data is imported to a temporary table using the COPY method and then merged from the temporary table into the target table. If there is no primary key, data is imported directly to the target table using the COPY method. copy_upsert: If there is a primary key, data is imported to a temporary table using the COPY method and then imported to the target table using the UPSERT method. If there is no primary key, data is copied directly to the target table. upsert: If there is a primary key, data is imported using UPSERT SQL. If there is no primary key, data is imported using INSERT INTO. | auto |
| maxFlushRetryTimes | Maximum number of attempts to import data to the database. If an attempt succeeds before this limit is reached, no exception is thrown. The retry interval is 1 second multiplied by the number of attempts. | 3 |
| autoFlushBatchSize | Batch size for automatic database update | 5000 |
| autoFlushMaxInterval | Maximum interval for automatic database update (the longest time spent forming a batch), in seconds. | 5 |
| copyWriteBatchSize | When writeMode is set to auto, the batch size in the COPY method is used. | 1000 |
| ignoreDelete | Whether to ignore the deletion operations generated by Flink tasks. | false |
| ignoreNullWhenUpdate | Whether to ignore updates to columns whose values are null in Flink. This parameter is valid only when conflictStrategy is set to update. | false |
| metadataCacheSeconds | Maximum time that metadata, such as table definitions, is cached in the system, in seconds. | 180 |
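
The difference between the conflictStrategy options is easiest to see on a partial upsert. The following is a minimal in-memory sketch, not connector code: ConflictStrategySketch and resolve are hypothetical names, rows are modeled as maps, and replace is assumed to null every column that is absent from the new data.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative model of the conflictStrategy options; not part of dws-connector-flink. */
final class ConflictStrategySketch {

    enum Strategy { IGNORE, UPDATE, REPLACE }

    /**
     * Returns the row stored after writing `incoming` on top of `existing`,
     * where both rows share the same primary key. `columns` lists all table columns.
     */
    static Map<String, Object> resolve(Strategy strategy, String[] columns,
                                       Map<String, Object> existing,
                                       Map<String, Object> incoming) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (String column : columns) {
            switch (strategy) {
                case IGNORE:
                    // Keep the original data; the new data is discarded.
                    result.put(column, existing.get(column));
                    break;
                case UPDATE:
                    // Only columns present in the new data are overwritten.
                    result.put(column, incoming.containsKey(column)
                            ? incoming.get(column) : existing.get(column));
                    break;
                case REPLACE:
                    // Assumed reading: columns absent from the new data become null.
                    result.put(column, incoming.get(column));
                    break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] columns = {"id", "name", "age"};
        Map<String, Object> existing = new LinkedHashMap<>();
        existing.put("id", 1);
        existing.put("name", "old");
        existing.put("age", 30);
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("id", 1);
        incoming.put("name", "new"); // partial upsert: "age" is not supplied

        for (Strategy strategy : Strategy.values()) {
            System.out.println(strategy + " -> " + resolve(strategy, columns, existing, incoming));
        }
    }
}
```

In this model, a partial upsert that omits age leaves age at 30 under update and sets it to null under replace. When the new data supplies every column, the two strategies produce the same row.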

Table 4 Query parameters

| Parameter | Mandatory | Description | Default Value |
|---|---|---|---|
| fetchSize | No | The fetchSize parameter in the JDBC statement, used to control the number of records returned by the database. | 1000 |
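
In SQL Mode, the parameters in Tables 1 to 4 are supplied as options in the WITH clause of the Flink table definition. The following is a minimal sketch of assembling such a statement. DwsDdlSketch and createTable are hypothetical helper names, not part of the connector. The option keys are copied from the tables, so the table name key is spelled tableName here, whereas the SQL Mode example spells it tablename. Passing numeric values as quoted strings is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of dws-connector-flink) that renders a Flink DDL string. */
final class DwsDdlSketch {

    /** Builds CREATE TABLE ... WITH (...) from a column list and an ordered option map. */
    static String createTable(String flinkTable, String columnsSql, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n", " WITH (\n", "\n)");
        for (Map.Entry<String, String> option : options.entrySet()) {
            // Single quotes inside a value are doubled, as in SQL string literals.
            String value = option.getValue().replace("'", "''");
            with.add("  '" + option.getKey() + "' = '" + value + "'");
        }
        return "CREATE TABLE " + flinkTable + " (\n" + columnsSql + "\n)" + with;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "dws");
        options.put("url", "jdbc:postgresql://****/gaussdb");
        options.put("tableName", "test.test");
        options.put("username", "dbadmin");
        options.put("password", "***");
        // Optional write parameters, set here to their documented defaults.
        options.put("conflictStrategy", "update");
        options.put("writeMode", "auto");
        options.put("autoFlushBatchSize", "5000");

        String ddl = createTable("dws_test",
                "  id INT,\n  name STRING,\n  PRIMARY KEY (id) NOT ENFORCED", options);
        System.out.println(ddl); // the string would be passed to tableEnvironment.executeSql(...)
    }
}
```

Keeping the options in an ordered map makes it easy to see which values differ from the defaults before the statement is executed.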