文档首页 > SQL语法参考 > 流作业SQL语法 > 流作业SQL语法概览

流作业SQL语法概览

分享
更新时间: 2020/05/12 GMT+08:00

创建输入流相关语法

表1 创建输入流相关语法

功能描述

语法格式

DIS输入流

1
2
3
4
-- DIS source stream (syntax template).
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "dis",
    region = "",
    channel = "",
    partition_count = "",
    encode = "",
    field_delimiter = "",
    offset = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
  -- Grammar production for the timeindicator referenced by TIMESTAMP BY.
  timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

OBS输入流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
-- OBS source stream (syntax template).
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "obs",
    region = "",
    bucket = "",
    object_name = "",
    row_delimiter = "\n",
    field_delimiter = '',
    version_id = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
  -- Grammar production for the timeindicator referenced by TIMESTAMP BY;
  -- kept on its own line so it is not read as part of the statement.
  timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

HBase输入流

1
2
3
4
5
6
7
8
9
-- HBase source stream (syntax template); the connector type is "cloudtable".
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "cloudtable",
    region = "",
    cluster_id = "",
    table_name = "",
    table_columns = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
  -- Grammar production for the timeindicator referenced by TIMESTAMP BY;
  -- kept on its own line so it is not read as part of the statement.
  timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

MRS Kafka输入流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
-- MRS Kafka source stream (example with two attributes, JSON-encoded).
CREATE SOURCE STREAM kafka_source (
  name STRING,
  age int
)
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_group_id = "",
    kafka_topic = "",
    encode = "json"
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
  -- Grammar production for the timeindicator referenced by TIMESTAMP BY;
  -- kept on its own line so it is not read as part of the statement.
  timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

开源Kafka输入流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
-- Open-source Kafka source stream (example with two attributes, JSON-encoded).
-- Unlike the MRS Kafka template, this one also carries a json_config option.
CREATE SOURCE STREAM kafka_source (
  name STRING,
  age int
)
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_group_id = "",
    kafka_topic = "",
    encode = "json",
    json_config = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
  -- Grammar production for the timeindicator referenced by TIMESTAMP BY;
  -- kept on its own line so it is not read as part of the statement.
  timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

DMS输入流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
-- DMS source stream (example with two attributes); the connector type is "kafka".
CREATE SOURCE STREAM kafka_source (
  name STRING,
  age int
)
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_group_id = "",
    kafka_topic = "",
    encode = "json"
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
  -- Grammar production for the timeindicator referenced by TIMESTAMP BY;
  -- kept on its own line so it is not read as part of the statement.
  timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

创建输出流相关语法

表2 创建输出流相关语法

功能描述

语法格式

DIS输出流

1
2
3
4
5
6
7
8
9
-- DIS sink stream (syntax template).
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type            = "dis",
    region          = "",
    channel         = "",
    partition_key   = "",
    encode          = "",
    field_delimiter = ""
  );

OBS输出流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
-- OBS sink stream (syntax template).
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "obs",
    region = "",
    encode = "",
    field_delimiter = "",
    row_delimiter = "",
    obs_dir = "",
    file_prefix = "",
    rolling_size = "",
    rolling_interval = "",
    quote = "",
    array_bracket = "",
    append = "",
    max_record_num_per_file = "",
    dump_interval = "",
    dis_notice_channel = "",
    max_record_num_cache = "",
    carbon_properties = ""
  );

HBase输出流

1
2
3
4
5
6
7
8
9
-- HBase sink stream (syntax template); the connector type is "cloudtable".
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "cloudtable",
    region = "",
    cluster_id = "",
    table_name = "",
    table_columns = "",
    create_if_not_exist = ""
  );

OpenTSDB输出流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
-- OpenTSDB sink stream (syntax template).
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "opentsdb",
    region = "",
    cluster_id = "",
    tsdb_metrics = "",
    tsdb_timestamps = "",
    tsdb_values = "",
    tsdb_tags = "",
    batch_insert_data_num = ""
  );

RDS输出流

1
2
3
4
5
6
7
8
-- RDS sink stream (syntax template).
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type       = "rds",
    username   = "",
    password   = "",
    db_url     = "",
    table_name = ""
  );

RDS和DWS数据同步输出流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
-- RDS/DWS data-synchronisation sink stream (syntax template).
-- Options written as "${attr_name}" take a reference to a stream attribute.
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "db_sync",
    region = "",
    db_url = "",
    username = "",
    password = "",
    table_name = "${attr_name}",
    operation_field = "${attr_name}",
    before = "${attr_name}",
    after = "${attr_name}",
    tranx_id = "${attr_name}",
    commit = "${attr_name}",
    sql = "${attr_name}",
    table_name_map = "${attr_name}",
    column_name_map = "${attr_name}",
    schema_case_sensitive = "false",
    -- Last option: no trailing comma before the closing parenthesis.
    db_type = "dws"
  );

DWS输出流(通过JDBC方式)

1
2
3
4
5
6
7
8
-- DWS sink stream via JDBC (syntax template); the connector type is "rds".
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type       = "rds",
    username   = "",
    password   = "",
    db_url     = "",
    table_name = ""
  );

DWS输出流(通过OBS转储方式)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
-- DWS sink stream via OBS dump (syntax template).
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type                    = "dws",
    region                  = "",
    encode                  = "",
    field_delimiter         = "",
    quote                   = "",
    db_obs_server           = "",
    obs_dir                 = "",
    username                = "",
    password                = "",
    db_url                  = "",
    table_name              = "",
    max_record_num_per_file = "",
    dump_interval           = ""
  );

DDS输出流

1
2
3
4
5
6
7
8
-- DDS sink stream (syntax template).
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type        = "dds",
    username    = "",
    password    = "",
    db_url      = "",
    field_names = ""
  );

SMN输出流

1
2
3
4
5
6
7
8
9
-- SMN sink stream (syntax template).
-- The attribute list uses the same grammar as the other sink templates
-- in place of the bare "xxx" placeholder.
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "smn",
    region = "",
    topic_urn = "",
    urn_column = "",
    message_subject = "",
    message_column = ""
  );

Elasticsearch输出流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
-- Elasticsearch sink stream (syntax template).
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type                  = "es",
    region                = "",
    cluster_address       = "",
    es_index              = "",
    es_type               = "",
    es_fields             = "",
    batch_insert_data_num = ""
  );

DCS输出流

1
2
3
4
5
6
7
8
-- DCS sink stream (syntax template); the connector type is "dcs_redis".
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type            = "dcs_redis",
    region          = "",
    cluster_address = "",
    password        = "",
    value_type      = "",
    key_value       = ""
  );

MRS HBase输出流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
-- MRS HBase sink stream (syntax template).
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "mrs_hbase",
    region = "",
    cluster_address = "",
    table_name = "",
    table_columns = "",
    illegal_data_table = "",
    batch_insert_data_num = "",
    action = ""
  );

MRS Kafka输出流

1
2
3
4
5
6
7
-- MRS Kafka sink stream (example with a single STRING attribute, JSON-encoded).
CREATE SINK STREAM kafka_sink (name STRING)
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_topic = "",
    encode = "json"
  );

开源Kafka输出流

1
2
3
4
5
6
7
-- Open-source Kafka sink stream (example with a single STRING attribute, JSON-encoded).
CREATE SINK STREAM kafka_sink (name STRING)
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_topic = "",
    encode = "json"
  );

APIG输出流

1
2
3
4
5
6
7
8
-- APIG sink stream (syntax template).
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type            = "apig",
    region          = "",
    app_id          = "",
    encode          = "",
    field_delimiter = ""
  );

DMS输出流

1
2
3
4
5
6
7
-- DMS sink stream (example with a single STRING attribute); the connector type is "kafka".
CREATE SINK STREAM kafka_sink (name STRING)
  WITH (
    type = "kafka",
    kafka_bootstrap_servers = "",
    kafka_topic = "",
    encode = "json"
  );

创建中间流相关语法

表3 创建中间流相关语法

功能描述

语法格式

创建中间流

1
-- Intermediate (temp) stream (syntax template); it has an attribute list and no WITH clause.
CREATE TEMP STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* );

创建维表相关语法

表4 创建维表相关语法

功能描述

语法格式

创建Redis表

1
2
3
4
5
6
-- Redis dimension table (syntax template); the connector type is "dcs_redis".
-- Groups written as "( ... )?" are grammar notation, presumably marking
-- optional parts (hash key attribute, password, hash key column).
CREATE TABLE table_id (
  key_attr_name STRING
  (, hash_key_attr_name STRING)?,
  value_attr_name STRING
)
  WITH (
    type = "dcs_redis",
    cluster_address = ""
    (, password = "")?,
    value_type = "",
    key_column = ""
    (, hash_key_column = "")?
  );

创建RDS表

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
-- RDS dimension table (example with four car_* attributes).
CREATE TABLE table_id (
  car_id    STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT
)
  WITH (
    type       = "rds",
    region     = "",
    username   = "",
    password   = "",
    db_url     = "",
    table_name = ""
  );

自拓展生态相关语法

表5 自拓展生态相关语法

功能描述

语法格式

自拓展输入流

1
2
3
4
5
6
7
-- User-defined source stream (syntax template); the implementing class and
-- its parameter are supplied via type_class_name / type_class_parameter.
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "user_defined",
    type_class_name = "",
    type_class_parameter = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);
  -- Grammar production for the timeindicator referenced by TIMESTAMP BY;
  -- kept on its own line so it is not read as part of the statement.
  timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

自拓展输出流

1
2
3
4
5
6
-- User-defined sink stream (syntax template).
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type                 = "user_defined",
    type_class_name      = "",
    type_class_parameter = ""
  );
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问