Creating an RDS Table

Create an RDS table to interconnect with the source stream so that data can be output to RDS. For more information about RDS, see the Relational Database Service User Guide.

For details about the JOIN syntax, see JOIN Between Stream Data and Table Data.

Prerequisites

  • Ensure that you have created a PostgreSQL or MySQL RDS instance in RDS.

    For details about how to create an RDS instance, see Buying an Instance in the Relational Database Service Getting Started Guide.

  • In this scenario, jobs must run on a CS exclusive cluster. CS must therefore be interconnected with the VPC to which the RDS instance is connected. You can also configure security group rules as required.

    For details about how to set up the VPC peering connection, see VPC Peering Connection in the Cloud Stream Service User Guide.

    For details about how to configure security group rules, see Security Group in the Virtual Private Cloud User Guide.

Syntax

CREATE TABLE table_id (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT
)
WITH (
  type = "rds",
  region = "",
  username = "",
  password = "",
  db_url = "",
  table_name = ""
);

Description

Table 1 Parameter description in the syntax

| Parameter | Mandatory | Description |
|---|---|---|
| type | Yes | Indicates the output channel type. Value rds indicates that data is stored to RDS. |
| username | Yes | Indicates the username of a database. |
| password | Yes | Indicates the password of a database. |
| db_url | Yes | Indicates the database connection address, in the format {database_type}://ip:port/database. Currently, two types of database connections are supported: MySQL ('mysql://ip:port/database') and PostgreSQL ('postgresql://ip:port/database'). |
| table_name | Yes | Indicates the name of the database table for data query. |
| db_columns | No | Indicates the mapping of stream attribute fields between the sink stream and database table. This parameter is mandatory when the stream attribute fields in the sink stream do not match those in the database table. The parameter value is in the format of dbtable_attr1,dbtable_attr2,dbtable_attr3. |
| cache_max_num | No | Indicates the maximum number of cached query results. The default value is 32768. |
| cache_time | No | Indicates the maximum duration for caching database query results in the memory. The unit is millisecond. The default value is 10000. The value 0 indicates that caching is disabled. |
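
As a sketch of how the optional parameters in Table 1 fit into the WITH clause, the following definition connects to a MySQL instance, maps the stream fields to differently named database columns, and changes the cache settings. The table name db_info_mysql, the IP address, port, and database name, and the database column names id, owner, brand, and price are hypothetical. This section does not state whether numeric values must be quoted, so writing them unquoted is an assumption.

```sql
-- Hypothetical RDS table backed by a MySQL instance (address is a placeholder).
-- db_columns lists the database columns in the order of the stream fields.
-- cache_max_num caps the number of cached query results;
-- cache_time keeps cached results for 60000 ms (0 would disable caching).
CREATE TABLE db_info_mysql (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT
)
WITH (
  type = "rds",
  region = "southchina",
  username = "root",
  password = "******",
  db_url = "mysql://192.168.0.10:3306/test1",
  table_name = "car",
  db_columns = "id,owner,brand,price",
  cache_max_num = 10000,
  cache_time = 60000
);
```

A longer cache_time reduces the number of queries sent to the database, at the cost of joined records reflecting older table contents.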

Example

In this example, the RDS table db_info is joined with the source stream car_infos on car_id, and the joined records are written to a DIS sink stream.
CREATE SOURCE STREAM car_infos (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT
)
WITH (
  type = "dis",
  region = "southchina",
  channel = "csinput",
  encode = "csv",
  field_delimiter = ","
);

CREATE TABLE db_info (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT
)
WITH (
  type = "rds",
  region = "southchina",
  username = "root",
  password = "******",
  db_url = "postgresql://192.168.0.0:2000/test1",
  table_name = "car"
);

CREATE SINK STREAM audi_cheaper_than_30w (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT
)
WITH (
  type = "dis",
  region = "cn-north-1",
  channel = "csoutput",
  partition_key = "car_owner",
  encode = "csv",
  field_delimiter = ","
);

INSERT INTO audi_cheaper_than_30w
SELECT a.car_id, b.car_owner, b.car_brand, b.car_price
FROM car_infos AS a JOIN db_info AS b ON a.car_id = b.car_id;
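
The sink stream name suggests that only some cars are meant to reach the output, whereas the statement above forwards every joined record. A minimal sketch of adding a price filter follows. It assumes that a WHERE clause may follow the JOIN condition and that "30w" stands for 300,000; neither is stated in this section. A condition on car_brand is left out because this section does not show how string literals are written in expressions.

```sql
-- Same join as above, keeping only records whose price in the RDS table is below 300000.
INSERT INTO audi_cheaper_than_30w
SELECT a.car_id, b.car_owner, b.car_brand, b.car_price
FROM car_infos AS a JOIN db_info AS b ON a.car_id = b.car_id
WHERE b.car_price < 300000;
```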