创建事件流 - CreateEventStreaming
功能介绍
创建事件流。
授权信息
账号具备所有API的调用权限,如果使用账号下的IAM用户调用当前API,该IAM用户需具备调用API所需的权限,具体权限要求请参见权限和授权项。
URI
POST /v1/{project_id}/eventstreamings
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
project_id | 是 | String | 租户资源空间ID |
请求参数
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
X-Auth-Token | 是 | String | 用户Token。通过调用IAM服务获取用户Token接口获取(响应消息头中X-Subject-Token的值)。 |
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
name | 是 | String | 事件流名称,租户下唯一,由字母、数字、点、下划线和中划线组成,必须字母或数字开头 |
description | 否 | String | 事件流描述 |
source | 是 | EventStreamingSource object | 事件源,一个事件流中事件源只有一个 |
sink | 是 | EventStreamingSink object | 事件目标,一个事件流中只有一个事件目标,sink_fg、sink_kafka只能选择其中一个参数 |
rule_config | 否 | rule_config object | 事件规则,包括过滤规则和转换规则 |
option | 否 | RunOption object | 运行配置 |
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
source_kafka | 否 | SourceKafkaMQParameters object | 事件源参数 |
source_mobile_rocketmq | 否 | SourceMobileMQParameters object | 事件流移动云RockectMQ事件源参数 |
source_community_rocketmq | 否 | SourceCommunityMQParameters object | 事件流社区RockectMQ事件源参数 |
source_dms_rocketmq | 否 | SourceDMSMQParameters object | DMS事件源参数 |
name | 否 | String | 事件源类型名称 |
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
addr | 否 | String | kafka连接地址 |
group | 是 | String | kafka消费组 |
instance_name | 否 | String | kafka实例名称 |
security_protocol | 否 | String | 安全协议 |
instance_id | 否 | String | kafka实例ID |
topic | 是 | String | kafka topic名称 |
seek_to | 否 | String | 消费点位 |
enable_sasl_ssl | 否 | Boolean | SASL_SSL是否开启 |
sasl_mechanism | 否 | String | SASL认证机制 |
ssl_certificate_url | 否 | String | SASL证书地址,配置的obs地址 |
ssl_certificate_pwd | 否 | String | SASL证书密码 |
user_name | 否 | String | 用户名 |
password | 否 | String | 用户密码 |
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
group_id | 是 | String | 消费组id |
instance_id | 是 | String | 实例id |
topic | 是 | String | topic |
tag | 否 | String | 标签 |
authentication_required | 否 | Boolean | 鉴权认证 |
msg_trace_switch | 否 | Boolean | 保存消息轨迹 |
access_key | 否 | String | AccessKey |
secret_key | 否 | String | SecretKey |
message_model | 是 | String | 订阅方式 |
addr_type | 是 | String | 接入点类型 |
addr | 是 | String | 地址 |
sdk_url | 是 | String | 依赖SDK |
consume_timeout | 是 | Integer | 消费超时时间 |
message_type | 是 | String | 消息类型 |
suspend_time | 否 | Integer | 失败重试的等待时间 |
max_reconsumer_times | 否 | Integer | 最大重试次数 |
consumer_thread_nums | 否 | Integer | 消费线程数 |
consumer_batch_max_size | 否 | Integer | 批量消费最大消息数 |
consumer_max_wait | 否 | Integer | 批量消费最大等待时长,单位:秒 |
vpc_id | 否 | String | 虚拟私有云 |
subnet_id | 否 | String | 子网 |
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
instance_name | 否 | String | 实例名称,仅dms的RockectMQ需要该字段 |
instance_id | 否 | String | 实例ID,仅dms的RockectMQ需要该字段 |
addr | 是 | String | RockectMQ连接地址 |
group | 是 | String | 消费组 |
topic | 是 | String | topic名称 |
tag | 否 | String | 标签 |
vpc_id | 是 | String | 虚拟云id |
subnet_id | 是 | String | 子网id |
ssl_enable | 否 | Boolean | 开启SSL |
enable_acl | 否 | Boolean | ACL访问控制 |
access_key | 否 | String | 用户名 |
secret_key | 否 | String | 密码 |
message_type | 否 | String | 消息类型 |
consume_timeout | 否 | Integer | 消费超时时间 |
consumer_thread_nums | 否 | Integer | 线程消费数 |
consumer_batch_max_size | 否 | Integer | 批量消费最大消息数 |
max_reconsume_times | 否 | Integer | 最大重试次数,-1表示一直重试 |
suspend_current_queue_time_millis | 否 | Integer | 重试间隔,单位ms |
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
instance_name | 否 | String | 实例名称,仅dms的RockectMQ需要该字段 |
instance_id | 是 | String | 实例ID,仅dms的RockectMQ需要该字段 |
group | 是 | String | 消费组 |
topic | 是 | String | topic名称 |
tag | 否 | String | 标签 |
ssl_enable | 否 | Boolean | 开启SSL |
enable_acl | 否 | Boolean | ACL访问控制 |
access_key | 否 | String | 用户名 |
secret_key | 否 | String | 密码 |
message_type | 否 | String | 消费方式,针对不同生产顺序消息类型,选择消费方式会导致不同结果,请严格按照需求选择消费方式。1、生产顺序为:设置消息组,保证消息顺序发送。消费方式为:顺序消费,实际消息处理结果:按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。2、生产顺序为:设置消息组,保证消息顺序发送。消费方式为:并发消费,实际消息处理结果:并发消费,尽可能按时间顺序处理。3、生产顺序为:未设置消息组,消息乱序发送。消费方式为:顺序消费,实际消息处理结果:按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。4、生产顺序为:未设置消息组,消息乱序发送。消费方式为:并发消费,实际消息处理结果:并发消费,尽可能按照时间顺序处理。 |
engine_version | 否 | String | mq实例版本 |
consume_timeout | 否 | Integer | 消费超时时间 |
consumer_thread_nums | 否 | Integer | 线程消费数 |
consumer_batch_max_size | 否 | Integer | 批量消费最大消息数 |
max_reconsume_times | 否 | Integer | 最大重试次数,-1表示一直重试 |
suspend_current_queue_time_millis | 否 | Integer | 重试间隔,单位ms |
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
sink_fg | 否 | SinkFGParameters object | 函数事件目标的参数 |
sink_kafka | 否 | SinkKafkaParameters object | kafka事件目标的参数 |
sink_obs | 否 | SinkObsParameters object | OBS事件目标的参数 |
name | 否 | String | 事件目标类型名称 |
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
topic | 是 | String | topic名称 |
keyTransform | 否 | Array of TransForm objects | key的转换规则 |
connectionId | 是 | String | 目标连接id |
参数 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
type | 是 | String | 转换规则类型:
|
value | 否 | String | 常量类型规则时,字段为常量内容定义; 变量类型规则时,为变量定义,内容必须为JsonObject字符串。 变量最多支持100个,且不支持嵌套结构定义; 变量名由字母、数字、点、下划线和中划线组成,必须字母或数字开头不能以HC.开头,长度不超过64个字符; 变量值表达式支持常量或JsonPath表达式,字符串长度不超过1024个字符。 |
template | 否 | String | 变量类型规则时,规则内容的模板定义,支持对已定义变量的引用。 |
响应参数
状态码:200
参数 | 参数类型 | 描述 |
|---|---|---|
X-Request-Id | String | This field is the request ID number for task tracking. Format is request_uuid-timestamp-hostname. |
参数 | 参数类型 | 描述 |
|---|---|---|
eventStreamingID | String | 事件流ID |
请求示例
创建事件流
POST https://{eg_endpoint}/v1/{project_id}/eventstreamings
{
"name" : "test-eventstreaming",
"description" : "this is a test eventstreaming",
"source" : {
"source_kafka" : {
"addr" : "10.10.10.10:8100",
"group" : "group-test",
"instance_name" : "instance-name-test",
"security_protocol" : "PLAINTEXT",
"instance_id" : "instance-id-test",
"topic" : "topic-test",
"seek_to" : "earliest",
"enable_sasl_ssl" : true,
"sasl_mechanism" : "PLAIN",
"ssl_certificate_url" : "https://domain/kafka-certs.zip",
"ssl_certificate_pwd" : null,
"user_name" : "",
"password" : ""
},
"source_mobile_rocketmq" : {
"group_id" : "string",
"instance_id" : "string",
"topic" : "string",
"tag" : "string",
"authentication_required" : true,
"msg_trace_switch" : true,
"access_key" : "string",
"secret_key" : "string",
"message_model" : "CLUSTERING",
"addr_type" : "PUBLIC",
"addr" : "string",
"sdk_url" : "string",
"consume_timeout" : 30000,
"message_type" : "string",
"suspend_time" : 1800,
"max_reconsumer_times" : 3,
"consumer_thread_nums" : 20,
"consumer_batch_max_size" : 20,
"consumer_max_wait" : 5,
"vpc_id" : "string",
"subnet_id" : "string"
},
"source_community_rocketmq" : {
"instance_name" : "instance-name-test",
"instance_id" : "instance-id-test",
"addr" : "10.10.10.10:8100",
"group" : "group-test",
"topic" : "topic-test",
"tag" : "string",
"vpc_id" : "string",
"subnet_id" : "string",
"ssl_enable" : true,
"enable_acl" : true,
"access_key" : "string",
"secret_key" : "string",
"message_type" : "string",
"consume_timeout" : 30000,
"consumer_thread_nums" : 20,
"consumer_batch_max_size" : 1,
"max_reconsume_times" : -1,
"suspend_current_queue_time_millis" : 1000
},
"source_dms_rocketmq" : {
"instance_name" : "instance-name-test",
"instance_id" : "instance-id-test",
"group" : "group-test",
"topic" : "topic-test",
"tag" : "string",
"ssl_enable" : true,
"enable_acl" : true,
"access_key" : "string",
"secret_key" : "string",
"message_type" : "string",
"engine_version" : "string",
"consume_timeout" : 30000,
"consumer_thread_nums" : 20,
"consumer_batch_max_size" : 1,
"max_reconsume_times" : -1,
"suspend_current_queue_time_millis" : 1000
},
"name" : "string"
},
"sink" : {
"sink_fg" : {
"invoke_type" : "SYNC/ASYNC",
"urn" : "String",
"agency" : "string"
},
"sink_kafka" : {
"topic" : "string",
"keyTransform" : [ {
"type" : "VARIABLE",
"value" : "{\"contant_boolean\": true,\"contant_string\": \"constant\",\"varaible_string\": \"$.data.string\",\"varaible_json_object\": \"$.data.object\"}",
"template" : "{\"contant_boolean\": ${contant_boolean},\"contant_string\": \"${contant_string!\\\"default\\\"}\",\"varaible_string\": \"${contant_boolean}\",\"varaible_json_object\": ${varaible_json_object!\"null\"}}"
} ],
"connectionId" : "string"
},
"sink_obs" : {
"access_key" : "string",
"secret_key" : "string",
"obs_bucket" : "string",
"obs_path" : "string",
"time_format" : "string"
},
"name" : "string"
},
"rule_config" : {
"transform" : {
"type" : "VARIABLE",
"value" : "{\"contant_boolean\": true,\"contant_string\": \"constant\",\"varaible_string\": \"$.data.string\",\"varaible_json_object\": \"$.data.object\"}",
"template" : "{\"contant_boolean\": ${contant_boolean},\"contant_string\": \"${contant_string!\\\"default\\\"}\",\"varaible_string\": \"${contant_boolean}\",\"varaible_json_object\": ${varaible_json_object!\"null\"}}"
},
"filter" : { }
},
"option" : {
"thread_num" : "3",
"batch_window" : {
"count" : 500,
"time" : 10,
"interval" : 0
}
}
} 响应示例
状态码:200
创建事件流成功
{
"eventStreamingID" : "23709d68-54d5-423b-a6be-03302e893152"
} 状态码
状态码 | 描述 |
|---|---|
200 | 创建事件流成功 |
错误码
请参见错误码。

