Dimension Table
Function
Create a Redis table to connect to source streams for wide table generation.
Prerequisites
- An enhanced datasource connection with Redis has been established, so that you can configure security group rules as required.
- For details about how to set up an enhanced datasource connection, see Enhanced Datasource Connections in the Data Lake Insight User Guide.
- For details about how to configure security group rules, see Security Group Overview in the Virtual Private Cloud User Guide.
Caveats
- When you create a Flink OpenSource SQL job, set Flink Version to 1.15 in the Running Parameters tab. Select Save Job Log, and specify the OBS bucket for saving job logs.
- Storing authentication credentials such as usernames and passwords in code or plaintext poses significant security risks. It is recommended using DEW to manage credentials instead. Storing encrypted credentials in configuration files or environment variables and decrypting them when needed ensures security. For details, see Flink OpenSource SQL Jobs Using DEW to Manage Access Credentials.
- To obtain the key values, you can set the primary key in Flink. The primary key maps to the Redis key.
- If the primary key cannot be a composite primary key, and only can be one field.
- Constraints on schema-syntax:
- If schema-syntax is map or array, there can be only one non-primary key and it must be of the same map or array type.
- If schema-syntax is fields-scores, the number of non-primary keys must be an even number, and the second key of every two keys except the primary key must be of the double type. The double value is the score of the previous key. The following is an example:
CREATE TABLE redisSource ( redisKey string, order_id string, score1 double, order_channel string, score2 double, order_time string, score3 double, pay_amount double, score4 double, real_pay double, score5 double, pay_time string, score6 double, user_id string, score7 double, user_name string, score8 double, area_id string, score9 double, primary key (redisKey) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'sorted-set', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'fields-scores' );
- Restrictions on data-type:
- When data-type is set, the types of non-primary keys defined in Flink must be the same.
- If data-type is sorted-set and schema-syntax is fields or array, only sorted set values can be read from Redis, and the score value cannot be read.
- If data-type is string, only one non-primary key field is allowed.
- If data-type is sorted-set and schema-syntax is map, there can be only one non-primary key in addition to the primary key and the non-primary key must be of the map type. The map values of the non-primary key must be of the double type, indicating the score. The keys in the map are the values in the Redis set.
- If data-type is sorted-set and schema-syntax is array-scores, only two non-primary keys are allowed and must be of the array type.
The first key indicates values in the Redis set. The second key is of the array<double> type, indicating index scores. The following is an example:
CREATE TABLE redisSink ( order_id string, arrayField Array<String>, arrayScore array<double>, primary key (order_id) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'sorted-set', "default-score" = '3', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'array-scores' );
Syntax
create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ,PRIMARY KEY (attr_name, ...) NOT ENFORCED ) with ( 'connector' = 'redis', 'host' = '' );
Parameters
Parameter |
Mandatory |
Default Value |
Data Types |
Description |
---|---|---|---|---|
connector |
Yes |
None |
String |
Connector type. Set this parameter to redis. |
host |
Yes |
None |
String |
Redis connector address |
port |
No |
6379 |
Integer |
Redis connector port |
password |
No |
None |
String |
Redis authentication password |
namespace |
No |
None |
String |
Redis key namespace |
delimiter |
No |
: |
String |
Delimiter between the Redis key and namespace |
data-type |
No |
hash |
String |
Redis data type. Available values are as follows:
For details about the constraints, see Constraints on data-type. |
schema-syntax |
No |
fields |
String |
Redis schema semantics. Available values are as follows:
For details about the constraints, see Constraints on schema-syntax. |
deploy-mode |
No |
standalone |
String |
Deployment mode of the Redis cluster. The value can be standalone, master-replica, or cluster. The default value is standalone. |
retry-count |
Yes |
5 |
Integer |
Size of each connection request queue. If the number of connection requests in a queue exceeds the queue size, command calling will cause RedisException. Setting requestQueueSize to a small value will cause exceptions to occur earlier during overload or disconnection. A larger value indicates more time required to reach the boundary, but more requests may be queued and more heap space may be used. The default value is 2147483647. |
connection-timeout-millis |
No |
10000 |
Integer |
Maximum timeout for connecting to the Redis cluster |
commands-timeout-millis |
No |
2000 |
Integer |
Maximum time for waiting for a completion response |
rebalancing-timeout-millis |
No |
15000 |
Integer |
Sleep time when the Redis cluster fails |
scan-keys-count |
No |
1000 |
Integer |
Number of data records read in each scan |
default-score |
No |
0 |
Double |
Default score when data-type is sorted-set |
deserialize-error-policy |
No |
fail-job |
Enum |
How to process a data parsing failure Available values are as follows:
|
skip-null-values |
No |
true |
Boolean |
Whether null values will be skipped |
lookup.async |
No |
false |
Boolean |
Whether asynchronous I/O will be used when this table is used as a dimension table |
lookup.parallelism |
No |
None |
int |
Defines the custom parallelism of the lookup join operator. If this parameter is not defined, the planner will derive the parallelism by considering the global configuration (if the lookup.parallelism parameter is defined) or the parallelism of the input operator. |
lookup.batch.interval |
No |
1s |
Duration |
Batch lookup join can buffer input records with a maximum delay. Batch lookup join can buffer input records with a maximum delay. |
lookup.batch.size |
No |
100L |
long |
Maximum number of input records that can be buffered for batch lookup join. |
lookup.batch |
No |
false |
Boolean |
Whether to enable batch lookup optimization. If enabled, the user must set both the lookup.batch.interval and lookup.batch.size parameters. Additionally, due to the implementation of the underlying batch processing interval interference mechanism, the user must explicitly enable the table.exec.batch-lookup.enabled parameter in the Flink configuration. |
ignore-retractions |
No |
false |
Boolean |
The connector should ignore retraction messages in the update insert/withdraw flow mode. |
key-column |
No |
None |
String |
Schema key of the Redis table. |
Example
Read data from a Kafka source table, use a Redis table as the dimension table. Write wide table information generated by the source and dimension tables to a Kafka result table. The procedure is as follows:
- Create an enhanced datasource connection in the VPC and subnet where Redis and Kafka locates, and bind the connection to the required Flink elastic resource pool. For details, see Enhanced Datasource Connections.
- Set Redis and Kafka security groups and add inbound rules to allow access from the Flink queue. Test the connectivity using the Redis address by referring to Testing Address Connectivity. If the connection passes the test, it is bound to the queue.
- Run the following commands on the Redis client to send data to Redis:
HMSET 330102 area_province_name a1 area_province_name b1 area_county_name c1 area_street_name d1 region_name e1 HMSET 330106 area_province_name a1 area_province_name b1 area_county_name c2 area_street_name d2 region_name e1 HMSET 330108 area_province_name a1 area_province_name b1 area_county_name c3 area_street_name d3 region_name e1 HMSET 330110 area_province_name a1 area_province_name b1 area_county_name c4 area_street_name d4 region_name e1
- Create a Flink OpenSource SQL job. Enter the following job script and submit the job. The job script uses Kafka as the data source and a Redis table as the dimension table. Data is output to a Kafka result table.
Change the values of the parameters in bold as needed in the following script.
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); -- Create an address dimension table create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string, primary key (area_id) not enforced -- Redis key ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'hash', 'deploy-mode' = 'master-replica' ); -- Generate a wide table based on the address dimension table containing detailed order information. create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'kafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
- Connect to the Kafka cluster and insert the following test data into the source topic in Kafka:
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"appShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
- Connect to the Kafka cluster and read data from the sink topic of Kafka. The result data is as follows:
{"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"appshop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
FAQs
If Chinese characters are written to the Redis in the Windows environment, an exception will occur during data writing.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot