SQL Syntax Overview of Stream Jobs

Syntax for Creating a Source Stream

Table 1 Syntax for creating a source stream

Function

Syntax

CloudTable HBase Source Stream

1
2
3
4
5
6
7
8
9
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "cloudtable",
    region = "",
    cluster_id = "",
    table_name = "",
    table_columns = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
timeindicator: PROCTIME '.' PROCTIME | ID '.' ROWTIME

DIS Source Stream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "dis",
    region = "",
    channel = "",
    partition_count = "",
    encode = "",
    field_delimiter = "",
    offset= "")
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
timeindicator: PROCTIME '.' PROCTIME | ID '.' ROWTIME

DMS Source Stream

1
2
3
4
5
6
7
8
9
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_group_id = "",
    kafka_topic = "",
    encode = "json"
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
timeindicator: PROCTIME '.' PROCTIME | ID '.' ROWTIME

Edge-Hub Source Stream

1
2
3
4
5
6
7
8
9
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "edgehub", 
    topic = "", 
    encode = "", 
    json_config = "", 
    field_delimiter = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
timeindicator: PROCTIME '.' PROCTIME | ID '.' ROWTIME

MRS Kafka Source Stream

1
2
3
4
5
6
7
8
9
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_group_id = "",
    kafka_topic = "",
    encode = "json"
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
timeindicator: PROCTIME '.' PROCTIME | ID '.' ROWTIME

Open-Source Kafka Source Stream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_group_id = "",
    kafka_topic = "",
    encode = "json",
    json_config=""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
timeindicator: PROCTIME '.' PROCTIME | ID '.' ROWTIME

OBS Source Stream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "obs",
    region = "",
    bucket = "",
    object_name = "",
    row_delimiter = "\n",
    field_delimiter = "",
    version_id = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
timeindicator: PROCTIME '.' PROCTIME | ID '.' ROWTIME

Syntax for Creating a Sink Stream

Table 2 Syntax for creating a sink stream

Function

Syntax

CloudTable HBase Sink Stream

1
2
3
4
5
6
7
8
9
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "cloudtable",
    region = "",
    cluster_id = "",
    table_name = "",
    table_columns = "",
    create_if_not_exist = ""
  );

CloudTable OpenTSDB Sink Stream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "opentsdb",
    region = "",
    cluster_id = "",
    tsdb_metrics = "",
    tsdb_timestamps = "",
    tsdb_values = "",
    tsdb_tags = "",
    batch_insert_data_num = ""
  );

CSS Elasticsearch Sink Stream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "es",
    region = "",
    cluster_address = "",
    es_index = "",
    es_type= "",
    es_fields= "",
    batch_insert_data_num= ""
  );

DCS Sink Stream

1
2
3
4
5
6
7
8
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "dcs_redis",
    region = "",
    cluster_address = "",
    password = "",
    value_type = "",
    key_value = ""
  );

DDS Sink Stream

1
2
3
4
5
6
7
8
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "dds",
    username = "",
    password = "",
    db_url = "",
    field_names = ""
  );

DIS Sink Stream

1
2
3
4
5
6
7
8
9
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "dis",
    region = "",
    channel = "",
    partition_key = "",
    encode= "",
    field_delimiter= ""
  );

DMS Sink Stream

1
2
3
4
5
6
7
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_topic = "",
    encode = "json"
  );

DWS Sink Stream (JDBC Mode)

1
2
3
4
5
6
7
8
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "rds",
    username = "",
    password = "",
    db_url = "",
    table_name = ""
  );

DWS Sink Stream (OBS-based Dumping)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
          type = "dws",
          region = "",
          encode = "",
          field_delimiter = "",
          quote = "",
          db_obs_server = "",
          obs_dir = "",
          username = "",
          password = "",
          db_url = "",
          table_name = "",
          max_record_num_per_file = "",
          dump_interval = ""
  );

Edge-Hub Sink Stream

1
2
3
4
5
6
7
8
9
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
   WITH (
     type = "edgehub", 
     topic = "", 
     encode = "", 
     json_config = "", 
     ENABLE_OUTPUT_NULL = "",
     field_delimiter = ""
  );

MRS HBase Sink Stream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "mrs_hbase",
    region = "",
    cluster_address = "",
    table_name = "",
    table_columns = "",
    illegal_data_table = "",
    batch_insert_data_num = "",
    action = ""
  );

MRS Kafka Sink Stream

1
2
3
4
5
6
7
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_topic = "",
    encode = "json"
  );

Open-Source Kafka Sink Stream

1
2
3
4
5
6
7
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_topic = "",
    encode = "json"
  );

OBS Sink Stream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
           type = "obs",
           region = "",
           encode = "",
           field_delimiter = "",
           row_delimiter = "",
           obs_dir = "",
           file_prefix = "",
           rolling_size = "",
           rolling_interval = "",
           quote = "",
           array_bracket = "",
           append = "",
           max_record_num_per_file = "",
           dump_interval = "",
           dis_notice_channel = "",
           max_record_num_cache = "",
           carbon_properties = ""
  );

RDS Sink Stream

1
2
3
4
5
6
7
8
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "rds",
    username = "",
    password = "",
    db_url = "",
    table_name = ""
  );

SMN Sink Stream

1
2
3
4
5
6
7
8
9
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "smn",
    region = "",
    topic_urn = "",
    urn_column = "",
    message_subject = "",
    message_column = ""
  );

File System Sink Stream

1
2
3
4
5
6
7
8
9
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  [PARTITIONED BY (attr_name (',' attr_name)*)]
  WITH (
    type = "filesystem",
    file.path = "obs://bucket/xx",
    encode = "parquet",
    ak = "",
    sk = ""
  );

Syntax for Creating a Temporary Stream

Table 3 Syntax for creating a temporary stream

Function

Syntax

Creating a Temporary Stream

1
CREATE TEMP STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* );

Syntax for Creating a Dimension Table

Table 4 Syntax for creating a dimension table

Function

Syntax

Creating a Redis Table

1
2
3
4
5
6
CREATE TABLE table_id (key_attr_name STRING(, hash_key_attr_name STRING)?, value_attr_name STRING)
  WITH (
    type = "dcs_redis",
    cluster_address = ""(,password = "")?,
    value_type= "",
    key_column= ""(,hash_key_column="")?);

Creating an RDS Table

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
CREATE TABLE table_id (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT
)
  WITH (
    type = "rds",
    region = "",
    username = "",
    password = "",
    db_url = "",
    table_name = ""
  );

Syntax Related to Custom Stream Ecosystems

Table 5 Syntax related to custom stream ecosystems

Function

Syntax

Custom Source Stream

1
2
3
4
5
6
7
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "user_defined",
    type_class_name = "",
    type_class_parameter = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
timeindicator: PROCTIME '.' PROCTIME | ID '.' ROWTIME

Custom Sink Stream

1
2
3
4
5
6
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "user_defined",
    type_class_name = "",
    type_class_parameter = ""
  );