Creating an RDS Table
Create an RDS table to interconnect with the source stream so that data can be output to RDS. For more information about RDS, see the Relational Database Service User Guide.
For details about the JOIN syntax, see JOIN Between Stream Data and Table Data.
Prerequisites
- Ensure that you have created a PostgreSQL or MySQL RDS instance in RDS.
For details about how to create an RDS instance, see Buying an Instance in the Relational Database Service Getting Started Guide.
- In this scenario, jobs must run on the exclusive cluster of CS. Therefore, CS must interconnect with the VPC that has been connected with RDS instance. You can also set the security group rules as required.
For details about how to set up the VPC peering connection, see VPC Peering Connection in the Cloud Stream Service User Guide.
For details about how to configure security group rules, see Security Group in the Virtual Private Cloud User Guide.
Syntax
Syntax
CREATE TABLE table_id ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", region = "", username = "", password = "", db_url = "", table_name = "" );
Description
| Parameter | Mandatory | Description |
|---|---|---|
| type | Yes | Indicates the output channel type. Value rds indicates that data is stored to RDS. |
| username | Yes | Indicates the username of a database. |
| password | Yes | Indicates the password of a database. |
| db_url | Yes | Indicates the database connection address, for example, {database_type}://ip:port/database. Currently, two types of database connections are supported: MySQL and PostgreSQL.
|
| table_name | Yes | Indicates the name of the database table for data query. |
| db_columns | No | Indicates the mapping of stream attribute fields between the sink stream and database table. This parameter is mandatory when the stream attribute fields in the sink stream do not match those in the database table. The parameter value is in the format of dbtable_attr1,dbtable_attr2,dbtable_attr3. |
| cache_max_num | No | Indicates the maximum number of cached query results. The default value is 32768. |
| cache_time | No | Indicates the maximum duration for caching database query results in the memory. The unit is millisecond. The default value is 10000. The value 0 indicates that caching is disabled. |
Example
CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "southchina", channel = "csinput", encode = "csv", field_delimiter = "," ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", region = "southchina", username = "root", password = "******", db_url = "postgresql://192.168.0.0:2000/test1", table_name = "car" ); CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "cn-north-1", channel = "csoutput", partition_key = "car_owner", encode = "csv", field_delimiter = "," ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info as b on a.car_id = b.car_id;
Last Article: Creating a Redis Table
Next Article: DML Statement
Did this article solve your problem?
Thank you for your score!Your feedback would help us improve the website.