更新时间:2025-04-25 GMT+08:00
分享

更新事件流

功能介绍

更新事件流。

调试

您可以在API Explorer中调试该接口,支持自动认证鉴权。API Explorer可以自动生成SDK代码示例,并提供SDK代码示例调试功能。

URI

PUT /v1/{project_id}/eventstreamings/{eventstreaming_id}

表1 路径参数

参数

是否必选

参数类型

描述

project_id

String

租户资源空间ID

eventstreaming_id

String

事件流ID

请求参数

表2 请求Header参数

参数

是否必选

参数类型

描述

X-Auth-Token

String

用户Token。通过调用IAM服务获取用户Token接口获取(响应消息头中X-Subject-Token的值)。

表3 请求Body参数

参数

是否必选

参数类型

描述

name

String

事件流名称,租户下唯一,由字母、数字、点、下划线和中划线组成,必须字母或数字开头

description

String

事件流描述

source

EventStreamingSource object

事件源,一个事件流中事件源只有一个

sink

EventStreamingSink object

事件目标,一个事件流中只有一个事件目标,sink_fg、sink_kafka只能选择其中一个参数

rule_config

rule_config object

事件规则,包括过滤规则和转换规则

option

RunOption object

运行配置

表4 EventStreamingSource

参数

是否必选

参数类型

描述

source_kafka

SourceKafkaMQParameters object

事件源参数

source_mobile_rocketmq

SourceMobileMQParameters object

事件流移动云rockectMq事件源参数

source_community_rocketmq

SourceCommunityMQParameters object

事件流社区rockectMq事件源参数

source_dms_rocketmq

SourceDMSMQParameters object

DMS事件源参数

name

String

事件源类型名称

表5 SourceKafkaMQParameters

参数

是否必选

参数类型

描述

addr

String

kafka连接地址

group

String

kafka消费组

instance_name

String

kafka实例名称

security_protocol

String

安全协议

instance_id

String

kafka实例ID

topic

String

kafka topic名称

seek_to

String

消费点位

enable_sasl_ssl

Boolean

SASL_SSL是否开启

sasl_mechanism

String

SASL认证机制

ssl_certificate_url

String

SASL证书地址,配置的obs地址

ssl_certificate_pwd

String

SASL证书密码

user_name

String

用户名

password

String

用户密码

表6 SourceMobileMQParameters

参数

是否必选

参数类型

描述

group_id

String

消费组id

instance_id

String

实例id

topic

String

topic

tag

String

标签

authentication_required

Boolean

鉴权认证

msg_trace_switch

Boolean

保存消息轨迹

access_key

String

AccessKey

secret_key

String

SecretKey

message_model

String

订阅方式

addr_type

String

接入点类型

addr

String

地址

sdk_url

String

依赖SDK

consume_timeout

Integer

消费超时时间

message_type

String

消息类型

suspend_time

Integer

失败重试的等待时间

max_reconsumer_times

Integer

最大重试次数

consumer_thread_nums

Integer

消费线程数

consumer_batch_max_size

Integer

批量消费最大消息数

consumer_max_wait

Integer

批量消费最大等待时长,单位:秒

vpc_id

String

虚拟私有云

subnet_id

String

子网

表7 SourceCommunityMQParameters

参数

是否必选

参数类型

描述

instance_name

String

实例名称,仅dms的rockectMq需要该字段

instance_id

String

实例ID,仅dms的rockectMq需要该字段

addr

String

rockectMq连接地址

group

String

消费组

topic

String

topic名称

tag

String

标签

vpc_id

String

虚拟云id

subnet_id

String

子网id

ssl_enable

Boolean

开启SSL

enable_acl

Boolean

ACL访问控制

access_key

String

用户名

secret_key

String

密码

message_type

String

消息类型

consume_timeout

Integer

消费超时时间

consumer_thread_nums

Integer

线程消费数

consumer_batch_max_size

Integer

批量消费最大消息数

max_reconsume_times

Integer

最大重试次数,-1表示一直重试

suspend_current_queue_time_millis

Integer

重试间隔,单位ms

表8 SourceDMSMQParameters

参数

是否必选

参数类型

描述

instance_name

String

实例名称,仅dms的rockectMq需要该字段

instance_id

String

实例ID,仅dms的rockectMq需要该字段

group

String

消费组

topic

String

topic名称

tag

String

标签

ssl_enable

Boolean

开启SSL

enable_acl

Boolean

ACL访问控制

access_key

String

用户名

secret_key

String

密码

message_type

String

消费方式,针对不同生产顺序消息类型,选择消费方式会导致不同结果,请严格按照需求选择消费方式。1、生产顺序为:设置消息组,保证消息顺序发送。消费方式为:顺序消费,实际消息处理结果:按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。2、生产顺序为:设置消息组,保证消息顺序发送。消费方式为:并发消费,实际消息处理结果:并发消费,尽可能按时间顺序处理。3、生产顺序为:未设置消息组,消息乱序发送。消费方式为:顺序消费,实际消息处理结果:按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。4、生产顺序为:未设置消息组,消息乱序发送。消费方式为:并发消费,实际消息处理结果:并发消费,尽可能按照时间顺序处理。

engine_version

String

mq实例版本

consume_timeout

Integer

消费超时时间

consumer_thread_nums

Integer

线程消费数

consumer_batch_max_size

Integer

批量消费最大消息数

max_reconsume_times

Integer

最大重试次数,-1表示一直重试

suspend_current_queue_time_millis

Integer

重试间隔,单位ms

表9 EventStreamingSink

参数

是否必选

参数类型

描述

sink_fg

SinkFGParameters object

函数事件目标的参数

sink_kafka

SinkKafkaParameters object

kafka事件目标的参数

sink_obs

SinkObsParameters object

OBS事件目标的参数

name

String

事件目标类型名称

表10 SinkFGParameters

参数

是否必选

参数类型

描述

invoke_type

String

函数执行方式,同步/异步

urn

String

函数链接

agency

String

租户委托

表11 SinkKafkaParameters

参数

是否必选

参数类型

描述

topic

String

topic名称

keyTransform

Array of TransForm objects

key的转换规则

connectionId

String

目标连接id

表12 TransForm

参数

是否必选

参数类型

描述

type

String

转换规则类型

value

String

常量类型规则时,字段为常量内容定义;

变量类型规则时,为变量定义,内容必须为JsonObject字符串。

变量最多支持100个,且不支持嵌套结构定义;

变量名由字母、数字、点、下划线和中划线组成,必须字母或数字开头不能以HC.开头,长度不超过64个字符;

变量值表达式支持常量或JsonPath表达式,字符串长度不超过1024个字符。

template

String

变量类型规则时,规则内容的模板定义,支持对已定义变量的引用。

表13 SinkObsParameters

参数

是否必选

参数类型

描述

access_key

String

AK

secret_key

String

SK

obs_bucket

String

obs_path

String

转储目录

time_format

String

时间目录格式

表14 rule_config

参数

是否必选

参数类型

描述

transform

TransForm object

订阅的事件目标转换规则

filter

Object

过滤规则

表15 RunOption

参数

是否必选

参数类型

描述

thread_num

Integer

并发数

batch_window

BatchWindow object

批量推送

表16 BatchWindow

参数

是否必选

参数类型

描述

count

Integer

批量推送条数[1,10000]

time

Integer

重试次数

interval

Integer

批量推送间隔[0,15],单位秒

响应参数

状态码:200

表17 响应Header参数

参数

参数类型

描述

X-Request-Id

String

This field is the request ID number for task tracking. Format is request_uuid-timestamp-hostname.

表18 响应Body参数

参数

参数类型

描述

eventStreamingID

String

事件流ID

请求示例

更新eventstreaming_id事件流

PUT https://{eg_endpoint}/v1/{project_id}/eventstreamings/{eventstreaming_id}

{
  "name" : "test-eventstreaming",
  "description" : "this is a test eventstreaming",
  "source" : {
    "source_kafka" : {
      "addr" : "10.10.10.10:8100",
      "group" : "group-test",
      "instance_name" : "instance-name-test",
      "security_protocol" : "PLAINTEXT",
      "instance_id" : "instance-id-test",
      "topic" : "topic-test",
      "seek_to" : "earliest",
      "enable_sasl_ssl" : true,
      "sasl_mechanism" : "PLAIN",
      "ssl_certificate_url" : "https://domain/kafka-certs.zip",
      "ssl_certificate_pwd" : null,
      "user_name" : "",
      "password" : ""
    },
    "source_mobile_rocketmq" : {
      "group_id" : "string",
      "instance_id" : "string",
      "topic" : "string",
      "tag" : "string",
      "authentication_required" : true,
      "msg_trace_switch" : true,
      "access_key" : "string",
      "secret_key" : "string",
      "message_model" : "CLUSTERING",
      "addr_type" : "PUBLIC",
      "addr" : "string",
      "sdk_url" : "string",
      "consume_timeout" : 30000,
      "message_type" : "string",
      "suspend_time" : 1800,
      "max_reconsumer_times" : 3,
      "consumer_thread_nums" : 20,
      "consumer_batch_max_size" : 20,
      "consumer_max_wait" : 5,
      "vpc_id" : "string",
      "subnet_id" : "string"
    },
    "source_community_rocketmq" : {
      "instance_name" : "instance-name-test",
      "instance_id" : "instance-id-test",
      "addr" : "10.10.10.10:8100",
      "group" : "group-test",
      "topic" : "topic-test",
      "tag" : "string",
      "vpc_id" : "string",
      "subnet_id" : "string",
      "ssl_enable" : true,
      "enable_acl" : true,
      "access_key" : "string",
      "secret_key" : "string",
      "message_type" : "string",
      "consume_timeout" : 30000,
      "consumer_thread_nums" : 20,
      "consumer_batch_max_size" : 1,
      "max_reconsume_times" : -1,
      "suspend_current_queue_time_millis" : 1000
    },
    "source_dms_rocketmq" : {
      "instance_name" : "instance-name-test",
      "instance_id" : "instance-id-test",
      "group" : "group-test",
      "topic" : "topic-test",
      "tag" : "string",
      "ssl_enable" : true,
      "enable_acl" : true,
      "access_key" : "string",
      "secret_key" : "string",
      "message_type" : "string",
      "engine_version" : "string",
      "consume_timeout" : 30000,
      "consumer_thread_nums" : 20,
      "consumer_batch_max_size" : 1,
      "max_reconsume_times" : -1,
      "suspend_current_queue_time_millis" : 1000
    },
    "name" : "string"
  },
  "sink" : {
    "sink_fg" : {
      "invoke_type" : "SYNC/ASYNC",
      "urn" : "String",
      "agency" : "string"
    },
    "sink_kafka" : {
      "topic" : "string",
      "keyTransform" : [ {
        "type" : "VARIABLE",
        "value" : "{\"contant_boolean\": true,\"contant_string\": \"constant\",\"varaible_string\": \"$.data.string\",\"varaible_json_object\": \"$.data.object\"}",
        "template" : "{\"contant_boolean\": ${contant_boolean},\"contant_string\": \"${contant_string!\\\"default\\\"}\",\"varaible_string\": \"${contant_boolean}\",\"varaible_json_object\": ${varaible_json_object!\"null\"}}"
      } ],
      "connectionId" : "string"
    },
    "sink_obs" : {
      "access_key" : "string",
      "secret_key" : "string",
      "obs_bucket" : "string",
      "obs_path" : "string",
      "time_format" : "string"
    },
    "name" : "string"
  },
  "rule_config" : {
    "transform" : {
      "type" : "VARIABLE",
      "value" : "{\"contant_boolean\": true,\"contant_string\": \"constant\",\"varaible_string\": \"$.data.string\",\"varaible_json_object\": \"$.data.object\"}",
      "template" : "{\"contant_boolean\": ${contant_boolean},\"contant_string\": \"${contant_string!\\\"default\\\"}\",\"varaible_string\": \"${contant_boolean}\",\"varaible_json_object\": ${varaible_json_object!\"null\"}}"
    },
    "filter" : { }
  },
  "option" : {
    "thread_num" : "3",
    "batch_window" : {
      "count" : 500,
      "time" : 10,
      "interval" : 0
    }
  }
}

响应示例

状态码:200

更新事件流配置成功

{
  "eventStreamingID" : "23709d68-54d5-423b-a6be-03302e893152"
}

状态码

状态码

描述

200

更新事件流配置成功

错误码

请参见错误码

相关文档