Updated on 2024-07-19 GMT+08:00

Dimension Table

Syntax

create table dwsSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
)
with (
  'connector' = 'dws',
  'url' = '',
  'tableName' = '',
  'username' = '',
  'password' = ''
);
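
In a job, a dimension table defined this way is queried through a lookup join on a processing-time attribute of the source table. The sketch below is illustrative only: the source table mySource, its proctime attribute, and the join key id are assumptions rather than part of the syntax above; a complete, runnable example is given in Examples.

-- Hedged sketch: look up the dimension table dwsSource defined above from an
-- assumed source table mySource that has a processing-time attribute proctime.
-- The column names are placeholders.
select s.id, d.attr_name
from mySource as s
left join dwsSource for system_time as of s.proctime as d
  on s.id = d.id;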

Parameter Description

Table 1 Database configurations

connector
  Description: Connector type, used by the Flink framework to identify the connector. This parameter is fixed at dws.
  Default Value: -

url
  Description: Database connection address (JDBC URL).
  Default Value: -

username
  Description: Username used to connect to the database.
  Default Value: -

password
  Description: Password of the database user.
  Default Value: -

Table 2 Connection configuration parameters

connectionSize
  Name: Size of the read thread pool
  Type: int
  Description: Number of threads used for read operations. It equals the number of database connections and is the same as the write thread pool size.
  Default Value: 1

readBatchSize
  Name: Maximum number of GET requests that can be combined and submitted at a time
  Type: int
  Description: Maximum number of queued query requests that are batched and submitted together.
  Default Value: 128

readBatchQueueSize
  Name: Size of the buffer pool for GET requests
  Type: int
  Description: Maximum number of query requests that can be queued.
  Default Value: 256

readTimeoutMs
  Name: Timeout interval of a GET request (ms)
  Type: int
  Description: A value of 0 means no timeout. The timeout applies to two situations:
    • The time a GET request waits before it is submitted to GaussDB(DWS).
    • The execution time of the GET SQL statement.
  Default Value: 0

readSyncThreadEnable
  Name: Whether to enable the thread pool for non-asynchronous queries
  Type: boolean
  Description: If enabled, queries run in the thread pool and future.get() blocks asynchronously; if disabled, the main thread blocks synchronously.
  Default Value: true

lookupScanEnable
  Name: Whether to enable scan queries
  Type: boolean
  Description: Whether to enable scan queries when the join condition does not match all primary key columns. If this parameter is set to false, all join conditions must be primary keys; otherwise, an exception is thrown.
  Default Value: false

fetchSize / lookupScanFetchSize
  Name: Size of a scan query
  Type: int
  Description: Maximum number of records returned by a conditional query in non-full primary key matching mode. fetchSize is used by default; if fetchSize is 0, lookupScanFetchSize is used instead.
  Default Value: 1000

lookupScanTimeoutMs
  Name: Timeout interval of a SCAN request (ms)
  Type: int
  Description: Timeout limit, in milliseconds, for a conditional query in non-full primary key matching mode.
  Default Value: 60000

lookupAsync
  Name: Whether to obtain data in asynchronous mode
  Type: boolean
  Description: Whether queries are executed asynchronously or synchronously.
  Default Value: true

lookupCacheType
  Name: Cache policy
  Type: LookupCacheType
  Description: Cache policy (case insensitive):
    • None: no caching.
    • LRU (default): part of the dimension table data is cached. When a record arrives from the source table, the cache is checked first; if no match is found, the physical dimension table is queried.
    • ALL: all data is cached, which is suitable for small tables that are rarely updated.
  Default Value: LookupCacheType.LRU

lookupCacheMaxRows
  Name: Cache size
  Type: long
  Description: Maximum number of cached rows when the LRU cache policy is used.
  Default Value: 1000

lookupCacheExpireAfterAccess
  Name: Timeout interval counted after a read
  Type: Duration
  Description: When the LRU cache policy is used, the expiration time of an entry is extended after each read. The default value null means this setting does not take effect.
  Default Value: null

lookupCacheExpireAfterWrite
  Name: Timeout interval counted after a write
  Type: Duration
  Description: When the LRU cache policy is used, a cached entry expires a fixed time after it is written, regardless of whether it is accessed.
  Default Value: 10s

lookupCacheMissingKey
  Name: Whether to cache keys that do not exist in the dimension table
  Type: boolean
  Description: When the LRU cache policy is used, whether to cache a key when the lookup finds no matching row in the dimension table.
  Default Value: false

lookupCacheReloadStrategy
  Name: Full cache reloading policy
  Type: ReloadStrategy
  Description: When the ALL cache policy is used, one of the following reloading policies can be set:
    • PERIODIC: periodic data reloading.
    • TIMED: scheduled data reloading at a fixed time of day, with an interval in days.
  Default Value: ReloadStrategy.PERIODIC

lookupCachePeriodicReloadInterval
  Name: Data reloading interval
  Type: Duration
  Description: When the PERIODIC reloading policy is used, the interval at which the full cache is reloaded.
  Default Value: 1h

lookupCachePeriodicReloadMode
  Name: Data reloading mode
  Type: ScheduleMode
  Description: When the PERIODIC reloading policy is used, one of the following reloading modes can be set (case insensitive):
    • FIXED_DELAY: the reloading interval is measured from the end of the previous load.
    • FIXED_RATE: the reloading interval is measured from the start of the previous load.
  Default Value: ScheduleMode.FIXED_DELAY

lookupCacheTimedReloadTime
  Name: Scheduled time for data reloading
  Type: string
  Description: When the TIMED reloading policy is used, the time of day at which the full cache is reloaded, in ISO-8601 format (for example, 10:15).
  Default Value: 00:00

lookupCacheTimedReloadIntervalDays
  Name: Interval for scheduled data reloading
  Type: int
  Description: When the TIMED reloading policy is used, the interval, in days, at which the full cache is reloaded.
  Default Value: 1
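
For reference, the following is a hedged sketch (not taken from the example below) of how the cache parameters above could be combined to fully cache a small dimension table and reload it periodically. The connection values are the same placeholders used in Examples; the table and column names and the 30min interval are illustrative, and the enumeration options are passed as strings like the other parameters.

-- Hedged sketch: fully cache a small dimension table and reload it periodically.
-- small_dim, its columns, and the 30min interval are illustrative placeholders.
create table small_dim (
  dim_id string,
  dim_name string
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName',
  'tableName' = 'small_dim',
  'username' = 'DwsUserName',
  'password' = 'DwsPassword',
  'lookupCacheType' = 'ALL',
  'lookupCacheReloadStrategy' = 'PERIODIC',
  'lookupCachePeriodicReloadInterval' = '30min',
  'lookupCachePeriodicReloadMode' = 'FIXED_DELAY'
);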

Examples

Read data from a Kafka source table, use a GaussDB(DWS) table as the dimension table, and write the wide-table records produced by joining the source and dimension tables to the print result table. The procedure is as follows:

  1. Connect to the GaussDB(DWS) database instance and create a table in GaussDB(DWS) as a dimension table. The table name is area_info. The SQL statement is as follows:
    create table public.area_info(
      area_id VARCHAR,
      area_province_name VARCHAR,
      area_city_name VARCHAR,
      area_county_name VARCHAR,
      area_street_name VARCHAR,
      region_name VARCHAR,
      PRIMARY KEY(area_id)
    );
    
  2. Connect to the GaussDB(DWS) database and run the following statement to insert test data into the dimension table area_info:
    insert into area_info
      (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) 
      values
      ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'),
      ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'),
      ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'),
      ('330110', 'a1', 'b1', 'c4', 'd4', 'e1');
    
  3. Run the following Flink SQL statements to create the source table, dimension table, and result table:
    CREATE TABLE orders (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string,
      proctime as Proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'order_test',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'dws-order',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    --Create the address dimension table.
    create table area_info (
        area_id string, 
        area_province_name string, 
        area_city_name string, 
        area_county_name string,
        area_street_name string, 
        region_name string 
    ) WITH (
      'connector' = 'dws',
      'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName',
      'tableName' = 'area_info',
      'username' = 'DwsUserName',
      'password' = 'DwsPassword',
      'lookupCacheMaxRows' = '10000',
      'lookupCacheExpireAfterAccess' = '2h'
    );
    --Create a wide table based on the address dimension table for detailed order information.
    create table order_detail(
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string,
        area_province_name string,
        area_city_name string,
        area_county_name string,
        area_street_name string,
        region_name string
    ) with (
      'connector' = 'print'
    );
    insert into order_detail
        select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name,
               area.area_id, area.area_province_name, area.area_city_name, area.area_county_name,
               area.area_street_name, area.region_name  from orders
        left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
  4. Write data to Kafka.
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
    
  5. View the result written to the print result table (the print connector writes to the TaskManager's standard output).
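
    The exact formatting depends on the Flink version and the print connector, but for the sample data inserted above, the joined wide-table rows would be expected to look roughly like the following sketch (one +I record per Kafka message, with the area fields appended from area_info):

    +I[202103241606060001, appShop, 2021-03-24 16:06:06, 200.0, 180.0, 2021-03-24 16:10:06, 0001, Alice, 330106, a1, b1, c2, d2, e1]
    +I[202103251202020001, miniAppShop, 2021-03-25 12:02:02, 60.0, 60.0, 2021-03-25 12:03:00, 0002, Bob, 330110, a1, b1, c4, d4, e1]
    +I[202103251505050001, qqShop, 2021-03-25 15:05:05, 500.0, 400.0, 2021-03-25 15:10:00, 0003, Cindy, 330108, a1, b1, c3, d3, e1]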