计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机服务器 CPH
Huawei Cloud EulerOS
网络
虚拟私有云 VPC
弹性公网IP EIP
虚拟专用网络 VPN
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
VPC终端节点 VPCEP
云连接 CC
企业路由器 ER
企业交换机 ESW
全球加速 GA
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
边缘安全 EdgeSec
CDN与智能边缘
内容分发网络 CDN
CloudPond云服务
智能边缘云 IEC
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
云搜索服务 CSS
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
数据可视化 DLV
数据湖工厂 DLF
湖仓构建 LakeFormation
企业应用
云桌面 Workspace
应用与数据集成平台 ROMA Connect
云解析服务 DNS
专属云
专属计算集群 DCC
IoT物联网
IoT物联网
设备接入 IoTDA
智能边缘平台 IEF
云生态
合作伙伴中心
云商店
开发者工具
SDK开发指南
API签名指南
Terraform
华为云命令行工具服务 KooCLI
其他
产品价格详情
系统权限
管理控制台
客户关联华为云合作伙伴须知
消息中心
公共问题
视频
视频直播 Live
视频点播 VOD
媒体处理 MPC
实时音视频 SparkRTC
数字内容生产线 MetaStudio
开发与运维
应用管理与运维平台 ServiceStage
软件开发生产线 CodeArts
需求管理 CodeArts Req
部署 CodeArts Deploy
性能测试 CodeArts PerfTest
编译构建 CodeArts Build
流水线 CodeArts Pipeline
制品仓库 CodeArts Artifact
测试计划 CodeArts TestPlan
代码检查 CodeArts Check
代码托管 CodeArts Repo
云应用引擎 CAE
华为云Astro轻应用
华为云Astro大屏应用
开源治理服务 CodeArts Governance
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
存储容灾服务 SDRS
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
专属分布式存储服务 DSS
容器
云容器引擎 CCE
容器镜像服务 SWR
应用服务网格 ASM
华为云UCS
云容器实例 CCI
管理与监管
云监控服务 CES
统一身份认证服务 IAM
资源编排服务 RFS
云审计服务 CTS
标签管理服务 TMS
云日志服务 LTS
配置审计 Config
资源访问管理 RAM
消息通知服务 SMN
应用运维管理 AOM
应用性能管理 APM
组织 Organizations
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
应用身份管理服务 OneAccess
数据库
云数据库 RDS
文档数据库服务 DDS
数据管理服务 DAS
数据复制服务 DRS
云数据库 GeminiDB
云数据库 GaussDB
分布式数据库中间件 DDM
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
人脸识别服务 FRS
图引擎服务 GES
图像识别 Image
内容审核 Moderation
文字识别 OCR
AI开发平台ModelArts
图像搜索 ImageSearch
对话机器人服务 CBS
华为HiLens
视频智能分析服务 VIAS
语音交互服务 SIS
应用中间件
分布式缓存服务 DCS
API网关 APIG
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
多活高可用服务 MAS
事件网格 EG
企业协同
华为云会议 Meeting
云通信
消息&短信 MSGSMS
云化转型
云架构中心
云采用框架
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
国际站常见问题
ICP备案
我的凭证
支持计划
客户运营能力
合作伙伴支持计划
专业服务
区块链
区块链服务 BCS
Web3节点引擎服务 NES
解决方案
SAP
高性能计算 HPC
开天aPaaS
云消息服务 KooMessage
云手机服务 KooPhone
云空间服务 KooDrive

创建事件流

功能介绍

创建事件流。

URI

POST /v1/{project_id}/eventstreamings

表1 路径参数

参数

是否必选

参数类型

描述

project_id

String

租户资源空间ID

请求参数

表2 请求Header参数

参数

是否必选

参数类型

描述

X-Auth-Token

String

用户Token。通过调用IAM服务获取用户Token接口获取(响应消息头中X-Subject-Token的值)。

表3 请求Body参数

参数

是否必选

参数类型

描述

name

String

事件流名称,租户下唯一,由字母、数字、点、下划线和中划线组成,必须字母或数字开头

description

String

事件流描述

source

EventStreamingSource object

事件源,一个事件流中事件源只有一个

sink

EventStreamingSink object

事件目标,一个事件流中只有一个事件目标,sink_fg、sink_kafka只能选择其中一个参数

rule_config

rule_config object

事件规则,包括过滤规则和转换规则

option

RunOption object

运行配置

表4 EventStreamingSource

参数

是否必选

参数类型

描述

source_kafka

SourceKafkaMQParameters object

事件源参数

source_mobile_rocketmq

SourceMobileMQParameters object

事件流移动云rockectMq事件源参数

source_community_rocketmq

SourceCommunityMQParameters object

事件流社区rockectMq事件源参数

source_dms_rocketmq

SourceDMSMQParameters object

DMS事件源参数

name

String

事件源类型名称

表5 SourceKafkaMQParameters

参数

是否必选

参数类型

描述

addr

String

kafka连接地址

group

String

kafka消费组

instance_name

String

kafka实例名称

security_protocol

String

安全协议

instance_id

String

kafka实例ID

topic

String

kafka topic名称

seek_to

String

消费点位

enable_sasl_ssl

Boolean

SASL_SSL是否开启

sasl_mechanism

String

SASL认证机制

ssl_certificate_url

String

SASL证书地址,配置的obs地址

ssl_certificate_pwd

String

SASL证书密码

user_name

String

用户名

password

String

用户密码

表6 SourceMobileMQParameters

参数

是否必选

参数类型

描述

group_id

String

消费组id

instance_id

String

实例id

topic

String

topic

tag

String

标签

authentication_required

Boolean

鉴权认证

msg_trace_switch

Boolean

保存消息轨迹

access_key

String

AccessKey

secret_key

String

SecretKey

message_model

String

订阅方式

addr_type

String

接入点类型

addr

String

地址

sdk_url

String

依赖SDK

consume_timeout

Integer

消费超时时间

message_type

String

消息类型

suspend_time

Integer

失败重试的等待时间

max_reconsumer_times

Integer

最大重试次数

consumer_thread_nums

Integer

消费线程数

consumer_batch_max_size

Integer

批量消费最大消息数

consumer_max_wait

Integer

批量消费最大等待时长,单位:秒

vpc_id

String

虚拟私有云

subnet_id

String

子网

表7 SourceCommunityMQParameters

参数

是否必选

参数类型

描述

instance_name

String

实例名称,仅dms的rockectMq需要该字段

instance_id

String

实例ID,仅dms的rockectMq需要该字段

addr

String

rockectMq连接地址

group

String

消费组

topic

String

topic名称

tag

String

标签

vpc_id

String

虚拟云id

subnet_id

String

子网id

ssl_enable

Boolean

开启SSL

enable_acl

Boolean

ACL访问控制

access_key

String

用户名

secret_key

String

密码

message_type

String

消息类型

consume_timeout

Integer

消费超时时间

consumer_thread_nums

Integer

线程消费数

consumer_batch_max_size

Integer

批量消费最大消息数

max_reconsume_times

Integer

最大重试次数,-1表示一直重试

suspend_current_queue_time_millis

Integer

重试间隔,单位ms

表8 SourceDMSMQParameters

参数

是否必选

参数类型

描述

instance_name

String

实例名称,仅dms的rockectMq需要该字段

instance_id

String

实例ID,仅dms的rockectMq需要该字段

group

String

消费组

topic

String

topic名称

tag

String

标签

ssl_enable

Boolean

开启SSL

enable_acl

Boolean

ACL访问控制

access_key

String

用户名

secret_key

String

密码

message_type

String

消费方式,针对不同生产顺序消息类型,选择消费方式会导致不同结果,请严格按照需求选择消费方式。1、生产顺序为:设置消息组,保证消息顺序发送。消费方式为:顺序消费,实际消息处理结果:按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。2、生产顺序为:设置消息组,保证消息顺序发送。消费方式为:并发消费,实际消息处理结果:并发消费,尽可能按时间顺序处理。3、生产顺序为:未设置消息组,消息乱序发送。消费方式为:顺序消费,实际消息处理结果:按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。4、生产顺序为:未设置消息组,消息乱序发送。消费方式为:并发消费,实际消息处理结果:并发消费,尽可能按照时间顺序处理。

engine_version

String

mq实例版本

consume_timeout

Integer

消费超时时间

consumer_thread_nums

Integer

线程消费数

consumer_batch_max_size

Integer

批量消费最大消息数

max_reconsume_times

Integer

最大重试次数,-1表示一直重试

suspend_current_queue_time_millis

Integer

重试间隔,单位ms

表9 EventStreamingSink

参数

是否必选

参数类型

描述

sink_fg

SinkFGParameters object

函数事件目标的参数

sink_kafka

SinkKafkaParameters object

kafka事件目标的参数

sink_obs

SinkObsParameters object

OBS事件目标的参数

name

String

事件目标类型名称

表10 SinkFGParameters

参数

是否必选

参数类型

描述

invoke_type

String

函数执行方式,同步/异步

urn

String

函数链接

agency

String

租户委托

表11 SinkKafkaParameters

参数

是否必选

参数类型

描述

topic

String

topic名称

keyTransform

Array of TransForm objects

key的转换规则

connectionId

String

目标连接id

表12 TransForm

参数

是否必选

参数类型

描述

type

String

转换规则类型

value

String

常量类型规则时,字段为常量内容定义;

变量类型规则时,为变量定义,内容必须为JsonObject字符串。

变量最多支持100个,且不支持嵌套结构定义;

变量名由字母、数字、点、下划线和中划线组成,必须字母或数字开头不能以HC.开头,长度不超过64个字符;

变量值表达式支持常量或JsonPath表达式,字符串长度不超过1024个字符。

template

String

变量类型规则时,规则内容的模板定义,支持对已定义变量的引用。

表13 SinkObsParameters

参数

是否必选

参数类型

描述

access_key

String

AK

secret_key

String

SK

obs_bucket

String

obs_path

String

转储目录

time_format

String

时间目录格式

表14 rule_config

参数

是否必选

参数类型

描述

transform

TransForm object

订阅的事件目标转换规则

filter

Object

过滤规则

表15 RunOption

参数

是否必选

参数类型

描述

thread_num

Integer

并发数

batch_window

BatchWindow object

批量推送

表16 BatchWindow

参数

是否必选

参数类型

描述

count

Integer

批量推送条数[1,10000]

time

Integer

重试次数

interval

Integer

批量推送间隔[0,15],单位秒

响应参数

状态码:200

表17 响应Header参数

参数

参数类型

描述

X-Request-Id

String

This field is the request ID number for task tracking. Format is request_uuid-timestamp-hostname.

表18 响应Body参数

参数

参数类型

描述

eventStreamingID

String

事件流ID

请求示例

创建事件流

POST https://{eg_endpoint}/v1/{project_id}/eventstreamings

{
  "name" : "test-eventstreaming",
  "description" : "this is a test eventstreaming",
  "source" : {
    "source_kafka" : {
      "addr" : "10.10.10.10:8100",
      "group" : "group-test",
      "instance_name" : "instance-name-test",
      "security_protocol" : "PLAINTEXT",
      "instance_id" : "instance-id-test",
      "topic" : "topic-test",
      "seek_to" : "earliest",
      "enable_sasl_ssl" : true,
      "sasl_mechanism" : "PLAIN",
      "ssl_certificate_url" : "https://domain/kafka-certs.zip",
      "ssl_certificate_pwd" : null,
      "user_name" : "",
      "password" : ""
    },
    "source_mobile_rocketmq" : {
      "group_id" : "string",
      "instance_id" : "string",
      "topic" : "string",
      "tag" : "string",
      "authentication_required" : true,
      "msg_trace_switch" : true,
      "access_key" : "string",
      "secret_key" : "string",
      "message_model" : "CLUSTERING",
      "addr_type" : "PUBLIC",
      "addr" : "string",
      "sdk_url" : "string",
      "consume_timeout" : 30000,
      "message_type" : "string",
      "suspend_time" : 1800,
      "max_reconsumer_times" : 3,
      "consumer_thread_nums" : 20,
      "consumer_batch_max_size" : 20,
      "consumer_max_wait" : 5,
      "vpc_id" : "string",
      "subnet_id" : "string"
    },
    "source_community_rocketmq" : {
      "instance_name" : "instance-name-test",
      "instance_id" : "instance-id-test",
      "addr" : "10.10.10.10:8100",
      "group" : "group-test",
      "topic" : "topic-test",
      "tag" : "string",
      "vpc_id" : "string",
      "subnet_id" : "string",
      "ssl_enable" : true,
      "enable_acl" : true,
      "access_key" : "string",
      "secret_key" : "string",
      "message_type" : "string",
      "consume_timeout" : 30000,
      "consumer_thread_nums" : 20,
      "consumer_batch_max_size" : 1,
      "max_reconsume_times" : -1,
      "suspend_current_queue_time_millis" : 1000
    },
    "source_dms_rocketmq" : {
      "instance_name" : "instance-name-test",
      "instance_id" : "instance-id-test",
      "group" : "group-test",
      "topic" : "topic-test",
      "tag" : "string",
      "ssl_enable" : true,
      "enable_acl" : true,
      "access_key" : "string",
      "secret_key" : "string",
      "message_type" : "string",
      "engine_version" : "string",
      "consume_timeout" : 30000,
      "consumer_thread_nums" : 20,
      "consumer_batch_max_size" : 1,
      "max_reconsume_times" : -1,
      "suspend_current_queue_time_millis" : 1000
    },
    "name" : "string"
  },
  "sink" : {
    "sink_fg" : {
      "invoke_type" : "SYNC/ASYNC",
      "urn" : "String",
      "agency" : "string"
    },
    "sink_kafka" : {
      "topic" : "string",
      "keyTransform" : [ {
        "type" : "VARIABLE",
        "value" : "{\"contant_boolean\": true,\"contant_string\": \"constant\",\"varaible_string\": \"$.data.string\",\"varaible_json_object\": \"$.data.object\"}",
        "template" : "{\"contant_boolean\": ${contant_boolean},\"contant_string\": \"${contant_string!\\\"default\\\"}\",\"varaible_string\": \"${contant_boolean}\",\"varaible_json_object\": ${varaible_json_object!\"null\"}}"
      } ],
      "connectionId" : "string"
    },
    "sink_obs" : {
      "access_key" : "string",
      "secret_key" : "string",
      "obs_bucket" : "string",
      "obs_path" : "string",
      "time_format" : "string"
    },
    "name" : "string"
  },
  "rule_config" : {
    "transform" : {
      "type" : "VARIABLE",
      "value" : "{\"contant_boolean\": true,\"contant_string\": \"constant\",\"varaible_string\": \"$.data.string\",\"varaible_json_object\": \"$.data.object\"}",
      "template" : "{\"contant_boolean\": ${contant_boolean},\"contant_string\": \"${contant_string!\\\"default\\\"}\",\"varaible_string\": \"${contant_boolean}\",\"varaible_json_object\": ${varaible_json_object!\"null\"}}"
    },
    "filter" : { }
  },
  "option" : {
    "thread_num" : "3",
    "batch_window" : {
      "count" : 500,
      "time" : 10,
      "interval" : 0
    }
  }
}

响应示例

状态码:200

创建事件流成功

{
  "eventStreamingID" : "23709d68-54d5-423b-a6be-03302e893152"
}

状态码

状态码

描述

200

创建事件流成功

错误码

请参见错误码

我们使用cookie来确保您的高速浏览体验。继续浏览本站,即表示您同意我们使用cookie。 详情

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容