
dws-client

Description

dws-client is a high-performance and convenient data import tool based on the GaussDB(DWS) JDBC driver. Before using dws-client, ensure that a JDBC connection to GaussDB(DWS) is available. Importing data with dws-client has the following advantages:

  1. dws-client limits both the cache size and the cache duration and imports data in batches, improving import performance and meeting real-time import requirements during both peak and off-peak hours.

    In scenarios without strict real-time requirements, operations on individual records are cached until they form a batch, which is then executed as a whole. This improves write performance.

  2. dws-client supports concurrent data import.
  3. dws-client supports multiple high-performance import modes and primary key conflict policies to meet import requirements in various scenarios.
  4. dws-client supports API-based interaction, making it easy to use.
Figure 1 dws-client interaction scenario

Dependency

dws-client has been added to the Maven repository. You can select the latest version from the repository. For details, visit https://mvnrepository.com/artifact/com.huaweicloud.dws/dws-client.
<dependency>
   <groupId>com.huaweicloud.dws</groupId>
   <artifactId>dws-client</artifactId>
   <version>${version}</version>
</dependency>

Scenario & Usage

Prerequisite: The client has been initialized.

The following is a simple example. You only need to configure the database connection. Retain the default values for other parameters.
public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:gaussdb://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .build();
    return new DwsClient(config);
}
Scenario 1: Table-level parameter configuration
return DwsConfig.builder()
        .withUrl(System.getenv("db_url"))
        .withPassword(System.getenv("db_pwd"))
        .withUsername(System.getenv("db_username"))
        .withAutoFlushBatchSize(1000) // Global default batch size: 1000.
        .withTableConfig("test.t_c_batch_size_2", new TableConfig()
                .withAutoFlushBatchSize(500)); // Batch size of 500 for table test.t_c_batch_size_2 only.

Scenario 2: Using a database connection to execute SQL statements

This API is mainly used for special services that the currently supported functions cannot cover. For example, to query data, you can use the native JDBC connection to operate the database directly.

The API parameter is a functional interface that receives a database connection. The return value can be of any type, which is determined by the return type of the service logic.
public void sql() throws DwsClientException {
    Integer id = getClient().sql(connection -> {
        try (ResultSet resultSet = connection.createStatement().executeQuery("select id from test.user where name = 'zhangsan'")) {
            if (resultSet.next()) {
                return resultSet.getInt("id");
            }
        }
        return null;
    });
    System.out.println("zhangsan id = " + id);
}

Scenario 3: Obtaining table information

The API obtains the table structure (cached) based on a schema-qualified table name. The table structure includes all columns and the primary keys.
public void getTableSchema() throws DwsClientException {
    TableSchema tableSchema = getClient().getTableSchema(TableName.valueOf("test.test"));
}

Scenario 4: Importing data to a database

The client provides a write API for importing data to the database. The returned Operate object is used to set table columns. Once the operation is committed, the column settings are complete and the client starts importing the data. A commit can be synchronous or asynchronous. When setting a field, you can choose whether the setting is ignored when a primary key conflict occurs.
public void write() throws DwsClientException {
    getClient().write("test.test")
            .setObject("id", 1)
            .setObject("name", "test")
            // Takes effect only on insert; if a primary key conflict occurs, this column is not updated.
            .setObject("age", 38, true)
            // Asynchronous commit: returns once the data is stored in the background cache.
            //.commit()
            // Synchronous commit: returns only after the data has been saved to the database.
            .syncCommit();
}

Scenario 5: Deleting data

The delete API, like the import API, is carried by Operate. However, the primary key columns must be set during deletion, and any "do not update this column on primary key conflict" setting is ignored.
public void delete() throws DwsClientException {
    getClient().delete("test.test")
            .setObject("id", 1)
            // Asynchronous commit: returns once the data is stored in the background cache.
            //.commit()
            // Synchronous commit: returns only after the data has been saved to the database.
            .syncCommit();
}
Scenario 6: Forcibly flushing the cache to the database
public void flush() throws DwsClientException {
    getClient().flush();
}

Scenario 7: Closing resources

When the client is closed, the cache is flushed to the database first. After the client is closed, APIs such as the write, delete, and SQL execution APIs can no longer be invoked.
public void close() throws IOException {
    getClient().close();
}
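Because close() is declared to throw IOException, DwsClient can presumably be used with try-with-resources (assuming it implements java.io.Closeable, which the signature suggests but this page does not state):

// Sketch, assuming DwsClient implements java.io.Closeable:
// the final flush and resource release happen automatically on close.
public void writeAndClose() throws Exception {
    try (DwsClient client = getClient()) {
        client.write("test.test")
                .setObject("id", 1)
                .setObject("name", "test")
                .syncCommit();
    }
}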

Listening to Data Import Events

In the asynchronous import scenario, if you want to know which data has been imported to the database, you can bind the flushSuccess function interface. The interface is called back with the import information after the database transaction is committed.

public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:postgresql://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .onFlushSuccess(records -> {
                for (Record record : records) {
                    log.info("flush success. value = {}, pk = {}", RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record));
                }
            })
            .build();
    return new DwsClient(config);
}

Listening to Abnormal Background Tasks

In the asynchronous import scenario, data is imported to the database by a background task. Bind the error function interface to detect background task failures promptly; otherwise, a failure is discovered only at the next commit. If the bound interface does not throw an exception, the exception is cleared and will not be thrown at the next commit; if it throws, the exception is thrown to the service at the next commit.

public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:postgresql://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .onError((clientException, client) -> {
                if (clientException instanceof DwsClientRecordException) {
                    DwsClientRecordException recordException = (DwsClientRecordException) clientException;
                    List<Record> records = recordException.getRecords();
                    List<DwsClientException> exceptions = recordException.getExceptions();
                    for (int i = 0; i < records.size(); i++) {
                        log.error("pk = {} . error = {}", RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i));
                    }
                }
                if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR && clientException.getCode() != ExceptionCode.LOCK_ERROR) {
                    throw clientException;
                }
                log.error("code = {}", clientException.getCode(), clientException.getOriginal());
                return null;
            })
            .build();
    return new DwsClient(config);
}

Exception Handling

Exceptions are classified into three types:

  1. InvalidException is an unchecked exception that is triggered when a request parameter is invalid.
  2. DwsClientException encapsulates all checked exceptions, including the parsed error code and the original exception.
  3. DwsClientRecordException extends DwsClientException and carries the data records that failed to be written, together with the corresponding DwsClientException for each record.
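
As a sketch, a caller of a synchronous commit can distinguish the record-level case from the generic case as follows (all classes and accessors used here appear elsewhere on this page):

public void writeWithHandling(DwsClient client) {
    try {
        client.write("test.test")
                .setObject("id", 1)
                .syncCommit();
    } catch (DwsClientRecordException e) {
        // Record-level failures: the record list and exception list are index-aligned.
        for (int i = 0; i < e.getRecords().size(); i++) {
            log.error("write failed. pk = {}", RecordUtil.getRecordPrimaryKeyValue(e.getRecords().get(i)), e.getExceptions().get(i));
        }
    } catch (DwsClientException e) {
        log.error("write failed. code = {}", e.getCode(), e.getOriginal());
    }
}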

The exception codes are as follows.

public enum ExceptionCode {
    /**
     * Invalid parameter.
     */
    INVALID_CONFIG(1),

    /**
     * Connection exception.
     */
    CONNECTION_ERROR(100),
    /**
     * Read-only.
     */
    READ_ONLY(101),
    /**
     * Timeout.
     */
    TIMEOUT(102),
    /**
     * Too many connections.
     */
    TOO_MANY_CONNECTIONS(103),
    /**
     * Locking exception.
     */
    LOCK_ERROR(104),

    /**
     * Authentication failed.
     */
    AUTH_FAIL(201),
    /**
     * Client already closed.
     */
    ALREADY_CLOSE(202),
    /**
     * No permission.
     */
    PERMISSION_DENY(203),
    /**
     * Syntax error.
     */
    SYNTAX_ERROR(204),
    /**
     * Internal exception.
     */
    INTERNAL_ERROR(205),
    /**
     * Interruption exception.
     */
    INTERRUPTED(206),
    /**
     * The table is not found.
     */
    TABLE_NOT_FOUND(207),
    /**
     * Constraint violation.
     */
    CONSTRAINT_VIOLATION(208),
    /**
     * Data type error.
     */
    DATA_TYPE_ERROR(209),
    /**
     * Data value error.
     */
    DATA_VALUE_ERROR(210),

    /**
     * Exceptions that cannot be parsed.
     */
    UNKNOWN_ERROR(500);

    private final int code;

    ExceptionCode(int code) {
        this.code = code;
    }
}

Detailed Configuration

Each parameter below is listed with its description and default value. Unless a later version is noted, a parameter is supported since client version 1.0.

url
JDBC URL for connecting to the GaussDB(DWS) database.
Default value: -

username
GaussDB(DWS) database user name.
Default value: -

password
GaussDB(DWS) database user password.
Default value: -

connectionMaxUseTimeSeconds
Maximum lifetime of a connection, in seconds. A connection held longer than this value is forcibly closed and a new connection is obtained. This parameter exists because the COPY_MERGE and COPY_UPSERT modes use temporary tables, whose schemas are cleared only when the connection is closed.
Default value: 3600

connectionMaxIdleMs
Maximum idle time of a connection, in milliseconds.
Default value: 60000

metadataCacheSeconds
Metadata cache duration, in seconds. Sets the expiration time for cached data that rarely changes, such as the table structure, to improve performance.
Default value: 180

retryBaseTime
Base retry time, in milliseconds. Sleep time during a retry = retryBaseTime x number of attempts + a random value in the range 0 to retryRandomTime ms.
Default value: 1000

retryRandomTime
Range of the random component in the retry sleep formula above, in milliseconds. The random component staggers the execution times of two competing tasks in deadlock scenarios; see the sketch below.
Default value: 300
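
For example, with the defaults (retryBaseTime = 1000, retryRandomTime = 300), the first retry sleeps roughly 1000-1300 ms and the second roughly 2000-2300 ms. A minimal sketch of the documented formula (illustrative only, not the client's internal code):

public static long retrySleepMs(int attempt, long retryBaseTime, long retryRandomTime) {
    // sleep = retryBaseTime x attempt + random(0..retryRandomTime)
    return retryBaseTime * attempt
            + java.util.concurrent.ThreadLocalRandom.current().nextLong(retryRandomTime);
}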

maxFlushRetryTimes
Maximum number of attempts for a database update task.
Default value: 3

autoFlushBatchSize
Maximum number of cached records. A flush is triggered when the number of cached records reaches autoFlushBatchSize, or when the time since caching started reaches autoFlushMaxIntervalMs; see the sketch below.
Default value: 5000

autoFlushMaxIntervalMs
Maximum cache duration, in milliseconds (the time component of the flush policy described under autoFlushBatchSize).
Default value: 3000
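
For example, to flush either when 1000 records accumulate or after one second of buffering, whichever comes first. withAutoFlushBatchSize appears in Scenario 1; withAutoFlushMaxIntervalMs is an assumed builder name following the same pattern:

DwsConfig config = DwsConfig.builder()
        .withUrl(System.getenv("db_url"))
        .withUsername(System.getenv("db_username"))
        .withPassword(System.getenv("db_pwd"))
        .withAutoFlushBatchSize(1000)     // flush when 1000 records are buffered, or
        .withAutoFlushMaxIntervalMs(1000) // after 1s of buffering (assumed setter name)
        .build();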

copyWriteBatchSize
When writeMode is set to AUTO and the batch contains fewer records than copyWriteBatchSize, the UPSERT method is used to import data; otherwise, the COPY or COPY+UPSERT method is used, depending on whether the table has a primary key.
Default value: 6000

writeMode
Data write method:
  • AUTO: If the batch contains fewer records than copyWriteBatchSize, UPSERT is used; otherwise, COPY_UPSERT is used.
  • COPY_MERGE: If the table has a primary key, COPY + MERGE is used; otherwise, COPY is used.
  • COPY_UPSERT: If the table has a primary key, COPY + UPSERT is used; otherwise, COPY is used.
  • UPSERT: If the table has a primary key, UPSERT is used; otherwise, INSERT INTO is used.
  • UPDATE: The UPDATE WHERE syntax is used to update data. If the table has no primary key, you can specify unique keys. A unique-key column does not need to have a unique index, but a non-unique index may impact performance.
  • COPY_UPDATE: Data is imported to a temporary table using COPY, and the temporary table is used to accelerate the update through UPDATE FROM WHERE.
  • UPDATE_AUTO: If the batch contains fewer records than copyWriteBatchSize, UPDATE is used; otherwise, COPY_UPDATE is used.
Default value: AUTO

conflictStrategy
Primary key conflict policy when the table has a primary key:
  • INSERT_OR_IGNORE: Ignore the new data when a primary key conflict occurs.
  • INSERT_OR_UPDATE: Update the original data columns with the new data columns when a primary key conflict occurs.
  • INSERT_OR_REPLACE: Replace the original data with the new data when a primary key conflict occurs; database columns not contained in the new data are set to null. When all columns are set, this is the same as INSERT_OR_UPDATE.
Default value: INSERT_OR_UPDATE
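
The sketch below sets both parameters at table level. The withWriteMode and withConflictStrategy setters and the WriteMode and ConflictStrategy enums are assumptions modeled on the builder pattern of Scenario 1, and test.t_orders is a hypothetical table name:

DwsConfig config = DwsConfig.builder()
        .withUrl(System.getenv("db_url"))
        .withUsername(System.getenv("db_username"))
        .withPassword(System.getenv("db_pwd"))
        .withTableConfig("test.t_orders", new TableConfig()               // hypothetical table
                .withWriteMode(WriteMode.COPY_UPSERT)                     // assumed setter and enum
                .withConflictStrategy(ConflictStrategy.INSERT_OR_IGNORE)) // assumed setter and enum
        .build();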

threadSize
Number of concurrent tasks. Tasks are submitted asynchronously by table, so multiple tables can be imported concurrently. Within a table, operations that set the same fields form one type; for example, a batch with 100 operations on fields A, B, and C and 200 operations on fields A, B, and D contains two types, and different types can be imported concurrently. Set this parameter based on these two scenarios to improve throughput.
Default value: 3

logSwitch
Log switch. When enabled, detailed process logs are recorded for debugging or fault locating.
Default value: false

logDataTables
Tables whose data is printed during import, for data comparison when locating faults.
Default value: -

flushSuccessFunction
Callback function invoked after data is successfully imported to the database.
Default value: -

errorFunction
Callback function invoked when a background task fails.
Default value: -

batchOutWeighRatio
Set this parameter to improve overall throughput when the autoFlushBatchSize limit is not strict. When data is submitted to the buffer and the buffered volume exceeds batchOutWeighRatio x autoFlushBatchSize, a flush task is submitted immediately. This preferentially uses background threads, rather than service threads, to submit import tasks.
Default value: 1

tableConfig
Table-level configuration. If multiple tables share one client and need different values for conflictStrategy, writeMode, copyWriteBatchSize, autoFlushMaxIntervalMs, autoFlushBatchSize, or batchOutWeighRatio, use this parameter to configure them per table; the global configuration still applies to tables that are not individually configured.
NOTE: Once any table-level parameter is configured for a table, the remaining table-level parameters fall back to their default values, so set all of them explicitly.
Default value: -

uniqueKeys
Table-level parameter that must be configured through tableConfig. If a table has no primary key but has a unique index, this parameter specifies the columns to be used as a unique constraint during import. In the update scenario, the specified columns do not need to form a unique index or primary key; in the upsert scenario, they must. See the sketch below.
Default value: -
Supported versions: 1.0.3 and later
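
The sketch below updates a table that has no primary key through a unique-index column. withUniqueKeys and withWriteMode are assumed setter names, and test.t_no_pk and biz_id are hypothetical:

DwsConfig config = DwsConfig.builder()
        .withUrl(System.getenv("db_url"))
        .withUsername(System.getenv("db_username"))
        .withPassword(System.getenv("db_pwd"))
        .withTableConfig("test.t_no_pk", new TableConfig()    // hypothetical table
                .withUniqueKeys(java.util.Set.of("biz_id"))   // assumed setter; biz_id is hypothetical
                .withWriteMode(WriteMode.UPDATE))             // assumed setter and enum
        .build();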

copyMode
Format used to copy data to the database:
  • CSV: Data is concatenated into a CSV string, with fields enclosed in double quotation marks and separated by commas (,) and rows separated by line breaks, and is imported through the JDBC COPY API. This mode is slightly slower than DELIMITER but stable and reliable.
  • DELIMITER: Data is imported through the COPY API with fields separated by 0x1E and rows separated by 0x1F. The data must not contain the delimiter characters; if it does, an error is reported and the data cannot be imported. In this mode, an empty string is treated as null.
Default value: CSV
Supported versions: 1.0.6 and later

caseSensitive
Whether table field names are case sensitive.
Default value: false
Supported versions: 1.0.7 and later

createTempTableMode
Mode for creating the temporary table used by COPY_MERGE and COPY_UPSERT:
  • AS: Uses CREATE TEMP TABLE *** AS SELECT * FROM ***. Tables with auto-increment fields are supported, but performance may be lower.
  • LIKE: Uses CREATE TEMP TABLE *** LIKE ***. Tables with auto-increment fields are not supported.
Default value: AS
Supported versions: 1.0.7 and later

numberAsEpochMsForDatetime
Whether to treat numeric source data as epoch milliseconds and convert it to the corresponding time type when the database field is of a time type (date/time/timestamp).
NOTE:
  • This parameter does not take effect when data is copied to the database.
  • In versions earlier than this one, if this behavior was enabled and the data was numeric, strings were also regarded as timestamps.
Default value: false
Supported versions: 1.0.9 and later

stringToDatetimeFormat
If the database field is of a time type (date/time/timestamp) and the source data is a string, SimpleDateFormat with the stringToDatetimeFormat pattern is used to parse the string into a date, and the resulting timestamp is used to construct the value of the corresponding database type.
NOTE: Setting this parameter enables the function; do not set it if the conversion is not required.
Default value: null
Supported versions: 1.0.9 and later

updateAll
Whether the SET fields include the primary key during an upsert.
Default value: true
Supported versions: 1.0.10 and later