创建事件流
功能介绍
创建事件流。
调试
您可以在API Explorer中调试该接口,支持自动认证鉴权。API Explorer可以自动生成SDK代码示例,并提供SDK代码示例调试功能。
URI
POST /v1/{project_id}/eventstreamings
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
project_id |
是 |
String |
租户资源空间ID |
请求参数
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
X-Auth-Token |
是 |
String |
用户Token。通过调用IAM服务获取用户Token接口获取(响应消息头中X-Subject-Token的值)。 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
name |
是 |
String |
事件流名称,租户下唯一,由字母、数字、点、下划线和中划线组成,必须字母或数字开头 |
description |
否 |
String |
事件流描述 |
source |
是 |
EventStreamingSource object |
事件源,一个事件流中事件源只有一个 |
sink |
是 |
EventStreamingSink object |
事件目标,一个事件流中只有一个事件目标,sink_fg、sink_kafka只能选择其中一个参数 |
rule_config |
否 |
rule_config object |
事件规则,包括过滤规则和转换规则 |
option |
否 |
RunOption object |
运行配置 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
source_kafka |
否 |
SourceKafkaMQParameters object |
事件源参数 |
source_mobile_rocketmq |
否 |
SourceMobileMQParameters object |
事件流移动云rockectMq事件源参数 |
source_community_rocketmq |
否 |
SourceCommunityMQParameters object |
事件流社区rockectMq事件源参数 |
source_dms_rocketmq |
否 |
SourceDMSMQParameters object |
DMS事件源参数 |
name |
否 |
String |
事件源类型名称 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
addr |
否 |
String |
kafka连接地址 |
group |
是 |
String |
kafka消费组 |
instance_name |
否 |
String |
kafka实例名称 |
security_protocol |
否 |
String |
安全协议 |
instance_id |
否 |
String |
kafka实例ID |
topic |
是 |
String |
kafka topic名称 |
seek_to |
否 |
String |
消费点位 |
enable_sasl_ssl |
否 |
Boolean |
SASL_SSL是否开启 |
sasl_mechanism |
否 |
String |
SASL认证机制 |
ssl_certificate_url |
否 |
String |
SASL证书地址,配置的obs地址 |
ssl_certificate_pwd |
否 |
String |
SASL证书密码 |
user_name |
否 |
String |
用户名 |
password |
否 |
String |
用户密码 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
group_id |
是 |
String |
消费组id |
instance_id |
是 |
String |
实例id |
topic |
是 |
String |
topic |
tag |
否 |
String |
标签 |
authentication_required |
否 |
Boolean |
鉴权认证 |
msg_trace_switch |
否 |
Boolean |
保存消息轨迹 |
access_key |
否 |
String |
AccessKey |
secret_key |
否 |
String |
SecretKey |
message_model |
是 |
String |
订阅方式 |
addr_type |
是 |
String |
接入点类型 |
addr |
是 |
String |
地址 |
sdk_url |
是 |
String |
依赖SDK |
consume_timeout |
是 |
Integer |
消费超时时间 |
message_type |
是 |
String |
消息类型 |
suspend_time |
否 |
Integer |
失败重试的等待时间 |
max_reconsumer_times |
否 |
Integer |
最大重试次数 |
consumer_thread_nums |
否 |
Integer |
消费线程数 |
consumer_batch_max_size |
否 |
Integer |
批量消费最大消息数 |
consumer_max_wait |
否 |
Integer |
批量消费最大等待时长,单位:秒 |
vpc_id |
否 |
String |
虚拟私有云 |
subnet_id |
否 |
String |
子网 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
instance_name |
否 |
String |
实例名称,仅dms的rockectMq需要该字段 |
instance_id |
否 |
String |
实例ID,仅dms的rockectMq需要该字段 |
addr |
是 |
String |
rockectMq连接地址 |
group |
是 |
String |
消费组 |
topic |
是 |
String |
topic名称 |
tag |
否 |
String |
标签 |
vpc_id |
是 |
String |
虚拟云id |
subnet_id |
是 |
String |
子网id |
ssl_enable |
否 |
Boolean |
开启SSL |
enable_acl |
否 |
Boolean |
ACL访问控制 |
access_key |
否 |
String |
用户名 |
secret_key |
否 |
String |
密码 |
message_type |
否 |
String |
消息类型 |
consume_timeout |
否 |
Integer |
消费超时时间 |
consumer_thread_nums |
否 |
Integer |
线程消费数 |
consumer_batch_max_size |
否 |
Integer |
批量消费最大消息数 |
max_reconsume_times |
否 |
Integer |
最大重试次数,-1表示一直重试 |
suspend_current_queue_time_millis |
否 |
Integer |
重试间隔,单位ms |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
instance_name |
否 |
String |
实例名称,仅dms的rockectMq需要该字段 |
instance_id |
是 |
String |
实例ID,仅dms的rockectMq需要该字段 |
group |
是 |
String |
消费组 |
topic |
是 |
String |
topic名称 |
tag |
否 |
String |
标签 |
ssl_enable |
否 |
Boolean |
开启SSL |
enable_acl |
否 |
Boolean |
ACL访问控制 |
access_key |
否 |
String |
用户名 |
secret_key |
否 |
String |
密码 |
message_type |
否 |
String |
消费方式,针对不同生产顺序消息类型,选择消费方式会导致不同结果,请严格按照需求选择消费方式。1、生产顺序为:设置消息组,保证消息顺序发送。消费方式为:顺序消费,实际消息处理结果:按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。2、生产顺序为:设置消息组,保证消息顺序发送。消费方式为:并发消费,实际消息处理结果:并发消费,尽可能按时间顺序处理。3、生产顺序为:未设置消息组,消息乱序发送。消费方式为:顺序消费,实际消息处理结果:按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。4、生产顺序为:未设置消息组,消息乱序发送。消费方式为:并发消费,实际消息处理结果:并发消费,尽可能按照时间顺序处理。 |
engine_version |
否 |
String |
mq实例版本 |
consume_timeout |
否 |
Integer |
消费超时时间 |
consumer_thread_nums |
否 |
Integer |
线程消费数 |
consumer_batch_max_size |
否 |
Integer |
批量消费最大消息数 |
max_reconsume_times |
否 |
Integer |
最大重试次数,-1表示一直重试 |
suspend_current_queue_time_millis |
否 |
Integer |
重试间隔,单位ms |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
sink_fg |
否 |
SinkFGParameters object |
函数事件目标的参数 |
sink_kafka |
否 |
SinkKafkaParameters object |
kafka事件目标的参数 |
sink_obs |
否 |
SinkObsParameters object |
OBS事件目标的参数 |
name |
否 |
String |
事件目标类型名称 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
invoke_type |
否 |
String |
函数执行方式,同步/异步 |
urn |
否 |
String |
函数链接 |
agency |
否 |
String |
租户委托 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
topic |
是 |
String |
topic名称 |
keyTransform |
否 |
Array of TransForm objects |
key的转换规则 |
connectionId |
是 |
String |
目标连接id |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
type |
是 |
String |
转换规则类型 |
value |
否 |
String |
常量类型规则时,字段为常量内容定义; 变量类型规则时,为变量定义,内容必须为JsonObject字符串。 变量最多支持100个,且不支持嵌套结构定义; 变量名由字母、数字、点、下划线和中划线组成,必须字母或数字开头不能以HC.开头,长度不超过64个字符; 变量值表达式支持常量或JsonPath表达式,字符串长度不超过1024个字符。 |
template |
否 |
String |
变量类型规则时,规则内容的模板定义,支持对已定义变量的引用。 |
响应参数
状态码:200
参数 |
参数类型 |
描述 |
---|---|---|
X-Request-Id |
String |
This field is the request ID number for task tracking. Format is request_uuid-timestamp-hostname. |
参数 |
参数类型 |
描述 |
---|---|---|
eventStreamingID |
String |
事件流ID |
请求示例
创建事件流
POST https://{eg_endpoint}/v1/{project_id}/eventstreamings
{
"name" : "test-eventstreaming",
"description" : "this is a test eventstreaming",
"source" : {
"source_kafka" : {
"addr" : "10.10.10.10:8100",
"group" : "group-test",
"instance_name" : "instance-name-test",
"security_protocol" : "PLAINTEXT",
"instance_id" : "instance-id-test",
"topic" : "topic-test",
"seek_to" : "earliest",
"enable_sasl_ssl" : true,
"sasl_mechanism" : "PLAIN",
"ssl_certificate_url" : "https://domain/kafka-certs.zip",
"ssl_certificate_pwd" : null,
"user_name" : "",
"password" : ""
},
"source_mobile_rocketmq" : {
"group_id" : "string",
"instance_id" : "string",
"topic" : "string",
"tag" : "string",
"authentication_required" : true,
"msg_trace_switch" : true,
"access_key" : "string",
"secret_key" : "string",
"message_model" : "CLUSTERING",
"addr_type" : "PUBLIC",
"addr" : "string",
"sdk_url" : "string",
"consume_timeout" : 30000,
"message_type" : "string",
"suspend_time" : 1800,
"max_reconsumer_times" : 3,
"consumer_thread_nums" : 20,
"consumer_batch_max_size" : 20,
"consumer_max_wait" : 5,
"vpc_id" : "string",
"subnet_id" : "string"
},
"source_community_rocketmq" : {
"instance_name" : "instance-name-test",
"instance_id" : "instance-id-test",
"addr" : "10.10.10.10:8100",
"group" : "group-test",
"topic" : "topic-test",
"tag" : "string",
"vpc_id" : "string",
"subnet_id" : "string",
"ssl_enable" : true,
"enable_acl" : true,
"access_key" : "string",
"secret_key" : "string",
"message_type" : "string",
"consume_timeout" : 30000,
"consumer_thread_nums" : 20,
"consumer_batch_max_size" : 1,
"max_reconsume_times" : -1,
"suspend_current_queue_time_millis" : 1000
},
"source_dms_rocketmq" : {
"instance_name" : "instance-name-test",
"instance_id" : "instance-id-test",
"group" : "group-test",
"topic" : "topic-test",
"tag" : "string",
"ssl_enable" : true,
"enable_acl" : true,
"access_key" : "string",
"secret_key" : "string",
"message_type" : "string",
"engine_version" : "string",
"consume_timeout" : 30000,
"consumer_thread_nums" : 20,
"consumer_batch_max_size" : 1,
"max_reconsume_times" : -1,
"suspend_current_queue_time_millis" : 1000
},
"name" : "string"
},
"sink" : {
"sink_fg" : {
"invoke_type" : "SYNC/ASYNC",
"urn" : "String",
"agency" : "string"
},
"sink_kafka" : {
"topic" : "string",
"keyTransform" : [ {
"type" : "VARIABLE",
"value" : "{\"contant_boolean\": true,\"contant_string\": \"constant\",\"varaible_string\": \"$.data.string\",\"varaible_json_object\": \"$.data.object\"}",
"template" : "{\"contant_boolean\": ${contant_boolean},\"contant_string\": \"${contant_string!\\\"default\\\"}\",\"varaible_string\": \"${contant_boolean}\",\"varaible_json_object\": ${varaible_json_object!\"null\"}}"
} ],
"connectionId" : "string"
},
"sink_obs" : {
"access_key" : "string",
"secret_key" : "string",
"obs_bucket" : "string",
"obs_path" : "string",
"time_format" : "string"
},
"name" : "string"
},
"rule_config" : {
"transform" : {
"type" : "VARIABLE",
"value" : "{\"contant_boolean\": true,\"contant_string\": \"constant\",\"varaible_string\": \"$.data.string\",\"varaible_json_object\": \"$.data.object\"}",
"template" : "{\"contant_boolean\": ${contant_boolean},\"contant_string\": \"${contant_string!\\\"default\\\"}\",\"varaible_string\": \"${contant_boolean}\",\"varaible_json_object\": ${varaible_json_object!\"null\"}}"
},
"filter" : { }
},
"option" : {
"thread_num" : "3",
"batch_window" : {
"count" : 500,
"time" : 10,
"interval" : 0
}
}
}
响应示例
状态码:200
创建事件流成功
{
"eventStreamingID" : "23709d68-54d5-423b-a6be-03302e893152"
}
状态码
状态码 |
描述 |
---|---|
200 |
创建事件流成功 |
错误码
请参见错误码。