Dimension Table
Syntax
```sql
create table dwsSource (
  attr_name attr_type
  (',' attr_name attr_type)*
)
with (
  'connector' = 'dws',
  'url' = '',
  'tableName' = '',
  'username' = '',
  'password' = ''
);
```
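For reference, a minimal filled-in definition might look like the following sketch; the address, database name, table, and credentials are hypothetical placeholders, not values from this document.

```sql
-- Minimal sketch with hypothetical connection values; replace them with the
-- address, database, table, and credentials of your own GaussDB(DWS) cluster.
create table dim_area (
  area_id string,
  region_name string
) with (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://192.168.0.10:8000/gaussdb',
  'tableName' = 'public.area_info',
  'username' = 'dbadmin',
  'password' = '********'
);
```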
Parameter Description

| Parameter | Description | Default Value |
|---|---|---|
| connector | Used by the Flink framework to distinguish connector types. Set this parameter to dws. | - |
| url | Database connection address. | - |
| tableName | Name of the GaussDB(DWS) table to read as the dimension table. | - |
| username | Username for connecting to the database. | - |
| password | Password of the database user. | - |
| Parameter | Name | Type | Description | Default Value |
|---|---|---|---|---|
| connectionSize | Size of the read thread pool | int | Number of threads used for read operations. It equals the number of database connections and matches the size of the write thread pool. | 1 |
| readBatchSize | Maximum number of GET requests combined and submitted at a time | int | Maximum number of queued query requests that are combined and processed in a single batch. | 128 |
| readBatchQueueSize | Size of the buffer pool for GET requests | int | Maximum number of query requests that can be queued. | 256 |
| readTimeoutMs | Timeout interval of a GET request (ms) | int | Timeout for a single query request. A value of 0 means the request never times out. | 0 |
| readSyncThreadEnable | Whether to use a thread pool for non-asynchronous queries | boolean | If enabled, queries run in a thread pool and the caller blocks on future.get(); if disabled, the query runs in, and blocks, the main thread. | true |
| lookupScanEnable | Whether to enable scan queries | boolean | Whether to enable scan queries when the join conditions do not cover all primary key columns. If set to false, every join condition must be a primary key column; otherwise, an exception is thrown. See the scan-query sketch after this table. | false |
| fetchSize / lookupScanFetchSize | Size of a scan query | int | Maximum number of records returned by a conditional query when the join does not match the full primary key. fetchSize is used by default; if fetchSize is set to 0, lookupScanFetchSize is used instead. | 1000 |
| lookupScanTimeoutMs | Timeout interval of a SCAN request (ms) | int | Timeout for a conditional query when the join does not match the full primary key, in milliseconds. | 60000 |
| lookupAsync | Whether to fetch data asynchronously | boolean | Sets the query mode to synchronous or asynchronous. | true |
| lookupCacheType | Cache policy | LookupCacheType | Cache policy (case insensitive). LRU caches recently used rows of the dimension table; ALL caches the entire table. | LookupCacheType.LRU |
| lookupCacheMaxRows | Cache size | long | Maximum number of rows in the cache when the LRU cache policy is selected. | 1000 |
| lookupCacheExpireAfterAccess | Expiry interval measured from each read | Duration | When the LRU cache policy is selected, each access resets an entry's expiry to this interval. The default value null disables this expiry. | null |
| lookupCacheExpireAfterWrite | Expiry interval measured from each write | Duration | When the LRU cache policy is selected, an entry expires a fixed interval after it is written, regardless of whether it is accessed. | 10s |
| lookupCacheMissingKey | Whether to cache keys that do not exist in the dimension table | boolean | When the LRU cache policy is selected, controls whether lookups that find no matching row are cached as empty results. | false |
| lookupCacheReloadStrategy | Full cache reloading policy | ReloadStrategy | When the ALL cache policy is selected, sets the data reloading policy: PERIODIC reloads at a fixed interval; TIMED reloads at a scheduled time of day. See the full-cache sketch after this table. | ReloadStrategy.PERIODIC |
| lookupCachePeriodicReloadInterval | Data reloading interval | Duration | When the PERIODIC reloading policy is selected, sets the interval between full cache reloads. | 1h |
| lookupCachePeriodicReloadMode | Data reloading mode | ScheduleMode | When the PERIODIC reloading policy is selected, sets the scheduling mode (case insensitive), such as FIXED_DELAY, where the interval is measured from the completion of the previous reload. | ScheduleMode.FIXED_DELAY |
| lookupCacheTimedReloadTime | Scheduled time for data reloading | string | When the TIMED reloading policy is selected, sets the daily full cache reloading time in ISO-8601 format, for example, 10:15. | 00:00 |
| lookupCacheTimedReloadIntervalDays | Interval for scheduled data reloading | int | When the TIMED reloading policy is selected, sets the interval, in days, between full cache reloads. | 1 |
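The following is a minimal, hypothetical sketch of the scan-query behavior controlled by lookupScanEnable, fetchSize, and lookupScanTimeoutMs. It assumes a dimension table whose primary key is area_id but that is joined on the non-key column region_name; the table, source columns, and connection values are placeholders, not part of this document's example.

```sql
-- Hypothetical sketch: the dimension table's primary key is area_id, but the
-- join below matches on region_name, so the connector must run a scan query.
create table area_dim (
  area_id string,
  region_name string
) with (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName',
  'tableName' = 'area_info',
  'username' = 'DwsUserName',
  'password' = 'DwsPassword',
  'lookupScanEnable' = 'true',       -- permit non-primary-key lookups
  'fetchSize' = '500',               -- cap the rows returned per scan query
  'lookupScanTimeoutMs' = '30000'    -- fail a scan query after 30 seconds
);

-- With 'lookupScanEnable' = 'false' (the default), this join would throw an
-- exception, because region_name is not the full primary key.
select o.order_id, a.area_id
from orders as o
  left join area_dim for system_time as of o.proctime as a
    on o.region_name = a.region_name;
```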
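And a minimal sketch of the full-cache configuration, assuming the entire dimension table fits in memory and should be reloaded daily at 02:00; the connection values are again placeholders.

```sql
-- Minimal sketch: cache the whole dimension table and reload it every day at
-- 02:00 (placeholder connection values).
create table area_dim (
  area_id string,
  region_name string
) with (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName',
  'tableName' = 'area_info',
  'username' = 'DwsUserName',
  'password' = 'DwsPassword',
  'lookupCacheType' = 'ALL',                   -- cache the entire table
  'lookupCacheReloadStrategy' = 'TIMED',       -- reload at a scheduled time
  'lookupCacheTimedReloadTime' = '02:00',      -- ISO-8601 time of day
  'lookupCacheTimedReloadIntervalDays' = '1'   -- reload every day
);
```

For an LRU cache, keep the default lookupCacheType and tune lookupCacheMaxRows together with the two expiry intervals instead, as the example in the next section does.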
Examples
Read data from a Kafka source table, use a GaussDB(DWS) table as the dimension table, and write the wide-table records produced by joining the source and dimension tables to the print result table. The procedure is as follows:
- Connect to the GaussDB(DWS) database and create the table area_info to serve as the dimension table. The SQL statement is as follows:
```sql
create table public.area_info(
  area_id VARCHAR,
  area_province_name VARCHAR,
  area_city_name VARCHAR,
  area_county_name VARCHAR,
  area_street_name VARCHAR,
  region_name VARCHAR,
  PRIMARY KEY(area_id)
);
```
- Connect to the GaussDB(DWS) database and run the following statement to insert test data into the dimension table area_info:
```sql
insert into area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name)
values
  ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'),
  ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'),
  ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'),
  ('330110', 'a1', 'b1', 'c4', 'd4', 'e1');
```
- Run the following statements in the Flink SQL client to create the source table, dimension table, and result table:
```sql
CREATE TABLE orders (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string,
  proctime as Proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_test',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'dws-order',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

--Create the address dimension table.
create table area_info (
  area_id string,
  area_province_name string,
  area_city_name string,
  area_county_name string,
  area_street_name string,
  region_name string
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName',
  'tableName' = 'area_info',
  'username' = 'DwsUserName',
  'password' = 'DwsPassword',
  'lookupCacheMaxRows' = '10000',
  'lookupCacheExpireAfterAccess' = '2h'
);

--Create a wide table based on the address dimension table for detailed order information.
create table order_detail(
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string,
  area_province_name string,
  area_city_name string,
  area_county_name string,
  area_street_name string,
  region_name string
) with (
  'connector' = 'print'
);

insert into order_detail
select
  orders.order_id,
  orders.order_channel,
  orders.order_time,
  orders.pay_amount,
  orders.real_pay,
  orders.pay_time,
  orders.user_id,
  orders.user_name,
  area.area_id,
  area.area_province_name,
  area.area_city_name,
  area.area_county_name,
  area.area_street_name,
  area.region_name
from orders
  left join area_info for system_time as of orders.proctime as area
    on orders.area_id = area.area_id;
```
- Write data to Kafka.
```json
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
```
- View the output of the print result table, for example, in the TaskManager logs of the Flink job. Each order record is joined with the matching area_info row to produce a wide-table record.
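Based on the sample data above, the printed output should resemble the following sketch; the exact formatting of the +I change-log prefix may vary with the Flink version.

```
+I[202103241606060001, appShop, 2021-03-24 16:06:06, 200.0, 180.0, 2021-03-24 16:10:06, 0001, Alice, 330106, a1, b1, c2, d2, e1]
+I[202103251202020001, miniAppShop, 2021-03-25 12:02:02, 60.0, 60.0, 2021-03-25 12:03:00, 0002, Bob, 330110, a1, b1, c4, d4, e1]
+I[202103251505050001, qqShop, 2021-03-25 15:05:05, 500.0, 400.0, 2021-03-25 15:10:00, 0003, Cindy, 330108, a1, b1, c3, d3, e1]
```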