创建事件流
功能介绍
创建事件流。
URI
POST /v1/{project_id}/eventstreamings
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
project_id |
是 |
String |
租户资源空间ID |
请求参数
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
X-Auth-Token |
是 |
String |
用户Token。通过调用IAM服务获取用户Token接口获取(响应消息头中X-Subject-Token的值)。 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
name |
是 |
String |
事件流名称,租户下唯一,由字母、数字、点、下划线和中划线组成,必须字母或数字开头 |
description |
否 |
String |
事件流描述 |
source |
是 |
EventStreamingSource object |
事件源,一个事件流中事件源只有一个 |
sink |
是 |
EventStreamingSink object |
事件目标,一个事件流中只有一个事件目标,sink_fg、sink_kafka只能选择其中一个参数 |
rule_config |
否 |
rule_config object |
事件规则,包括过滤规则和转换规则 |
option |
否 |
RunOption object |
运行配置 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
source_kafka |
否 |
SourceKafkaMQParameters object |
事件源参数 |
source_mobile_rocketmq |
否 |
SourceMobileMQParameters object |
事件流移动云rockectMq事件源参数 |
source_community_rocketmq |
否 |
SourceCommunityMQParameters object |
事件流社区rockectMq事件源参数 |
source_dms_rocketmq |
否 |
SourceDMSMQParameters object |
DMS事件源参数 |
name |
否 |
String |
事件源类型名称 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
addr |
否 |
String |
kafka连接地址 |
group |
是 |
String |
kafka消费组 |
instance_name |
否 |
String |
kafka实例名称 |
security_protocol |
否 |
String |
安全协议 |
instance_id |
否 |
String |
kafka实例ID |
topic |
是 |
String |
kafka topic名称 |
seek_to |
否 |
String |
消费点位 |
enable_sasl_ssl |
否 |
Boolean |
SASL_SSL是否开启 |
sasl_mechanism |
否 |
String |
SASL认证机制 |
ssl_certificate_url |
否 |
String |
SASL证书地址,配置的obs地址 |
ssl_certificate_pwd |
否 |
String |
SASL证书密码 |
user_name |
否 |
String |
用户名 |
password |
否 |
String |
用户密码 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
group_id |
是 |
String |
消费组id |
instance_id |
是 |
String |
实例id |
topic |
是 |
String |
topic |
tag |
否 |
String |
标签 |
authentication_required |
否 |
Boolean |
鉴权认证 |
msg_trace_switch |
否 |
Boolean |
保存消息轨迹 |
access_key |
否 |
String |
AccessKey |
secret_key |
否 |
String |
SecretKey |
message_model |
是 |
String |
订阅方式 |
addr_type |
是 |
String |
接入点类型 |
addr |
是 |
String |
地址 |
sdk_url |
是 |
String |
依赖SDK |
consume_timeout |
是 |
Integer |
消费超时时间 |
message_type |
是 |
String |
消息类型 |
suspend_time |
否 |
Integer |
失败重试的等待时间 |
max_reconsumer_times |
否 |
Integer |
最大重试次数 |
consumer_thread_nums |
否 |
Integer |
消费线程数 |
consumer_batch_max_size |
否 |
Integer |
批量消费最大消息数 |
consumer_max_wait |
否 |
Integer |
批量消费最大等待时长,单位:秒 |
vpc_id |
否 |
String |
虚拟私有云 |
subnet_id |
否 |
String |
子网 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
instance_name |
否 |
String |
实例名称,仅dms的rockectMq需要该字段 |
instance_id |
否 |
String |
实例ID,仅dms的rockectMq需要该字段 |
addr |
是 |
String |
rockectMq连接地址 |
group |
是 |
String |
消费组 |
topic |
是 |
String |
topic名称 |
tag |
否 |
String |
标签 |
vpc_id |
是 |
String |
虚拟云id |
subnet_id |
是 |
String |
子网id |
ssl_enable |
否 |
Boolean |
开启SSL |
enable_acl |
否 |
Boolean |
ACL访问控制 |
access_key |
否 |
String |
用户名 |
secret_key |
否 |
String |
密码 |
message_type |
否 |
String |
消息类型 |
consume_timeout |
否 |
Integer |
消费超时时间 |
consumer_thread_nums |
否 |
Integer |
线程消费数 |
consumer_batch_max_size |
否 |
Integer |
批量消费最大消息数 |
max_reconsume_times |
否 |
Integer |
最大重试次数,-1表示一直重试 |
suspend_current_queue_time_millis |
否 |
Integer |
重试间隔,单位ms |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
instance_name |
否 |
String |
实例名称,仅dms的rockectMq需要该字段 |
instance_id |
是 |
String |
实例ID,仅dms的rockectMq需要该字段 |
group |
是 |
String |
消费组 |
topic |
是 |
String |
topic名称 |
tag |
否 |
String |
标签 |
ssl_enable |
否 |
Boolean |
开启SSL |
enable_acl |
否 |
Boolean |
ACL访问控制 |
access_key |
否 |
String |
用户名 |
secret_key |
否 |
String |
密码 |
message_type |
否 |
String |
消费方式,针对不同生产顺序消息类型,选择消费方式会导致不同结果,请严格按照需求选择消费方式。1、生产顺序为:设置消息组,保证消息顺序发送。消费方式为:顺序消费,实际消息处理结果:按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。2、生产顺序为:设置消息组,保证消息顺序发送。消费方式为:并发消费,实际消息处理结果:并发消费,尽可能按时间顺序处理。3、生产顺序为:未设置消息组,消息乱序发送。消费方式为:顺序消费,实际消息处理结果:按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。4、生产顺序为:未设置消息组,消息乱序发送。消费方式为:并发消费,实际消息处理结果:并发消费,尽可能按照时间顺序处理。 |
engine_version |
否 |
String |
mq实例版本 |
consume_timeout |
否 |
Integer |
消费超时时间 |
consumer_thread_nums |
否 |
Integer |
线程消费数 |
consumer_batch_max_size |
否 |
Integer |
批量消费最大消息数 |
max_reconsume_times |
否 |
Integer |
最大重试次数,-1表示一直重试 |
suspend_current_queue_time_millis |
否 |
Integer |
重试间隔,单位ms |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
sink_fg |
否 |
SinkFGParameters object |
函数事件目标的参数 |
sink_kafka |
否 |
SinkKafkaParameters object |
kafka事件目标的参数 |
sink_obs |
否 |
SinkObsParameters object |
OBS事件目标的参数 |
name |
否 |
String |
事件目标类型名称 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
invoke_type |
否 |
String |
函数执行方式,同步/异步 |
urn |
否 |
String |
函数链接 |
agency |
否 |
String |
租户委托 |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
topic |
是 |
String |
topic名称 |
keyTransform |
否 |
Array of TransForm objects |
key的转换规则 |
connectionId |
是 |
String |
目标连接id |
参数 |
是否必选 |
参数类型 |
描述 |
---|---|---|---|
type |
是 |
String |
转换规则类型 |
value |
否 |
String |
常量类型规则时,字段为常量内容定义; 变量类型规则时,为变量定义,内容必须为JsonObject字符串。 变量最多支持100个,且不支持嵌套结构定义; 变量名由字母、数字、点、下划线和中划线组成,必须字母或数字开头不能以HC.开头,长度不超过64个字符; 变量值表达式支持常量或JsonPath表达式,字符串长度不超过1024个字符。 |
template |
否 |
String |
变量类型规则时,规则内容的模板定义,支持对已定义变量的引用。 |
响应参数
状态码:200
参数 |
参数类型 |
描述 |
---|---|---|
X-Request-Id |
String |
This field is the request ID number for task tracking. Format is request_uuid-timestamp-hostname. |
参数 |
参数类型 |
描述 |
---|---|---|
eventStreamingID |
String |
事件流ID |
请求示例
创建事件流
POST https://{eg_endpoint}/v1/{project_id}/eventstreamings { "name" : "test-eventstreaming", "description" : "this is a test eventstreaming", "source" : { "source_kafka" : { "addr" : "10.10.10.10:8100", "group" : "group-test", "instance_name" : "instance-name-test", "security_protocol" : "PLAINTEXT", "instance_id" : "instance-id-test", "topic" : "topic-test", "seek_to" : "earliest", "enable_sasl_ssl" : true, "sasl_mechanism" : "PLAIN", "ssl_certificate_url" : "https://domain/kafka-certs.zip", "ssl_certificate_pwd" : null, "user_name" : "", "password" : "" }, "source_mobile_rocketmq" : { "group_id" : "string", "instance_id" : "string", "topic" : "string", "tag" : "string", "authentication_required" : true, "msg_trace_switch" : true, "access_key" : "string", "secret_key" : "string", "message_model" : "CLUSTERING", "addr_type" : "PUBLIC", "addr" : "string", "sdk_url" : "string", "consume_timeout" : 30000, "message_type" : "string", "suspend_time" : 1800, "max_reconsumer_times" : 3, "consumer_thread_nums" : 20, "consumer_batch_max_size" : 20, "consumer_max_wait" : 5, "vpc_id" : "string", "subnet_id" : "string" }, "source_community_rocketmq" : { "instance_name" : "instance-name-test", "instance_id" : "instance-id-test", "addr" : "10.10.10.10:8100", "group" : "group-test", "topic" : "topic-test", "tag" : "string", "vpc_id" : "string", "subnet_id" : "string", "ssl_enable" : true, "enable_acl" : true, "access_key" : "string", "secret_key" : "string", "message_type" : "string", "consume_timeout" : 30000, "consumer_thread_nums" : 20, "consumer_batch_max_size" : 1, "max_reconsume_times" : -1, "suspend_current_queue_time_millis" : 1000 }, "source_dms_rocketmq" : { "instance_name" : "instance-name-test", "instance_id" : "instance-id-test", "group" : "group-test", "topic" : "topic-test", "tag" : "string", "ssl_enable" : true, "enable_acl" : true, "access_key" : "string", "secret_key" : "string", "message_type" : "string", "engine_version" : "string", "consume_timeout" : 30000, "consumer_thread_nums" : 20, "consumer_batch_max_size" : 1, "max_reconsume_times" : -1, "suspend_current_queue_time_millis" : 1000 }, "name" : "string" }, "sink" : { "sink_fg" : { "invoke_type" : "SYNC/ASYNC", "urn" : "String", "agency" : "string" }, "sink_kafka" : { "topic" : "string", "keyTransform" : [ { "type" : "VARIABLE", "value" : "{\"contant_boolean\": true,\"contant_string\": \"constant\",\"varaible_string\": \"$.data.string\",\"varaible_json_object\": \"$.data.object\"}", "template" : "{\"contant_boolean\": ${contant_boolean},\"contant_string\": \"${contant_string!\\\"default\\\"}\",\"varaible_string\": \"${contant_boolean}\",\"varaible_json_object\": ${varaible_json_object!\"null\"}}" } ], "connectionId" : "string" }, "sink_obs" : { "access_key" : "string", "secret_key" : "string", "obs_bucket" : "string", "obs_path" : "string", "time_format" : "string" }, "name" : "string" }, "rule_config" : { "transform" : { "type" : "VARIABLE", "value" : "{\"contant_boolean\": true,\"contant_string\": \"constant\",\"varaible_string\": \"$.data.string\",\"varaible_json_object\": \"$.data.object\"}", "template" : "{\"contant_boolean\": ${contant_boolean},\"contant_string\": \"${contant_string!\\\"default\\\"}\",\"varaible_string\": \"${contant_boolean}\",\"varaible_json_object\": ${varaible_json_object!\"null\"}}" }, "filter" : { } }, "option" : { "thread_num" : "3", "batch_window" : { "count" : 500, "time" : 10, "interval" : 0 } } }
响应示例
状态码:200
创建事件流成功
{ "eventStreamingID" : "23709d68-54d5-423b-a6be-03302e893152" }
状态码
状态码 |
描述 |
---|---|
200 |
创建事件流成功 |
错误码
请参见错误码。