
Result Table

Syntax

Different Flink environments may use slightly different SQL syntax; check the syntax format required by your actual environment. The parameter names and values in the WITH clause are specific to this document.

create table dwsSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (',' PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'dws',
  'url' = '',
  'tableName' = '',
  'username' = '',
  'password' = ''
);

Flink SQL Configuration Parameters

Primary keys defined in Flink SQL are automatically mapped to unique keys in the GaussDB(DWS) client. The WITH parameters are released together with the client and work the same as the corresponding client parameters. The parameters listed below reflect the latest client version.
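
For example, a sink declared as follows maps the Flink primary key id to the unique key used on the GaussDB(DWS) client side. This is a minimal sketch of the syntax above; the column names are illustrative, and the connection values are left empty as in the template.

create table dwsSink (
  id   BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED  -- mapped to the unique key used by the GaussDB(DWS) client
)
with (
  'connector' = 'dws',
  'url' = '',
  'tableName' = '',
  'username' = '',
  'password' = ''
);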

Table 1 Database configurations

Parameter

Description

Default Value

connector

Connector type, which the Flink framework uses to distinguish connectors. The value is fixed to dws.

-

url

Database connection address

-

username

Username used for the database connection

-

password

Password used for the database connection

-

tableName

Name of the GaussDB(DWS) table to write data to

-

Table 2 Connection configurations

Parameter

Description

Default Value

connectionSize

Number of concurrent requests configured when the GaussDB(DWS) client is initialized

1

connectionMaxUseTimeSeconds

Time after which a connection is forcibly released, in seconds.

3600 (one hour)

connectionMaxIdleMs

Maximum idle time of a connection, in milliseconds. If the idle time of a connection exceeds the value, the connection is released.

60000 (one minute)

Table 3 Writing parameters

Parameter

Description

Default Value

conflictStrategy

Policy for handling primary key conflicts when data is written to a table that has a primary key (see the configuration sketch after this table). The options are as follows:

  • ignore: Retain the original data and ignore the new data.
  • update: Use the non-primary-key columns of the new data to update the corresponding columns of the original data.
  • replace: Replace the original data with the new data.
    NOTE:

    The update and replace operations are equivalent when all columns are upserted. When only some columns are upserted, replace sets the columns that are not included in the new data to null.

update

writeMode

Import modes. The options are as follows:

  • auto: The system automatically selects a mode.
  • copy_merge: If the table has a primary key, data is imported into a temporary table using COPY and then merged from the temporary table into the target table. If there is no primary key, data is imported directly into the target table using COPY.
  • copy_upsert: If the table has a primary key, data is imported into a temporary table using COPY and then written to the target table using UPSERT. If there is no primary key, data is imported directly into the target table using COPY.
  • upsert: If the table has a primary key, data is written to the database using UPSERT SQL. If there is no primary key, data is written using INSERT INTO.
  • update: Data is updated using the UPDATE WHERE syntax. If the target table has no primary key, you can specify unique keys instead. A unique-key column does not need to have a unique index, but a column without a unique index may affect performance.
  • copy_update: Data is imported into a temporary table using COPY, and the target table is then updated from the temporary table using UPDATE FROM WHERE, which accelerates the update.
  • update_auto: update is used when the batch size is smaller than copyWriteBatchSize; otherwise, copy_update is used.

auto

maxFlushRetryTimes

Maximum number of attempts to write a batch to the database. If a write succeeds within this number of attempts, no exception is thrown. The retry interval is 1 second multiplied by the number of attempts made.

3

autoFlushBatchSize

Number of records that triggers an automatic flush to the database (batch size)

5000

autoFlushMaxInterval

Maximum interval between automatic flushes to the database (maximum duration for forming a batch).

5s

copyWriteBatchSize

Batch size used by the COPY method when writeMode is set to auto.

5000

ignoreDelete

Whether to ignore delete events in the Flink job.

false (The default value is true before 1.0.10.)

ignoreNullWhenUpdate

Whether columns whose value is null in the Flink data are skipped during updates. This parameter takes effect only when conflictStrategy is set to update.

false

metadataCacheSeconds

Maximum cache duration of metadata in the system, for example, table definitions (unit: second).

180

copyMode

Format for copying data to the database:

  • CSV: Data is concatenated into a CSV file and imported to the database. This mode is stable but has low performance.
  • DELIMITER: Data is concatenated using a delimiter before being imported to the database. This mode requires that the data does not contain the delimiter.

CSV

createTempTableMode

Temporary table creation methods, which include:

  • AS
  • LIKE

AS

numberAsEpochMsForDatetime

Whether to treat numeric source data as an epoch timestamp and convert it to the corresponding time type when the database column is of a time type.

false

stringToDatetimeFormat

Format used to convert string source data to the time type when the database column is of a time type. The conversion is enabled only if this parameter is set.

null

sink.parallelism

Flink system parameter, which is used to set the number of concurrent sinks.

Same as the upstream operator.

printDataPk

Whether to print the data primary key when the connector receives data. It can be used for troubleshooting.

false

ignoreUpdateBefore

Whether to ignore update_before events in the Flink job. Enable this parameter for partial updates on large tables; otherwise, an update first deletes the existing row and then inserts the new one, setting the columns that are not updated to null.

true
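
As referenced in the conflictStrategy entry, the following sketch shows where the writing parameters go: they are appended after the required connection parameters inside the sink's WITH clause. The values shown simply restate defaults and options from Table 3 and are illustrative rather than a tuning recommendation.

  'conflictStrategy' = 'update',    -- on a primary key conflict, update the non-primary-key columns
  'writeMode' = 'auto',             -- let the connector choose the import method
  'ignoreDelete' = 'false',         -- apply delete events from the Flink job
  'autoFlushBatchSize' = '5000',    -- flush once 5000 records have accumulated ...
  'autoFlushMaxInterval' = '5s'     -- ... or after 5 seconds, whichever comes first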

Examples

Here is an example of reading data from a Kafka data source and writing it to a GaussDB(DWS) result table. A batch holds at most 30,000 records and is flushed to the database at least every 10 seconds.

  1. Create the public.dws_order table in the GaussDB(DWS) database.
    create table public.dws_order(
      order_id VARCHAR,
      order_channel VARCHAR,
      order_time VARCHAR,
      pay_amount FLOAT8,
      real_pay FLOAT8,
      pay_time VARCHAR,
      user_id VARCHAR,
      user_name VARCHAR,
      area_id VARCHAR
      );
    
  2. The order_test topic in Kafka is used as the data source, and public.dws_order is used as the result table. The Kafka data is in JSON format, and its field names correspond to the database column names.
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'order_test',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE dwsSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'dws',
      'url' = 'jdbc:gaussdb://DWSAddress:DWSPort/DWSdbName',
      'tableName' = 'dws_order',
      'username' = 'DWSUserName',
      'password' = 'DWSPassword',
      'autoFlushMaxInterval' = '10s',
      'autoFlushBatchSize' = '30000'
    );
    
    insert into dwsSink select * from kafkaSource;
    
  3. Insert test data to Kafka.
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
  4. Wait for 10 seconds and query the result in the GaussDB(DWS) table.
    select * from dws_order;
    

    The query result contains the test record inserted in step 3.

FAQs

  • Q: What is the proper value of the writeMode parameter?

    A: There are two types of service scenarios: update and upsert. An update only modifies existing data, while an upsert updates the row if the primary key already exists and inserts a new row if it does not. The auto mode is recommended: the connector selects an import method based on the data volume. If the data volume is large, increasing the autoFlushBatchSize value can further improve import performance.

  • Q: How to set autoFlushBatchSize and autoFlushMaxInterval properly?
    A: The autoFlushBatchSize parameter sets the maximum number of records accumulated in a batch, while the autoFlushMaxInterval parameter sets the maximum time a batch waits before being flushed. Together, the two parameters bound batching in both size and time.
    • When dealing with small data volumes, setting a value for autoFlushMaxInterval can ensure timely updates. But if timeliness is not a top priority, it is better to avoid setting a small value. It is advisable to either stick with the default value or set it to a value greater than or equal to 3 seconds.
    • The autoFlushBatchSize parameter lets you limit the number of data records in a batch. Generally, a higher number of records in a batch results in better data import performance. It is best to set this parameter to a larger value, while considering the service data size and Flink running memory to prevent memory overflow problems.

      For most services, you do not need to set autoFlushMaxInterval. Set autoFlushBatchSize to 50000.
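
      For instance, the WITH-clause lines below follow that guidance; treat 50000 as a starting point to validate against your record size and Flink memory, not as a fixed rule.

      'autoFlushBatchSize' = '50000'   -- larger batches generally improve import throughput
      -- autoFlushMaxInterval keeps its default (5s); set it only if fresher writes are required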

  • Q: What can I do if a database deadlock occurs?

    A: Deadlocks are classified into row deadlocks and distributed deadlocks.

    • A row deadlock can occur when data with the same primary key is updated concurrently. To avoid it, key the data by the database primary key (key by) so that records with the same primary key are processed in the same parallel subtask; this removes concurrent updates to the same row and prevents the deadlock. Whether key by can be applied in Flink SQL depends on the Flink service: both DLI and MRS support it. For example, in DLI Flink you can set key-by-before-sink to true to enable key by. For details about how to use it, consult the corresponding service provider. If key by cannot be applied in SQL, you are advised to write data to the database through the API instead.
    • A distributed deadlock can occur when column-store tables are updated concurrently; it cannot be resolved at present. To avoid it, use row-store tables or hstore tables instead.