dws-connector-flink
Description
dws-connector-flink is a tool for connecting Flink to dwsClient. It encapsulates dwsClient, so its overall import capability is the same as that of dwsClient. Currently, only the DynamicTableSourceFactory and DynamicTableSinkFactory interfaces are implemented; the CatalogFactory interface is not, so catalogs are not supported.
How to Use
- Through a JAR package
dws-connector-flink has been added to the Maven repository. You can select the latest version. For details, visit https://mvnrepository.com/artifact/com.huaweicloud.dws.
```xml
<dependency>
    <groupId>com.huaweicloud.dws</groupId>
    <artifactId>dws-connector-flink_${scala.version}_${flink.version}</artifactId>
    <version>1.0</version>
</dependency>
```
- Through Flink SQL
When using dws-connector-flink through Flink SQL, you need to place the dws-connector-flink package and its dependencies in the Flink class loading directory. In version 1.0.3 and later, packages bundled with their dependencies are published to the Maven repository and can be downloaded from there.
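As a deployment sketch only: the jar file name, paths, and versions below are assumptions, so adjust them to your Flink installation and to the connector version you actually downloaded from the Maven repository.

```shell
# Illustrative only: jar name, Scala/Flink versions, and FLINK_HOME are assumptions.
FLINK_HOME=${FLINK_HOME:-/opt/flink}
CONNECTOR_JAR=dws-connector-flink_2.12_1.15-1.0.10-jar-with-dependencies.jar

# Place the bundled ("with dependencies") jar on Flink's classpath ...
cp "$CONNECTOR_JAR" "$FLINK_HOME/lib/"

# ... then restart the cluster so the new jar is picked up.
"$FLINK_HOME/bin/stop-cluster.sh" && "$FLINK_HOME/bin/start-cluster.sh"
```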
Scenarios & Usage
- Sink Mode
A DwsSink API is provided to quickly build a sink. An example is as follows:
```java
SinkFunction<EventLog> dwsSink = DwsSink.sink(DwsConfig.builder()
        .withUrl("")
        .withPassword("")
        .withUsername("").build(),
    null,
    (DwsInvokeFunction<EventLog>) (eventLog, dwsClient, context) -> {
        dwsClient.write("test.test")
            .setObject("id", eventLog.getGuid())
            .setObject("name", eventLog.getEventId(), true)
            .commit();
    });
DataStreamSource<EventLog> source = env.addSource(new LogSourceFunction());
source.addSink(dwsSink);
env.execute();
```
API description:

```java
public static <T> SinkFunction<T> sink(DwsConfig config, JSONObject context, DwsInvokeFunction<T> invoke);
```
- config is the dwsClient configuration, identical to the one used when constructing dwsClient directly.
- context is a global context provided for operations such as caching. It can be specified at construction time and is passed back on every invocation of the data processing interface.
- invoke is a function interface used to process data.
```java
/**
 * Data processing callback executed when Flink calls invoke.
 * @param value the data record currently received
 * @param client the dwsClient constructed from the configuration
 * @param context the global object specified at construction time; it is attached
 *                to each callback and can be used as a global cache
 * @throws DwsClientException
 */
void accept(T value, DwsClient client, JSONObject context) throws DwsClientException;
```
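To illustrate how the context parameter can serve as a cache shared across callbacks, here is a minimal, self-contained sketch. `Invoke`, `Client`, and the `Map` context are simplified stand-ins for `DwsInvokeFunction`, `DwsClient`, and `JSONObject`, not the connector's real types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SinkCallbackSketch {
    // Stand-in for DwsInvokeFunction<T>: one callback per received record.
    interface Invoke<T> {
        void accept(T value, Client client, Map<String, Object> context);
    }

    // Stand-in for DwsClient: records what was "written" instead of hitting a database.
    static class Client {
        final List<String> written = new ArrayList<>();
        void write(String table, String id) { written.add(table + ":" + id); }
    }

    // Feeds records through the callback; the shared context map deduplicates ids,
    // playing the role the JSONObject context plays in the real API.
    static List<String> run(String[] ids) {
        Client client = new Client();
        Map<String, Object> context = new HashMap<>();
        Invoke<String> invoke = (id, c, ctx) -> {
            if (ctx.putIfAbsent(id, Boolean.TRUE) == null) {
                c.write("test.test", id);
            }
        };
        for (String id : ids) {
            invoke.accept(id, client, context);
        }
        return client.written;
    }

    public static void main(String[] args) {
        System.out.println(run(new String[]{"1", "2", "1"})); // duplicate "1" skipped via context
    }
}
```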
- SQL Mode
- Create a Flink table:
```java
tableEnvironment.executeSql("create table dws_test (\n" +
    "  id int,\n" +
    "  name STRING,\n" +
    "  PRIMARY KEY (id) NOT ENFORCED\n" +
    ") WITH (\n" +
    "  'connector' = 'dws',\n" +
    "  'url' = 'jdbc:postgresql://****/gaussdb',\n" +
    "  'tablename' = 'test.test',\n" +
    "  'username' = 'dbadmin',\n" +
    "  'password' = '***'\n" +
    ")");
```
- Write data in the data source to the test table.
```java
tableEnvironment.executeSql("insert into dws_test select guid as id, eventId as name from kafka_event_log");
```
- Query data from the test table.
```java
tableEnvironment.executeSql("select name from dws_test where id = 100 limit 10").print();
```
Flink SQL Configuration Parameters
Primary keys set in Flink SQL are automatically mapped to unique keys in dws-client. The parameters are released with the client version and have the same functions as their client counterparts. The parameters below are the latest.
| Parameter | Description | Default Value |
| --- | --- | --- |
| connector | Connector type identifier used by the Flink framework. Fixed to dws. | - |
| url | Database connection address. | - |
| username | Database connection user. | - |
| password | Password of the connection user. | - |
| tableName | Name of the GaussDB(DWS) table. | - |
| Parameter | Description | Default Value |
| --- | --- | --- |
| connectionSize | Number of concurrent connections created at dwsClient initialization. | 1 |
| connectionMaxUseTimeSeconds | Time, in seconds, after which a connection is forcibly released. | 3600 (one hour) |
| connectionMaxIdleMs | Maximum idle time of a connection, in milliseconds. A connection idle longer than this is released. | 60,000 (one minute) |
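The two connection timeouts can be read as a simple release rule; the sketch below illustrates that rule using the default values, and is not the client's actual implementation:

```java
public class ConnectionReleaseSketch {
    // Defaults from the connection parameter table.
    static final long MAX_USE_SECONDS = 3600;   // connectionMaxUseTimeSeconds
    static final long MAX_IDLE_MS = 60_000;     // connectionMaxIdleMs

    /** A connection is released once it has been held too long OR idle too long. */
    static boolean shouldRelease(long ageMs, long idleMs) {
        return ageMs >= MAX_USE_SECONDS * 1000 || idleMs >= MAX_IDLE_MS;
    }

    public static void main(String[] args) {
        System.out.println(shouldRelease(10_000, 1_000));   // false: young and recently used
        System.out.println(shouldRelease(10_000, 61_000));  // true: idle past one minute
        System.out.println(shouldRelease(3_700_000, 0));    // true: held past one hour
    }
}
```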
| Parameter | Description | Default Value |
| --- | --- | --- |
| conflictStrategy | Primary key conflict policy when data is written to a table with a primary key. | update |
| writeMode | Import mode. | auto |
| maxFlushRetryTimes | Maximum number of attempts to write a batch to the database. If a write succeeds within this many attempts, no exception is thrown. The retry interval is 1 second multiplied by the attempt number. | 3 |
| autoFlushBatchSize | Batch size for automatic flushing to the database. | 5000 |
| autoFlushMaxInterval | Maximum interval for automatic flushing (the maximum time allowed to form a batch). | 5s |
| copyWriteBatchSize | Batch size used by the COPY method when writeMode is set to auto. | 5000 |
| ignoreDelete | Whether to ignore delete events in Flink tasks. | false (true before version 1.0.10) |
| ignoreNullWhenUpdate | Whether to ignore updates to columns with null values in Flink. Valid only when conflictStrategy is set to update. | false |
| metadataCacheSeconds | Maximum cache duration of metadata (for example, table definitions), in seconds. | 180 |
| copyMode | Format used when copying data to the database. | CSV |
| createTempTableMode | Method for creating a temporary table: AS or LIKE. | AS |
| numberAsEpochMsForDatetime | Whether to convert numeric source data to the corresponding time type (treating the number as an epoch timestamp) when the target column is a time type. | false |
| stringToDatetimeFormat | Format for converting string source data to a time type when the target column is a time type. Setting this parameter enables the conversion. | null |
| sink.parallelism | Flink system parameter that sets the sink parallelism. | Follows the upstream operator |
| printDataPk | Whether to print the primary key of incoming data when the connector receives it; useful for troubleshooting. | false |
| ignoreUpdateBefore | Whether to ignore update_before events in Flink tasks. Must be enabled for partial updates on large tables; otherwise, because a row is deleted before being re-inserted, the update would erase the other columns and set them to null. | true |
| Parameter | Mandatory | Description | Default Value |
| --- | --- | --- | --- |
| fetchSize | No | The JDBC fetchSize parameter, which controls the number of records returned by the database per fetch. | 1000 |
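To show how several of the parameters above fit together, here is an illustrative table definition with a few tuning options in the WITH clause. The values are examples, not recommendations, and the masked URL and credentials are placeholders:

```sql
create table dws_sink (
  id int,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:postgresql://****/gaussdb',
  'tableName' = 'test.test',
  'username' = 'dbadmin',
  'password' = '***',
  -- tuning options from the parameter tables (illustrative values)
  'autoFlushBatchSize' = '10000',
  'autoFlushMaxInterval' = '3s',
  'conflictStrategy' = 'update',
  'ignoreDelete' = 'true'
);
```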