dws-client
Description
DWS Client is a high-performance, easy-to-use data import tool built on the GaussDB(DWS) JDBC driver. Before using DWS Client, make sure the database is reachable over JDBC.
Dependency
```xml
<dependency>
    <groupId>com.huaweicloud.dws</groupId>
    <artifactId>dws-client</artifactId>
    <version>1.0</version>
</dependency>
```
Scenario & Usage
Prerequisite: The client has been initialized. The following example shows a minimal initialization:
```java
public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:gaussdb://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .build();
    return new DwsClient(config);
}
```
Table-level configuration can override the global settings for individual tables:

```java
return DwsConfig.builder()
        .withUrl(System.getenv("db_url"))
        .withPassword(System.getenv("db_pwd"))
        .withUsername(System.getenv("db_username"))
        .withAutoFlushBatchSize(1000) // The global batch size is 1000.
        .withTableConfig("test.t_c_batch_size_2", new TableConfig()
                .withAutoFlushBatchSize(500)); // The batch size is 500 for table test.t_c_batch_size_2.
```
Scenario 2: Using a database connection to execute SQL statements
This API is mainly intended for special requirements that the currently supported functions cannot meet. For example, to query data you can operate the database directly through the native JDBC connection.
```java
public void sql() throws DwsClientException {
    Integer id = getClient().sql(connection -> {
        try (ResultSet resultSet = connection.createStatement()
                .executeQuery("select id from test.user where name = 'zhangsan'")) {
            if (resultSet.next()) {
                return resultSet.getInt("id");
            }
        }
        return null;
    });
    System.out.println("zhangsan id = " + id);
}
```
Scenario 3: Obtaining table information
```java
public void getTableSchema() throws DwsClientException {
    TableSchema tableSchema = getClient().getTableSchema(TableName.valueOf("test.test"));
}
```
Scenario 4: Importing data to a database
```java
public void upsert() throws DwsClientException {
    getClient().upsert("test.test")
            .setObject("id", 1)
            .setObject("name", "test")
            // This field takes effect only when the row is inserted; on a
            // primary key conflict the existing value is not updated.
            .setObject("age", 38, true)
            // Asynchronous: returns once the data is in the background cache.
            //.commit()
            // Synchronous: returns after the data is stored in the database.
            .syncCommit();
}
```
Scenario 5: Deleting data
```java
public void delete() throws DwsClientException {
    getClient().delete("test.test")
            .setObject("id", 1)
            // Asynchronous: returns once the data is in the background cache.
            //.commit()
            // Synchronous: returns after the data is stored in the database.
            .syncCommit();
}
```
Scenario 6: Forcibly flushing cached data to the database

```java
public void flush() throws DwsClientException {
    getClient().flush();
}
```
Scenario 7: Closing resources
```java
public void close() throws IOException {
    getClient().close();
}
```
Listening to Data Import Events
In the asynchronous import scenario, if you need to know which data has been imported to the database, you can bind the flushSuccess callback. It is invoked after the database transaction is committed and reports the imported records.
```java
public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:postgresql://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .onFlushSuccess(records -> {
                for (Record record : records) {
                    log.info("flush success. value = {}, pk = {}",
                            RecordUtil.toMap(record),
                            RecordUtil.getRecordPrimaryKeyValue(record));
                }
            })
            .build();
    return new DwsClient(config);
}
```
Listening to Abnormal Background Tasks
In the asynchronous import scenario, data is written to the database by a background task. Bind the error callback to detect background task failures; otherwise a failure surfaces only when data is submitted the next time. If the bound callback does not rethrow the exception, the exception is cleared and will not be thrown on the next submission; if it rethrows, the exception is thrown to the service when the next request is submitted.
```java
public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:postgresql://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .onError((clientException, client) -> {
                if (clientException instanceof DwsClientRecordException) {
                    DwsClientRecordException recordException = (DwsClientRecordException) clientException;
                    List<Record> records = recordException.getRecords();
                    List<DwsClientException> exceptions = recordException.getExceptions();
                    for (int i = 0; i < records.size(); i++) {
                        log.error("pk = {} . error = {}",
                                RecordUtil.getRecordPrimaryKeyValue(records.get(i)),
                                exceptions.get(i));
                    }
                }
                // Rethrow everything except transient connection/lock errors.
                if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR
                        && clientException.getCode() != ExceptionCode.LOCK_ERROR) {
                    throw clientException;
                }
                log.error("code = {}", clientException.getCode(), clientException.getOriginal());
                return null;
            })
            .build();
    return new DwsClient(config);
}
```
Exception Handling
Exceptions are classified into the following types:
1. InvalidException: a runtime exception that does not need to be declared; triggered when a request parameter is invalid.
2. DwsClientException: wraps all exceptions, carrying the parsed error code and the original exception.
3. DwsClientRecordException: an extension of DwsClientException that carries the records that failed to be written and the DwsClientException corresponding to each record.
The exception codes are defined as follows:
```java
public enum ExceptionCode {
    /**
     * Invalid parameter.
     */
    INVALID_CONFIG(1),
    /**
     * Connection exception.
     */
    CONNECTION_ERROR(100),
    /**
     * Read-only.
     */
    READ_ONLY(101),
    /**
     * Timeout.
     */
    TIMEOUT(102),
    /**
     * Too many connections.
     */
    TOO_MANY_CONNECTIONS(103),
    /**
     * Locking exception.
     */
    LOCK_ERROR(104),
    /**
     * Authentication failed.
     */
    AUTH_FAIL(201),
    /**
     * Already closed.
     */
    ALREADY_CLOSE(202),
    /**
     * No permission.
     */
    PERMISSION_DENY(203),
    SYNTAX_ERROR(204),
    /**
     * Internal exception.
     */
    INTERNAL_ERROR(205),
    /**
     * Interruption exception.
     */
    INTERRUPTED(206),
    /**
     * Table not found.
     */
    TABLE_NOT_FOUND(207),
    CONSTRAINT_VIOLATION(208),
    DATA_TYPE_ERROR(209),
    DATA_VALUE_ERROR(210),
    /**
     * Exceptions that cannot be parsed.
     */
    UNKNOWN_ERROR(500);

    private final int code;

    ExceptionCode(int code) {
        this.code = code;
    }
}
```
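The error-callback example earlier retries only connection and lock errors. As a standalone illustration (a sketch, not part of the dws-client API; the class and the isRetriable helper are hypothetical), the same decision can be expressed against the numeric code values above:

```java
// Sketch: deciding whether a failed background task is worth retrying.
// The code values mirror the ExceptionCode enum above; isRetriable is
// a hypothetical helper, not part of dws-client.
public class RetryPolicyDemo {
    static final int CONNECTION_ERROR = 100;
    static final int LOCK_ERROR = 104;
    static final int SYNTAX_ERROR = 204;

    static boolean isRetriable(int code) {
        // Matches the onError callback above: swallow (retry) only
        // transient failures; rethrow everything else.
        return code == CONNECTION_ERROR || code == LOCK_ERROR;
    }

    public static void main(String[] args) {
        System.out.println(isRetriable(CONNECTION_ERROR)); // transient: retry
        System.out.println(isRetriable(SYNTAX_ERROR));     // permanent: rethrow
    }
}
```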
Detailed Configuration
| Parameter | Description | Default Value | Supported Versions |
|---|---|---|---|
| url | JDBC URL for connecting to the GaussDB(DWS) database | - | 1.0 |
| driverName | JDBC driver class of the database | com.huawei.gauss200.jdbc.Driver | |
| username | GaussDB(DWS) database user name | - | |
| password | GaussDB(DWS) database user password | - | |
| connectionMaxUseTimeSeconds | Maximum time a connection may be used, in seconds. If this limit is exceeded, the current connection is forcibly closed and a new one is obtained. This parameter exists because the COPY_MERGE and COPY_UPSERT statements use temporary tables, whose schemas are cleared only when the connection is closed. | 3600 | |
| connectionMaxIdleMs | Maximum idle time of a connection, in milliseconds | 60000 | |
| metadataCacheSeconds | Metadata cache duration, in seconds. Sets the cache expiration time for rarely changed data (for example, table structures) to improve performance. | 180 | |
| retryBaseTime | Sleep time during a retry = retryBaseTime x number of attempts + random(0, retryRandomTime) ms. This parameter specifies the retry base time, in milliseconds. | 1000 | |
| retryRandomTime | Sleep time during a retry = retryBaseTime x number of attempts + random(0, retryRandomTime) ms. This parameter specifies the range of the random component, in milliseconds; it staggers the execution of two tasks in deadlock scenarios. | 300 | |
| maxFlushRetryTimes | Maximum number of attempts for a database update task | 3 | |
| autoFlushBatchSize | Flush policy: a flush is triggered when the number of cached records is greater than or equal to autoFlushBatchSize, or when the time since caching began is greater than or equal to autoFlushMaxIntervalMs. This parameter sets the maximum number of cached records. | 5000 | |
| autoFlushMaxIntervalMs | Flush policy: a flush is triggered when the number of cached records is greater than or equal to autoFlushBatchSize, or when the time since caching began is greater than or equal to autoFlushMaxIntervalMs. This parameter sets the maximum cache duration, in milliseconds. | 3000 | |
| copyWriteBatchSize | When writeMode is AUTO and the data volume is less than copyWriteBatchSize, the UPSERT method is used to import data. Otherwise, COPY or COPY + UPSERT is used, depending on whether a primary key exists. | 6000 | |
| writeMode | Data write method. AUTO: with a primary key, UPSERT is used if the data volume is below copyWriteBatchSize, otherwise COPY + UPSERT; without a primary key, INSERT INTO is used if below copyWriteBatchSize, otherwise COPY. COPY_MERGE: COPY + MERGE with a primary key; COPY without. COPY_UPSERT: COPY + UPSERT with a primary key; COPY without. UPSERT: UPSERT with a primary key; INSERT INTO without. | AUTO | |
| conflictStrategy | Primary key conflict policy when the table has a primary key. INSERT_OR_IGNORE: ignore the new data on conflict. INSERT_OR_UPDATE: update the conflicting columns with the new data. INSERT_OR_REPLACE: replace the entire row with the new data; columns not contained in the new data are set to null (when all columns are updated, this behaves the same as INSERT_OR_UPDATE). | INSERT_OR_UPDATE | |
| threadSize | Number of concurrent tasks. Asynchronous tasks are submitted per table, so multiple tables can run concurrently. Within a table, operations involving the same set of columns are grouped into one type, and different types can run concurrently during import. Increase this value to improve throughput. | 3 | |
| logSwitch | Log switch. When enabled, detailed process logs are recorded for debugging or fault locating. | false | |
| logDataTables | Tables whose data is printed during import, for data comparison when locating faults | - | |
| flushSuccessFunction | Callback invoked after data is successfully imported to the database | - | |
| errorFunction | Callback invoked when a background task fails | - | |
| batchOutWeighRatio | When strict adherence to autoFlushBatchSize is not required, set this parameter to improve overall throughput: once the buffered volume exceeds batchOutWeighRatio x autoFlushBatchSize, a flush task is submitted. This preferably uses background threads, rather than service threads, to submit import tasks. | 1 | |
| tableConfig | When multiple tables share one client, conflictStrategy, writeMode, copyWriteBatchSize, autoFlushMaxIntervalMs, autoFlushBatchSize, and batchOutWeighRatio may need different values per table. Use this parameter to configure them at the table level; tables without a table-level configuration use the global values. | - | |
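The flush trigger and retry backoff described in the table can be sketched in plain Java. This is a standalone illustration of the documented formulas, not dws-client code; all class and method names here are hypothetical:

```java
import java.util.concurrent.ThreadLocalRandom;

// Standalone sketch of two documented behaviors; none of these names
// exist in dws-client itself.
public class FlushPolicyDemo {
    // Flush policy: flush when cached records >= autoFlushBatchSize
    // or the cache age >= autoFlushMaxIntervalMs.
    static boolean shouldFlush(int cachedRecords, long cacheAgeMs,
                               int autoFlushBatchSize, long autoFlushMaxIntervalMs) {
        return cachedRecords >= autoFlushBatchSize || cacheAgeMs >= autoFlushMaxIntervalMs;
    }

    // Retry sleep = retryBaseTime * attempt + random(0, retryRandomTime) ms.
    static long retrySleepMs(int attempt, long retryBaseTime, long retryRandomTime) {
        return retryBaseTime * attempt + ThreadLocalRandom.current().nextLong(retryRandomTime + 1);
    }

    public static void main(String[] args) {
        // Defaults: autoFlushBatchSize = 5000, autoFlushMaxIntervalMs = 3000.
        System.out.println(shouldFlush(5000, 100, 5000, 3000)); // batch size reached
        System.out.println(shouldFlush(10, 3500, 5000, 3000));  // interval reached
        System.out.println(shouldFlush(10, 100, 5000, 3000));   // neither: keep caching
        // Defaults: retryBaseTime = 1000, retryRandomTime = 300.
        System.out.println(retrySleepMs(2, 1000, 300));         // a value in [2000, 2300]
    }
}
```

With the default values, the second flush attempt sleeps between 2000 and 2300 ms; the random component separates competing tasks in deadlock scenarios.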