
dws-client

Description

DWS Client is a high-performance, easy-to-use data import tool based on DWS JDBC. Before using DWS Client, ensure that a JDBC connection to the database can be established.

Dependency

dws-client has been added to the Maven repository. You can select the latest version from the repository. For details, visit https://mvnrepository.com/artifact/com.huaweicloud.dws/dws-client.
<dependency>
   <groupId>com.huaweicloud.dws</groupId>
   <artifactId>dws-client</artifactId>
   <version>1.0</version>
</dependency>

Scenario & Usage

Prerequisite: The client has been initialized.

The following is a simple example. You only need to configure the database connection. Retain the default values for other parameters.
public DwsClient getClient(){
        DwsConfig config = DwsConfig
                .builder()
                .withUrl("jdbc:gaussdb://***/gaussdb")
                .withUsername("***")
                .withPassword("****")
                .build();
        return new DwsClient(config);
     }
Scenario 1: Table-level parameter configuration
return DwsConfig.builder()
                .withUrl(System.getenv("db_url"))
                .withPassword(System.getenv("db_pwd"))
                .withUsername(System.getenv("db_username"))
                .withAutoFlushBatchSize(1000) // Global batch size of 1000, used by tables without a table-level setting.
                .withTableConfig("test.t_c_batch_size_2", new TableConfig()
                        .withAutoFlushBatchSize(500)); // The batch size for table test.t_c_batch_size_2 is 500.

Scenario 2: Using a database connection to execute SQL statements

This API is mainly intended for special service scenarios that the currently supported functions cannot cover, for example, querying data. It allows you to operate the database directly over the native JDBC connection.

The API parameter is a functional interface that is passed a database connection. The return value can be of any type and is determined by the return type of the service code.
public void sql() throws DwsClientException {
        Integer id = getClient().sql(connection -> {
            try (ResultSet resultSet = connection.createStatement().executeQuery("select id from test.user where name = 'zhangsan'")) {
                if (resultSet.next()) {
                    return resultSet.getInt("id");
                }
            }
            return null;
        });
        System.out.println("zhangsan id = " + id);
     }

Scenario 3: Obtaining table information

The API obtains the table structure (cached) based on a schema-qualified table name. The table structure definition includes all columns and the primary key.
public void getTableSchema() throws DwsClientException {
        TableSchema tableSchema = getClient().getTableSchema(TableName.valueOf("test.test"));
     }

Scenario 4: Importing data to a database

The client provides an upsert API for importing data to the database. The upsert API returns an Operate object that is used to set the table columns. Once the operation is committed, the column settings are complete and the client starts importing the data to the database. The commit can be synchronous or asynchronous. When setting a column, you can choose to ignore the setting if a primary key conflict occurs.
public void upsert() throws DwsClientException {
        getClient().upsert("test.test")
                .setObject("id", 1)
                .setObject("name", "test")
                // This setting takes effect only when data is inserted. If a primary key conflict occurs, the setting is not updated.
                .setObject("age", 38, true)
                // Asynchronously save the data to the database. The result is returned after data is stored in the background cache.
                //.commit()
                // The result is returned after data is successfully saved to the database.
                .syncCommit();
}

Scenario 5: Deleting data

The delete API, like the import API, is carried by Operate. For deletion, however, the primary key columns must be set, and the setting that makes a column update not take effect on primary key conflict is ignored.
public void delete() throws DwsClientException {
        getClient().delete("test.test")
                .setObject("id", 1)
                // Asynchronously save the data to the database. The result is returned after data is stored in the background cache.
                //.commit()
                // The result is returned after data is successfully saved to the database.
                .syncCommit();
}
Scenario 6: Forcibly updating the cache to the database
public void flush() throws DwsClientException {
        getClient().flush();
    }

Scenario 7: Closing resources

The close operation flushes the cache to the database. After the client is closed, APIs such as importing data to the database, deleting data, and executing SQL statements can no longer be called.
public void close() throws IOException {
        getClient().close();
     }

Listening to Data Import Events

In the asynchronous import scenario, if you want to know which data has been imported to the database, you can bind the flushSuccess functional interface. The interface is called back with the import information after the database transaction is committed.

public DwsClient getClient() {
        DwsConfig config = DwsConfig
                .builder()
                .withUrl("jdbc:postgresql://***/gaussdb")
                .withUsername("***")
                .withPassword("****")
                .onFlushSuccess(records -> {
                    for (Record record : records) {
                        log.info("flush success. value = {}, pk = {}", RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record));
                    }
                })
                .build();
        return new DwsClient(config);
     }

Listening to Abnormal Background Tasks

In the asynchronous import scenario, data is imported to the database by a background task. You can bind the error functional interface to detect background task failures as soon as they occur; otherwise, an exception is discovered only when data is committed the next time. If the bound interface does not rethrow the exception, the exception is cleared and will not be thrown on the next commit; if it does rethrow, the exception is thrown to the service on the next commit.

public DwsClient getClient() {
        DwsConfig config = DwsConfig
                .builder()
                .withUrl("jdbc:postgresql://***/gaussdb")
                .withUsername("***")
                .withPassword("****")
                .onError((clientException, client) -> {
                    if (clientException instanceof DwsClientRecordException) {
                        DwsClientRecordException recordException = (DwsClientRecordException) clientException;
                        List<Record> records = recordException.getRecords();
                        List<DwsClientException> exceptions = recordException.getExceptions();
                        for (int i = 0; i < records.size(); i++) {
                            log.error("pk = {} . error = {}", RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i));
                        }
                    }
                    if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR && clientException.getCode() != ExceptionCode.LOCK_ERROR) {
                        throw clientException;
                    }
                    log.error("code = {}", clientException.getCode(), clientException.getOriginal());
                    return null;
                })
                .build();
        return new DwsClient(config);
     }

Exception Handling

Exceptions are classified into the following types:

1. InvalidException is an unchecked exception triggered when a request parameter is invalid.

2. DwsClientException is the base exception that wraps all other exceptions. It carries the parsed error code and the original exception.

3. DwsClientRecordException extends DwsClientException. It carries the records that failed to be written and the corresponding DwsClientException for each record.
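
For example, a service can distinguish these exception types when committing data synchronously. The following is a minimal sketch that reuses the client, table, and logger from the scenarios above; the method name upsertWithErrorHandling is only for illustration.

public void upsertWithErrorHandling() {
        try {
            getClient().upsert("test.test")
                    .setObject("id", 1)
                    .setObject("name", "test")
                    .syncCommit();
        } catch (DwsClientRecordException e) {
            // Record-level failure: each failed record has a corresponding exception.
            List<Record> records = e.getRecords();
            List<DwsClientException> exceptions = e.getExceptions();
            for (int i = 0; i < records.size(); i++) {
                log.error("write failed. pk = {}", RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i));
            }
        } catch (DwsClientException e) {
            // General client failure: inspect the parsed code and the original exception.
            log.error("code = {}", e.getCode(), e.getOriginal());
        }
}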

The following enumeration lists the exception codes.

public enum ExceptionCode {
    /**
     * Invalid parameter.
     */
    INVALID_CONFIG(1),

    /**
     * Connection exception.
     */
    CONNECTION_ERROR(100),
    /**
     * Read-only.
     */
    READ_ONLY(101),
    /**
     * Timeout.
     */
    TIMEOUT(102),
    /**
     * Too many connections.
     */
    TOO_MANY_CONNECTIONS(103),
    /**
     * Locking exception.
     */
    LOCK_ERROR(104),

    /**
     * Authentication failed.
     */
    AUTH_FAIL(201),
    /**
     * Already closed.
     */
    ALREADY_CLOSE(202),
    /**
     * No permission.
     */
    PERMISSION_DENY(203),
    SYNTAX_ERROR(204),
    /**
     * Internal exception.
     */
    INTERNAL_ERROR(205),
    /**
     * Interruption exception.
     */
    INTERRUPTED(206),
    /**
     * The table is not found.
     */
    TABLE_NOT_FOUND(207),
    CONSTRAINT_VIOLATION(208),
    DATA_TYPE_ERROR(209),
    DATA_VALUE_ERROR(210),

    /**
     * Exceptions that cannot be parsed.
     */
    UNKNOWN_ERROR(500);

    private final int code;

    ExceptionCode(int code) {
        this.code = code;
    }
}

Detailed Configuration

url
Description: JDBC URL for connecting to the GaussDB(DWS) database.
Default value: -
Supported versions: 1.0

driverName
Description: JDBC driver class of the database.
Default value: com.huawei.gauss200.jdbc.Driver

username
Description: GaussDB(DWS) database user name.
Default value: -

password
Description: GaussDB(DWS) database user password.
Default value: -

connectionMaxUseTimeSeconds
Description: Maximum lifetime of a connection, in seconds. If a connection exceeds this duration, it is forcibly closed and a new connection is obtained. This parameter exists because the COPY_MERGE and COPY_UPSERT statements use temporary tables whose schemas are cleared only when the connection is closed.
Default value: 3600

connectionMaxIdleMs
Description: Maximum idle time of a connection, in milliseconds.
Default value: 60000

metadataCacheSeconds
Description: Metadata cache duration, in seconds. It sets the cache expiration time for data that rarely changes, such as the table structure, to improve performance.
Default value: 180

retryBaseTime
Description: Base retry interval, in milliseconds. Sleep time during a retry = retryBaseTime x number of retries + a random value in the range (0, retryRandomTime).
Default value: 1000

retryRandomTime
Description: Range of the random component of the retry interval, in milliseconds. Sleep time during a retry = retryBaseTime x number of retries + a random value in the range (0, retryRandomTime). The random component staggers the execution time of two tasks in deadlock scenarios.
Default value: 300

maxFlushRetryTimes
Description: Maximum number of attempts for a database update task.
Default value: 3

autoFlushBatchSize
Description: Maximum number of cached records. A database update is triggered when the number of cached records reaches autoFlushBatchSize or the time since caching started reaches autoFlushMaxIntervalMs.
Default value: 5000

autoFlushMaxIntervalMs
Description: Maximum cache duration, in milliseconds. A database update is triggered when the number of cached records reaches autoFlushBatchSize or the time since caching started reaches autoFlushMaxIntervalMs.
Default value: 3000

copyWriteBatchSize
Description: When writeMode is set to AUTO and the data volume is less than copyWriteBatchSize, the UPSERT method is used to import data. Otherwise, the COPY or COPY + UPSERT method is used, depending on whether the table has a primary key.
Default value: 6000

writeMode
Description: Data write method.
AUTO:
1. If there is a primary key and the data volume is less than copyWriteBatchSize, the UPSERT method is used. Otherwise, the COPY + UPSERT method is used.
2. If there is no primary key and the data volume is less than copyWriteBatchSize, the INSERT INTO method is used. Otherwise, the COPY method is used.
COPY_MERGE:
1. The COPY + MERGE method is used.
2. If there is no primary key, the COPY method is used.
COPY_UPSERT:
1. If there is no primary key, the COPY method is used.
2. If there is a primary key, the COPY + UPSERT method is used.
UPSERT:
1. If there is no primary key, the INSERT INTO method is used.
2. If there is a primary key, the UPSERT method is used.
Default value: AUTO

conflictStrategy
Description: Primary key conflict policy when the table has a primary key.
INSERT_OR_IGNORE: Ignore the new data when a primary key conflict occurs.
INSERT_OR_UPDATE: Update the original columns with the new data columns when a primary key conflict occurs.
INSERT_OR_REPLACE: Replace the original data with the new data when a primary key conflict occurs. Columns in the database that are not contained in the new data are set to null. When all columns are updated, this is the same as INSERT_OR_UPDATE.
Default value: INSERT_OR_UPDATE

threadSize
Description: Number of concurrent tasks. Asynchronous tasks are submitted by table, and multiple tables can be executed concurrently. Within a table, operations involving the same set of columns are grouped into one type, and different types of operations can be imported concurrently. Increase this parameter to improve throughput.
Default value: 3

logSwitch
Description: Log switch. If enabled, detailed process logs are recorded for debugging or fault locating.
Default value: false

logDataTables
Description: Tables whose data is printed during import, for data comparison when locating faults.
Default value: -

flushSuccessFunction
Description: Callback function invoked after data is successfully imported to the database.
Default value: -

errorFunction
Description: Callback function invoked when a background task fails.
Default value: -

batchOutWeighRatio
Description: If the autoFlushBatchSize requirement is not strict, you can set this parameter to improve overall throughput. When data is submitted to the buffer and the amount of buffered data is greater than batchOutWeighRatio x autoFlushBatchSize, a database import task is triggered. This preferentially lets background threads, rather than service threads, submit import tasks.
Default value: 1

tableConfig
Description: If multiple tables share one client, you may need different values of conflictStrategy, writeMode, copyWriteBatchSize, autoFlushMaxIntervalMs, autoFlushBatchSize, and batchOutWeighRatio for different tables. This parameter configures those parameters at the table level; tables without a table-level configuration use the global values.
Default value: -
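
To tie the parameters together, the following is a minimal configuration sketch that combines global and table-level settings. The withUrl, withUsername, withPassword, withAutoFlushBatchSize, withTableConfig, onFlushSuccess, and onError methods appear in the scenarios above; withAutoFlushMaxIntervalMs is an assumed builder method name that follows the same naming pattern, and test.t_small is a placeholder table name.

public DwsClient getConfiguredClient() {
        DwsConfig config = DwsConfig
                .builder()
                .withUrl(System.getenv("db_url"))
                .withUsername(System.getenv("db_username"))
                .withPassword(System.getenv("db_pwd"))
                // Global flush policy: flush when 5000 records are cached or after 3 seconds.
                .withAutoFlushBatchSize(5000)
                .withAutoFlushMaxIntervalMs(3000) // Assumed method name; verify against your client version.
                // Table-level override: table test.t_small is flushed in batches of 500.
                .withTableConfig("test.t_small", new TableConfig()
                        .withAutoFlushBatchSize(500))
                // Report each record after its transaction is committed.
                .onFlushSuccess(records -> {
                    for (Record record : records) {
                        log.info("flush success. pk = {}", RecordUtil.getRecordPrimaryKeyValue(record));
                    }
                })
                // Rethrow background failures so they surface on the next commit.
                .onError((clientException, client) -> {
                    throw clientException;
                })
                .build();
        return new DwsClient(config);
}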