dws-client
Description
dws-client is a high-performance, easy-to-use data import tool built on GaussDB(DWS) JDBC. Before using dws-client, ensure that a JDBC connection to GaussDB(DWS) can be established. Importing data with dws-client has the following advantages:
- dws-client limits the cache space and time and supports batch import to improve data import performance, meeting real-time import requirements during both peak and off-peak hours.
In scenarios without strict real-time requirements, operations on individual records are cached until they form a batch and are then executed together, which improves write performance.
- dws-client supports concurrent data import.
- dws-client supports multiple high-performance import modes and primary key conflict policies to meet import requirements in various scenarios.
- dws-client supports API-based interaction, making it easy to use.
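The caching behavior described in the first advantage can be pictured with a small, self-contained sketch. This is not dws-client's implementation: the class `BatchBuffer`, its parameters, and the `sink` callback are hypothetical names chosen for illustration, and for simplicity the age limit is checked only when a record is added rather than by a background timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Conceptual sketch only (hypothetical class): caches records and flushes them as one batch. */
class BatchBuffer<T> {
    private final int maxSize;             // flush once this many records are cached
    private final long maxDelayMillis;     // flush once the oldest cached record is this old
    private final Consumer<List<T>> sink;  // receives each flushed batch
    private final List<T> buffer = new ArrayList<>();
    private long firstRecordAt = -1;

    BatchBuffer(int maxSize, long maxDelayMillis, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.maxDelayMillis = maxDelayMillis;
        this.sink = sink;
    }

    /** Caches one record and flushes if the size limit or the age limit is reached. */
    synchronized void add(T record, long nowMillis) {
        if (buffer.isEmpty()) {
            firstRecordAt = nowMillis;
        }
        buffer.add(record);
        if (buffer.size() >= maxSize || nowMillis - firstRecordAt >= maxDelayMillis) {
            flush();
        }
    }

    /** Hands all cached records to the sink as a single batch. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        sink.accept(batch);
    }
}
```

With a size limit of 3 and an age limit of 1000 ms, three quick `add` calls produce one batch of three records, while two records added more than a second apart are flushed together on the second call.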
Dependency
<dependency>
    <groupId>com.huaweicloud.dws</groupId>
    <artifactId>dws-client</artifactId>
    <version>${version}</version>
</dependency>
Scenario & Usage
Prerequisite: The client has been initialized.
public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:gaussdb://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .build();
    return new DwsClient(config);
}
Scenario 1: Table-level parameter configuration
return DwsConfig.builder()
        .withUrl(System.getenv("db_url"))
        .withPassword(System.getenv("db_pwd"))
        .withUsername(System.getenv("db_username"))
        // The default batch size is 1000.
        .withAutoFlushBatchSize(1000)
        // The batch size is 500 for table test.t_c_batch_size_2.
        .withTableConfig("test.t_c_batch_size_2", new TableConfig()
                .withAutoFlushBatchSize(500));
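The relationship between the global batch size and a table-level override can be sketched in plain Java. The class `BatchSizeSettings` and its methods are hypothetical and only model the lookup rule suggested by the comments in the snippet (a table-specific value wins, otherwise the default applies); how dws-client resolves the value internally is not shown in this document.

```java
import java.util.HashMap;
import java.util.Map;

/** Conceptual sketch only (hypothetical class): a default setting with per-table overrides. */
class BatchSizeSettings {
    private final int defaultBatchSize;
    private final Map<String, Integer> perTable = new HashMap<>();

    BatchSizeSettings(int defaultBatchSize) {
        this.defaultBatchSize = defaultBatchSize;
    }

    /** Registers an override for one table and returns this object for chaining. */
    BatchSizeSettings withTable(String table, int batchSize) {
        perTable.put(table, batchSize);
        return this;
    }

    /** Returns the table-specific batch size if one exists, else the default. */
    int batchSizeFor(String table) {
        return perTable.getOrDefault(table, defaultBatchSize);
    }
}
```

Under this model, `new BatchSizeSettings(1000).withTable("test.t_c_batch_size_2", 500)` yields 500 for that table and 1000 for every other table.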
Scenario 2: Using a database connection to execute SQL statements
Use this API for special cases that the other dws-client functions do not cover. For example, to query data, you can operate on the database directly through the native JDBC connection.
public void sql() throws DwsClientException {
    Integer id = getClient().sql(connection -> {
        try (ResultSet resultSet = connection.createStatement().executeQuery("select id from test.user where name = 'zhangsan'")) {
            if (resultSet.next()) {
                return resultSet.getInt("id");
            }
        }
        return null;
    });
    System.out.println("zhangsan id = " + id);
}
Scenario 3: Obtaining table information
public void getTableSchema() throws DwsClientException {
    TableSchema tableSchema = getClient().getTableSchema(TableName.valueOf("test.test"));
}
Scenario 4: Importing data to a database
public void write() throws DwsClientException {
    getClient().write("test.test")
            .setObject("id", 1)
            .setObject("name", "test")
            // This setting takes effect only when data is inserted. If a primary key conflict occurs, the setting is not updated.
            .setObject("age", 38, true)
            // Asynchronously save the data to the database. The result is returned after data is stored in the background cache.
            //.commit()
            // The result is returned after data is successfully saved to the database.
            .syncCommit();
}
Scenario 5: Deleting data
public void delete() throws DwsClientException {
    getClient().delete("test.test")
            .setObject("id", 1)
            // Asynchronously save the data to the database. The result is returned after data is stored in the background cache.
            //.commit()
            // The result is returned after data is successfully saved to the database.
            .syncCommit();
}
Scenario 6: Manually flushing the cache
public void flush() throws DwsClientException {
    getClient().flush();
}
Scenario 7: Closing resources
public void close() throws IOException {
    getClient().close();
}
Listening to Data Import Events
In the asynchronous import scenario, to learn which records have been written to the database, bind a callback through onFlushSuccess. The callback is invoked with the imported records after the database transaction is committed.
public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:postgresql://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .onFlushSuccess(records -> {
                for (Record record : records) {
                    log.info("flush success. value = {}, pk = {}", RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record));
                }
            })
            .build();
    return new DwsClient(config);
}
Listening to Abnormal Background Tasks
In the asynchronous import scenario, data is written to the database by a background task. You can bind an error callback through onError to detect background task failures. Without it, a failure is discovered only when data is next submitted. If the callback does not throw an exception, the failure is cleared and is not thrown at the next submission. If the callback throws an exception, that exception is thrown to the service at the next submission.
public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:postgresql://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .onError((clientException, client) -> {
                if (clientException instanceof DwsClientRecordException) {
                    DwsClientRecordException recordException = (DwsClientRecordException) clientException;
                    List<Record> records = recordException.getRecords();
                    List<DwsClientException> exceptions = recordException.getExceptions();
                    for (int i = 0; i < records.size(); i++) {
                        log.error("pk = {} . error = {}", RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i));
                    }
                }
                if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR && clientException.getCode() != ExceptionCode.LOCK_ERROR) {
                    throw clientException;
                }
                log.error("code = {}", clientException.getCode(), clientException.getOriginal());
                return null;
            })
            .build();
    return new DwsClient(config);
}
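The rule for when a background failure resurfaces can be modeled in a few lines. The sketch below is a simplified illustration of the behavior described in this section, not dws-client's code: `BackgroundErrorGate`, `reportFailure`, and `checkBeforeSubmit` are hypothetical names, and unchecked exceptions stand in for DwsClientException.

```java
import java.util.function.Consumer;

/** Conceptual sketch only (hypothetical class): decides whether a background failure is rethrown later. */
class BackgroundErrorGate {
    private final Consumer<RuntimeException> handler; // may be null when no callback is bound
    private RuntimeException pending;                 // failure to surface at the next submission

    BackgroundErrorGate(Consumer<RuntimeException> handler) {
        this.handler = handler;
    }

    /** Called by the background task when a flush fails. */
    synchronized void reportFailure(RuntimeException error) {
        if (handler == null) {
            pending = error;          // no callback: keep the failure for the next submission
            return;
        }
        try {
            handler.accept(error);
            pending = null;           // callback returned normally: the failure is cleared
        } catch (RuntimeException thrownByHandler) {
            pending = thrownByHandler; // callback threw: surface that exception later
        }
    }

    /** Called at the start of the next submission; throws any failure that was kept. */
    synchronized void checkBeforeSubmit() {
        if (pending != null) {
            RuntimeException toThrow = pending;
            pending = null;
            throw toThrow;
        }
    }
}
```

A callback that only logs, such as `e -> {}`, makes `checkBeforeSubmit` pass silently, whereas `e -> { throw e; }` or no callback at all causes the next submission to fail with the stored exception.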
Exception Handling
Exceptions can be classified into three types:
1. InvalidException is not explicitly declared by the APIs. It is triggered when a request parameter is invalid.
2. DwsClientException encapsulates all exceptions and carries the parsed exception code and the original exception.
3. DwsClientRecordException extends DwsClientException. It additionally carries the records that failed to be written and the corresponding DwsClientException for each of them.
The following enum lists the exception codes.
public enum ExceptionCode {
    /**
     * Invalid parameter.
     */
    INVALID_CONFIG(1),
    /**
     * Connection exception.
     */
    CONNECTION_ERROR(100),
    /**
     * Read-only.
     */
    READ_ONLY(101),
    /**
     * Timeout.
     */
    TIMEOUT(102),
    /**
     * Too many connections.
     */
    TOO_MANY_CONNECTIONS(103),
    /**
     * Locking exception.
     */
    LOCK_ERROR(104),
    /**
     * Authentication failed.
     */
    AUTH_FAIL(201),
    /**
     * Closed.
     */
    ALREADY_CLOSE(202),
    /**
     * No permission.
     */
    PERMISSION_DENY(203),
    SYNTAX_ERROR(204),
    /**
     * Internal exception.
     */
    INTERNAL_ERROR(205),
    /**
     * Interruption exception.
     */
    INTERRUPTED(206),
    /**
     * The table is not found.
     */
    TABLE_NOT_FOUND(207),
    CONSTRAINT_VIOLATION(208),
    DATA_TYPE_ERROR(209),
    DATA_VALUE_ERROR(210),
    /**
     * Exceptions that cannot be parsed.
     */
    UNKNOWN_ERROR(500);

    private final int code;

    // Constructor required by the constants above; omitted in the original listing.
    ExceptionCode(int code) {
        this.code = code;
    }
}
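One way to act on these codes is to separate failures worth retrying from those that need intervention. The sketch below uses a local, partial copy of the codes so that it compiles without dws-client. The class `ExceptionCodeLookup`, the nested `Code` enum, and the choice of CONNECTION_ERROR and LOCK_ERROR as transient are assumptions that mirror the onError example, not a rule defined by the library.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

/** Conceptual sketch only (hypothetical class): maps numeric codes to names and flags transient ones. */
class ExceptionCodeLookup {
    /** Local, partial copy of the codes listed above, for illustration. */
    enum Code {
        INVALID_CONFIG(1), CONNECTION_ERROR(100), TIMEOUT(102), LOCK_ERROR(104),
        AUTH_FAIL(201), TABLE_NOT_FOUND(207), UNKNOWN_ERROR(500);

        final int value;

        Code(int value) {
            this.value = value;
        }
    }

    // Assumption: these two are treated as transient, as in the onError example.
    static final Set<Code> TRANSIENT = EnumSet.of(Code.CONNECTION_ERROR, Code.LOCK_ERROR);

    /** Returns the matching code, or UNKNOWN_ERROR when the value is not in the local copy. */
    static Code fromValue(int value) {
        return Arrays.stream(Code.values())
                .filter(c -> c.value == value)
                .findFirst()
                .orElse(Code.UNKNOWN_ERROR);
    }

    /** True when the code is in the assumed transient set. */
    static boolean isTransient(int value) {
        return TRANSIENT.contains(fromValue(value));
    }
}
```

For example, `isTransient(104)` is true because 104 maps to LOCK_ERROR, while `isTransient(201)` is false because AUTH_FAIL is not in the assumed transient set.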
Detailed Configuration