dws-client
Description
dws-client is a high-performance, easy-to-use data import tool built on GaussDB(DWS) JDBC. Before using it, ensure that the GaussDB(DWS) cluster can be reached through JDBC. Importing data with dws-client offers the following advantages:
- dws-client caches data within configurable space and time limits and imports it in batches. This improves import performance and meets real-time import requirements during both peak and off-peak hours. In scenarios without strict real-time requirements, operations on individual records are cached until they form a batch and are then executed together, which improves write performance.
- dws-client supports concurrent data import.
- dws-client supports multiple high-performance import modes and primary key conflict policies to meet import requirements in various scenarios.
- dws-client supports API-based interaction, making it easy to use.
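To make the size- and time-bounded batching described above concrete, the following Java sketch caches records and releases a batch when either a record-count limit or a maximum wait time is reached. It is a minimal, single-threaded illustration, not dws-client's implementation; BatchBuffer and its methods are hypothetical names, and the real client's limits, thread model, and flush triggers may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical illustration of size- and time-bounded batching.
 * Not dws-client code; not thread-safe (single caller assumed).
 */
final class BatchBuffer<T> {
    private final int maxBatchSize;   // release a batch once this many records are cached
    private final long maxAgeMillis;  // release a batch once the oldest record has waited this long
    private final List<T> pending = new ArrayList<>();
    private long firstRecordAt = -1;  // timestamp of the oldest cached record

    BatchBuffer(int maxBatchSize, long maxAgeMillis) {
        this.maxBatchSize = maxBatchSize;
        this.maxAgeMillis = maxAgeMillis;
    }

    /** Caches one record; returns a full batch to write, or an empty list. */
    List<T> add(T record, long nowMillis) {
        if (pending.isEmpty()) {
            firstRecordAt = nowMillis;
        }
        pending.add(record);
        return pending.size() >= maxBatchSize ? drain() : Collections.<T>emptyList();
    }

    /** Called periodically; returns the cached batch if it has waited too long. */
    List<T> pollExpired(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - firstRecordAt >= maxAgeMillis) {
            return drain();
        }
        return Collections.<T>emptyList();
    }

    private List<T> drain() {
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        firstRecordAt = -1;
        return batch;
    }
}
```

A caller would hand each non-empty batch to the database writer and call pollExpired from a timer, so that a partial batch is not held indefinitely during off-peak hours.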
Dependency
```xml
<dependency>
    <groupId>com.huaweicloud.dws</groupId>
    <artifactId>dws-client</artifactId>
    <version>${version}</version>
</dependency>
```
Core Features
The examples below use the 2.x API. Examples marked "1.x compatibility" show the equivalent 1.x-compatible API.
Initializing the client
Create a client instance, which is then used for tasks such as importing data into the database.
All dws-client parameters are defined in com.huaweicloud.dws.client.config.DwsClientConfigs, where each ConfigOp constant represents one parameter. Parameters are stored in a map keyed by the ConfigOp key, and the same key is used when parameters are supplied in a configuration file.
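The key-based storage described above can be illustrated with a small, self-contained Java sketch: each option carries a key and a default, values set in code and values read from a properties file land in the same map, and the key is the only link between them. ConfigOption and OptionMap are hypothetical classes, not dws-client types; dws.client.jdbc.url comes from the configuration file example in this section, while example.batch.size is a made-up key.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

/** Hypothetical stand-in for a ConfigOp-style constant: a key, a default, and a parser. */
final class ConfigOption<T> {
    final String key;
    final T defaultValue;
    final Function<String, T> parser; // converts a string read from a file

    ConfigOption(String key, T defaultValue, Function<String, T> parser) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.parser = parser;
    }
}

/** Stores every parameter in one map, keyed by ConfigOption.key. */
final class OptionMap {
    private final Map<String, Object> values = new HashMap<>();

    <T> OptionMap with(ConfigOption<T> option, T value) {
        values.put(option.key, value);
        return this; // allows chained calls
    }

    @SuppressWarnings("unchecked")
    <T> T get(ConfigOption<T> option) {
        Object raw = values.get(option.key);
        if (raw == null) {
            return option.defaultValue;
        }
        // Values loaded from a properties file arrive as strings and are parsed on read.
        return raw instanceof String ? option.parser.apply((String) raw) : (T) raw;
    }

    /** Loads properties text; each file key is used unchanged as the map key. */
    static OptionMap fromProperties(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        OptionMap map = new OptionMap();
        for (String name : props.stringPropertyNames()) {
            map.values.put(name, props.getProperty(name));
        }
        return map;
    }
}
```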
- The following is a simple example. You only need to configure the database connection. Retain the default values for other parameters.
```java
public DwsClient getClient() throws Exception {
    DwsConfig config = DwsConfig.of()
            .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url"))
            .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd"))
            .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username"));
    return new DwsClient(config);
}
```
- Use a configuration file.
Create the client.properties configuration file:
```properties
dws.client.jdbc.url=jdbc:gaussdb://xxxx:8000/gaussdb
dws.client.jdbc.password=****
dws.client.jdbc.username=dbadmin
```
Load the configuration file to initialize the client:
```java
public DwsClient getClientByProperties() throws Exception {
    // client.properties must be on the classpath.
    URL resource = this.getClass().getClassLoader().getResource("client.properties");
    DwsConfig config = new DwsConfig(resource.getFile());
    return new DwsClient(config);
}
```
- Use a parameter map.
```java
public DwsClient getClientByMap() throws Exception {
    Map<String, Object> config = new HashMap<>();
    config.put(DwsClientConfigs.JDBC_URL.key(), System.getenv("db_url"));
    config.put(DwsClientConfigs.JDBC_PASSWORD.key(), System.getenv("db_pwd"));
    config.put(DwsClientConfigs.JDBC_USERNAME.key(), System.getenv("db_username"));
    return new DwsClient(new DwsConfig(config));
}
```
- Use the 1.x-compatible configuration.
```java
public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:gaussdb://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .build();
    return new DwsClient(config);
}
```
- Configure table-level parameters.
When the client imports data into multiple tables, you can set parameters for individual tables. Call withTable("xxx") to obtain a table-level parameter builder that is initialized from the global settings; any parameter you set on it overrides the inherited value for that table. Calling build() adds the table-level parameters to the global configuration and returns the global configuration, so calls can be chained.
```java
public DwsClient getClientTable() throws Exception {
    DwsConfig config = DwsConfig.of()
            .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url"))
            .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd"))
            .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username"))
            // Global setting: batch size of 10000 records.
            .with(DwsClientConfigs.WRITE_AUTO_FLUSH_BATCH_SIZE, 10000)
            // Table "test": use the INSERT_OR_IGNORE primary key conflict strategy.
            .withTable("test")
            .with(DwsClientConfigs.WRITE_CONFLICT_STRATEGY, ConflictStrategy.INSERT_OR_IGNORE)
            .build()
            // Table "test1": override the batch size with 200.
            .withTable("test1")
            .with(DwsClientConfigs.WRITE_AUTO_FLUSH_BATCH_SIZE, 200)
            .build();
    return new DwsClient(config);
}
```
- 1.x compatibility
```java
return DwsConfig.builder()
        .withUrl(System.getenv("db_url"))
        .withPassword(System.getenv("db_pwd"))
        .withUsername(System.getenv("db_username"))
        .withAutoFlushBatchSize(1000) // Default batch size for all tables: 1000.
        .withTableConfig("test.t_c_batch_size_2", new TableConfig()
                .withAutoFlushBatchSize(500)); // Batch size for table test.t_c_batch_size_2: 500.
```
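The table-level override rule can be modeled as two maps: the effective settings for a table start from the global values and are then overwritten by that table's own values. The sketch below is hypothetical (LayeredConfig and the keys batchSize and conflictStrategy are illustrative names). It resolves values at lookup time; dws-client may instead copy the global values when withTable is called, in which case global settings added afterwards would not be inherited.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: table-level values override global values key by key. */
final class LayeredConfig {
    private final Map<String, Object> global = new HashMap<>();
    private final Map<String, Map<String, Object>> perTable = new HashMap<>();

    LayeredConfig set(String key, Object value) {
        global.put(key, value);
        return this;
    }

    LayeredConfig setForTable(String table, String key, Object value) {
        perTable.computeIfAbsent(table, t -> new HashMap<>()).put(key, value);
        return this;
    }

    /** Effective settings for one table: copy the global map, then apply that table's overrides. */
    Map<String, Object> effective(String table) {
        Map<String, Object> result = new HashMap<>(global);
        result.putAll(perTable.getOrDefault(table, Collections.emptyMap()));
        return result;
    }
}
```

The test below mirrors the withTable example: "test" inherits the global batch size, while "test1" overrides it.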
Using a database connection to execute SQL statements
This API is intended for special cases that the built-in functions do not cover. For example, to query data, you can operate on the database directly through the native JDBC connection.
```java
public void sql() throws DwsClientException {
    Integer id = getClient().sql(connection -> {
        // Close both the Statement and the ResultSet when done.
        try (Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(
                     "select id from test.user where name = 'zhangsan'")) {
            if (resultSet.next()) {
                return resultSet.getInt("id");
            }
        }
        return null;
    });
    System.out.println("zhangsan id = " + id);
}
```
Obtaining table information
```java
public void getTableSchema() throws DwsClientException {
    TableSchema tableSchema = getClient().getTableSchema(TableName.valueOf("test.test"));
}
```
Data import
```java
public void write() throws DwsClientException {
    getClient().write("test.test")
            .setObject("id", 1)
            .setObject("name", "test")
            // This value takes effect only when the row is inserted.
            // If a primary key conflict occurs, the existing value is not updated.
            .setObject("age", 38, true)
            // commit(): asynchronous; returns after the data is stored in the background cache.
            //.commit()
            // syncCommit(): returns after the data is successfully saved to the database.
            .syncCommit();
}
```
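The third argument of setObject("age", 38, true) marks a column whose value is written only when the row is inserted. The following in-memory Java sketch models the resulting row contents under that rule; it does not show how dws-client generates SQL, and UpsertTable is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory table keyed by primary key, used only to illustrate the rule. */
final class UpsertTable {
    private final Map<Object, Map<String, Object>> rows = new HashMap<>();
    private final String pkColumn;

    UpsertTable(String pkColumn) {
        this.pkColumn = pkColumn;
    }

    /**
     * Inserts the row; if the primary key already exists, updates every column
     * except those in insertOnlyColumns (the setObject(..., true) columns).
     */
    void upsert(Map<String, Object> row, Set<String> insertOnlyColumns) {
        Object pk = row.get(pkColumn);
        Map<String, Object> existing = rows.get(pk);
        if (existing == null) {
            rows.put(pk, new HashMap<>(row)); // plain insert: every column is written
            return;
        }
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (!insertOnlyColumns.contains(e.getKey())) {
                existing.put(e.getKey(), e.getValue()); // conflict: update allowed columns only
            }
        }
    }

    Map<String, Object> get(Object pk) {
        return rows.get(pk);
    }
}
```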
```java
public void testWrite() throws Exception {
    try (DwsClient client = getClient()) {
        client.sql((conn) -> {
            conn.createStatement().execute("DROP Table IF EXISTS test.dws_client_test;"
                    + "create table test.dws_client_test (id integer, name varchar(10), age int);");
            return null;
        });
        TableSchema tableSchema = client.getTableSchema(TableName.valueOf("test.dws_client_test"));
        log.info("table schema {}", tableSchema);
        for (int i = 0; i < 100; i++) {
            Operate operate = client.write(tableSchema)
                    .setObject("id", i)
                    .setObject("name", "name_" + i)
                    .setObject("age", i);
            operate.commit();
        }
    }
}
```
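Setting values by column index instead of column name reduces hash calculations on the client, which relieves CPU pressure and increases write throughput. In this example, the indexes follow the column order of the table (id = 0, name = 1, age = 2).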
```java
public void testWrite() throws Exception {
    try (DwsClient client = getClient()) {
        client.sql((conn) -> {
            conn.createStatement().execute("DROP Table IF EXISTS test.dws_client_test;"
                    + "create table test.dws_client_test (id integer, name varchar(10), age int);");
            return null;
        });
        TableSchema tableSchema = client.getTableSchema(TableName.valueOf("test.dws_client_test"));
        log.info("table schema {}", tableSchema);
        for (int i = 0; i < 100; i++) {
            Operate operate = client.write(tableSchema)
                    .setObject(0, i)
                    .setObject(1, "name_" + i)
                    .setObject(2, i);
            operate.commit();
        }
    }
}
```
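The saving from index-based access comes from skipping the per-call name lookup. The hypothetical RowBuilder below builds a name-to-position map once from the column list, so set(String, Object) hashes the column name on every call, while set(int, Object) writes directly to an array slot. It only illustrates the idea and is not dws-client's Operate implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical row builder contrasting name-based and index-based column access. */
final class RowBuilder {
    private final Map<String, Integer> indexByName = new HashMap<>();
    private final Object[] values;

    RowBuilder(List<String> columns) {
        for (int i = 0; i < columns.size(); i++) {
            indexByName.put(columns.get(i), i); // built once per schema
        }
        values = new Object[columns.size()];
    }

    /** Name-based: every call hashes the column name to find its position. */
    RowBuilder set(String column, Object value) {
        Integer idx = indexByName.get(column);
        if (idx == null) {
            throw new IllegalArgumentException("unknown column: " + column);
        }
        values[idx] = value;
        return this;
    }

    /** Index-based: writes straight to the array slot, no hashing. */
    RowBuilder set(int index, Object value) {
        values[index] = value;
        return this;
    }

    Object[] values() {
        return values.clone();
    }
}
```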
Data deletion
```java
public void delete() throws DwsClientException {
    getClient().delete("test.test")
            .setObject("id", 1)
            // commit(): asynchronous; returns after the data is stored in the background cache.
            //.commit()
            // syncCommit(): returns after the data is successfully saved to the database.
            .syncCommit();
}
```
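Forcibly flushing cached data
```java
public void flush() throws DwsClientException {
    // Writes the data currently held in the background cache to the database.
    getClient().flush();
}
```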
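The difference between commit(), syncCommit(), and flush() can be sketched with a cache and a background executor: commit() returns once the record is cached, flush() writes the cache on a background thread and waits for it, and syncCommit() is commit() followed by flush(). SketchWriter is a hypothetical class and the list named database only stands in for a real table. In this sketch syncCommit() also writes records committed earlier, which may not match dws-client's exact behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical writer contrasting asynchronous commit, synchronous commit, and forced flush. */
final class SketchWriter implements AutoCloseable {
    private final ExecutorService background = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-writer");
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });
    private final List<String> buffer = new ArrayList<>();   // background cache
    private final List<String> database = new ArrayList<>(); // stands in for the target table

    /** Like commit(): returns once the record is cached, not once it is in the database. */
    synchronized void commit(String record) {
        buffer.add(record);
    }

    /** Like flush(): writes everything cached so far and blocks until the write finishes. */
    void flush() {
        final List<String> batch = drainBuffer();
        CompletableFuture.runAsync(() -> {
            synchronized (database) {
                database.addAll(batch);
            }
        }, background).join(); // join() rethrows a failed write as CompletionException
    }

    /** Like syncCommit(): returns only after the record has been written. */
    void syncCommit(String record) {
        commit(record);
        flush();
    }

    List<String> databaseContents() {
        synchronized (database) {
            return new ArrayList<>(database);
        }
    }

    private synchronized List<String> drainBuffer() {
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    @Override
    public void close() {
        background.shutdown();
    }
}
```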
Closing the client and releasing resources
```java
public void close() throws IOException {
    getClient().close();
}
```
Listening to Data Import Events
```java
public DwsClient getClient() throws Exception {
    DwsConfig config = DwsConfig.of()
            .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url"))
            .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd"))
            .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username"))
            .onFlushSuccess(records -> {
                for (Record record : records) {
                    log.info("flush success. value = {}, pk = {}",
                            RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record));
                }
            });
    return new DwsClient(config);
}
```
1.x compatibility
```java
public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:postgresql://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .onFlushSuccess(records -> {
                for (Record record : records) {
                    log.info("flush success. value = {}, pk = {}",
                            RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record));
                }
            })
            .build();
    return new DwsClient(config);
}
```
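The onFlushSuccess hook follows a common listener pattern in which the callback runs only after a batch has been written without error. The hypothetical NotifyingFlusher below shows that ordering with plain java.util.function.Consumer callbacks; it is not dws-client code, and the real client passes Record objects rather than generic values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical flusher that notifies a listener after each successful batch write. */
final class NotifyingFlusher<T> {
    private final List<T> cache = new ArrayList<>();
    private final Consumer<List<T>> writer;         // performs the actual write
    private final Consumer<List<T>> onFlushSuccess; // called only if the write returns normally

    NotifyingFlusher(Consumer<List<T>> writer, Consumer<List<T>> onFlushSuccess) {
        this.writer = writer;
        this.onFlushSuccess = onFlushSuccess;
    }

    void add(T record) {
        cache.add(record);
    }

    void flush() {
        if (cache.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(cache);
        writer.accept(batch);         // if this throws, the cache is kept and no callback runs
        cache.clear();
        onFlushSuccess.accept(batch); // reached only after a successful write
    }
}
```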
Listening to Abnormal Background Tasks
```java
public DwsClient getClient() throws Exception {
    DwsConfig config = DwsConfig.of()
            .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url"))
            .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd"))
            .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username"))
            .onError((clientException, client) -> {
                // Record-level failure: log each failed record with its own exception.
                if (clientException instanceof DwsClientRecordException) {
                    DwsClientRecordException recordException = (DwsClientRecordException) clientException;
                    List<Record> records = recordException.getRecords();
                    List<DwsClientException> exceptions = recordException.getExceptions();
                    for (int i = 0; i < records.size(); i++) {
                        log.error("pk = {} . error = {}",
                                RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i));
                    }
                }
                // Rethrow every error except connection and lock errors.
                if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR
                        && clientException.getCode() != ExceptionCode.LOCK_ERROR) {
                    throw clientException;
                }
                // Connection and lock errors are only logged.
                log.error("code = {}", clientException.getCode(), clientException.getOriginal());
                return null;
            });
    return new DwsClient(config);
}
```
1.x compatibility
```java
public DwsClient getClient() {
    DwsConfig config = DwsConfig
            .builder()
            .withUrl("jdbc:postgresql://***/gaussdb")
            .withUsername("***")
            .withPassword("****")
            .onError((clientException, client) -> {
                // Record-level failure: log each failed record with its own exception.
                if (clientException instanceof DwsClientRecordException) {
                    DwsClientRecordException recordException = (DwsClientRecordException) clientException;
                    List<Record> records = recordException.getRecords();
                    List<DwsClientException> exceptions = recordException.getExceptions();
                    for (int i = 0; i < records.size(); i++) {
                        log.error("pk = {} . error = {}",
                                RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i));
                    }
                }
                // Rethrow every error except connection and lock errors.
                if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR
                        && clientException.getCode() != ExceptionCode.LOCK_ERROR) {
                    throw clientException;
                }
                // Connection and lock errors are only logged.
                log.error("code = {}", clientException.getCode(), clientException.getOriginal());
                return null;
            })
            .build();
    return new DwsClient(config);
}
```
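The decision logic in the handlers above can be pulled out into a small, testable policy: rethrow every code except CONNECTION_ERROR and LOCK_ERROR, and pair each failed record with the exception at the same list position. SketchCode mirrors a few names from ExceptionCode and ErrorPolicy is a hypothetical helper; neither is part of dws-client.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical subset of error codes, mirroring names from ExceptionCode. */
enum SketchCode { CONNECTION_ERROR, LOCK_ERROR, CONSTRAINT_VIOLATION, DATA_TYPE_ERROR }

final class ErrorPolicy {
    // Codes that the handlers above log and tolerate instead of rethrowing.
    private static final Set<SketchCode> TOLERATED =
            EnumSet.of(SketchCode.CONNECTION_ERROR, SketchCode.LOCK_ERROR);

    static boolean shouldRethrow(SketchCode code) {
        return !TOLERATED.contains(code);
    }

    /** Pairs each failed record's primary key with the error at the same index. */
    static List<String> describe(List<String> primaryKeys, List<SketchCode> errors) {
        if (primaryKeys.size() != errors.size()) {
            throw new IllegalArgumentException("records and errors must line up");
        }
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < primaryKeys.size(); i++) {
            lines.add("pk = " + primaryKeys.get(i) + " . error = " + errors.get(i));
        }
        return lines;
    }
}
```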
Exception Handling
Exceptions can be classified into three types:
- InvalidException is triggered when a request parameter is invalid. It is not thrown to the caller.
- DwsClientException wraps all other exceptions and contains the parsed error code and the original exception.
- DwsClientRecordException extends DwsClientException. It contains the records whose writes failed and the DwsClientException corresponding to each record.
The exception codes are defined in the ExceptionCode enumeration:
```java
public enum ExceptionCode {
    /** Invalid parameter. */
    INVALID_CONFIG(1),
    /** Connection exception. */
    CONNECTION_ERROR(100),
    /** Read-only. */
    READ_ONLY(101),
    /** Timeout. */
    TIMEOUT(102),
    /** Too many connections. */
    TOO_MANY_CONNECTIONS(103),
    /** Locking exception. */
    LOCK_ERROR(104),
    /** Authentication failed. */
    AUTH_FAIL(201),
    /** Already closed. */
    ALREADY_CLOSE(202),
    /** No permission. */
    PERMISSION_DENY(203),
    SYNTAX_ERROR(204),
    /** Internal exception. */
    INTERNAL_ERROR(205),
    /** Interruption exception. */
    INTERRUPTED(206),
    /** Table not found. */
    TABLE_NOT_FOUND(207),
    CONSTRAINT_VIOLATION(208),
    DATA_TYPE_ERROR(209),
    DATA_VALUE_ERROR(210),
    /** Exception that cannot be parsed. */
    UNKNOWN_ERROR(500);

    private final int code;

    ExceptionCode(int code) {
        this.code = code;
    }
}
```
Detailed Configuration
Only public parameters are listed. Do not configure any parameter that is not listed.
When configuring parameters in a file, use the parameter key. Duration values can be given with one of the following time units:
- Day: d, day(s)
- Hour: h, hour(s)
- Minute: min(s), m, minute(s)
- Second: s, sec(s), second(s)
- Millisecond: ms, milli(s), millisecond(s)
Memory values can be given with one of the following units:
- Byte: b
- KB: k, kb
- MB: m, mb
- GB: g, gb
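The unit suffixes listed above can be parsed with a lookup table keyed by the lowercase suffix. The sketch below assumes that a value is an integer followed by an optional space and a suffix, and that memory units are binary multiples (1 kb = 1024 bytes); this section states neither rule, and UnitParser is a hypothetical class rather than dws-client's parser. Because m means minutes for durations but megabytes for memory, the two tables are kept separate.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Hypothetical parser for the unit suffixes listed above (not dws-client code). */
final class UnitParser {
    private static final Map<String, TimeUnit> TIME = new HashMap<>();
    private static final Map<String, Long> MEMORY = new HashMap<>();

    static {
        for (String s : new String[] {"d", "day", "days"}) TIME.put(s, TimeUnit.DAYS);
        for (String s : new String[] {"h", "hour", "hours"}) TIME.put(s, TimeUnit.HOURS);
        for (String s : new String[] {"min", "mins", "m", "minute", "minutes"}) TIME.put(s, TimeUnit.MINUTES);
        for (String s : new String[] {"s", "sec", "secs", "second", "seconds"}) TIME.put(s, TimeUnit.SECONDS);
        for (String s : new String[] {"ms", "milli", "millis", "millisecond", "milliseconds"}) {
            TIME.put(s, TimeUnit.MILLISECONDS);
        }
        // Assumption: binary multiples.
        MEMORY.put("b", 1L);
        MEMORY.put("k", 1024L);
        MEMORY.put("kb", 1024L);
        MEMORY.put("m", 1024L * 1024);
        MEMORY.put("mb", 1024L * 1024);
        MEMORY.put("g", 1024L * 1024 * 1024);
        MEMORY.put("gb", 1024L * 1024 * 1024);
    }

    /** Converts a duration such as "30s" or "5 min" to milliseconds. */
    static long toMillis(String text) {
        String[] parts = split(text);
        TimeUnit unit = TIME.get(parts[1]);
        if (unit == null) {
            throw new IllegalArgumentException("unknown time unit: " + text);
        }
        return unit.toMillis(Long.parseLong(parts[0]));
    }

    /** Converts a memory size such as "64mb" to bytes. */
    static long toBytes(String text) {
        String[] parts = split(text);
        Long factor = MEMORY.get(parts[1]);
        if (factor == null) {
            throw new IllegalArgumentException("unknown memory unit: " + text);
        }
        return Long.parseLong(parts[0]) * factor;
    }

    /** Splits " 5 Min " into {"5", "min"}. */
    private static String[] split(String text) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < t.length() && Character.isDigit(t.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("missing number: " + text);
        }
        return new String[] {t.substring(0, i), t.substring(i).trim()};
    }
}
```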
Deleted parameters in version 2.x
| Parameter | Description | Reason for Removal |
|---|---|---|
| logDataTables | Tables whose data is printed while it is imported into the database, so that the data can be compared during fault locating. | There is no practical use case, and debugging during development is more convenient. This parameter can no longer be configured. |
| batchOutWeighRatio | Improves overall throughput when the autoFlushBatchSize requirement is not strict. When data is submitted to the buffer and the buffered data volume exceeds batchOutWeighRatio x autoFlushBatchSize, a task that submits the data to the database is executed. The goal is to have background threads, rather than service threads, submit import tasks whenever possible. | WRITE_FORCE_FLUSH_BATCH_SIZE provides the same function more directly. |
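The trigger rule of the removed batchOutWeighRatio parameter is simple arithmetic: a submit task runs once the buffered data volume exceeds batchOutWeighRatio x autoFlushBatchSize. The hypothetical helper below restates that rule, measuring volume as a record count (an assumption). For example, with autoFlushBatchSize = 1000 and batchOutWeighRatio = 0.5, the 501st buffered record triggers a submit. It does not describe how WRITE_FORCE_FLUSH_BATCH_SIZE is implemented.

```java
/** Hypothetical restatement of the removed batchOutWeighRatio trigger, for illustration only. */
final class WeighRatioRule {
    /** True when the buffered volume exceeds ratio x autoFlushBatchSize. */
    static boolean shouldSubmit(int bufferedRecords, int autoFlushBatchSize, double batchOutWeighRatio) {
        return bufferedRecords > batchOutWeighRatio * autoFlushBatchSize;
    }

    /** Smallest buffered record count that triggers a submit under this rule. */
    static int threshold(int autoFlushBatchSize, double batchOutWeighRatio) {
        return (int) Math.floor(batchOutWeighRatio * autoFlushBatchSize) + 1;
    }
}
```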