Updated on 2025-05-06 GMT+08:00

Create a Professional Event Stream Job

Function

This API is used to create a professional event stream job.

URI

POST /v1/{project_id}/eventrouter/jobs

Table 1 Path Parameters

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| project_id | Yes | String | Tenant resource space ID. |

Request Parameters

Table 2 Request header parameters

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| X-Auth-Token | Yes | String | User token. Obtain it by calling the IAM API for obtaining a user token. The token is the value of X-Subject-Token in the response header. |
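As a minimal sketch of how the URI and the X-Auth-Token header fit together, the following Python builds (but does not send) the request with the standard library only. The endpoint host, project ID, token, and body values are placeholders, the helper name build_create_job_request is hypothetical, and the Content-Type header is an assumption that this page does not state.

```python
import json
import urllib.request


def build_create_job_request(endpoint, project_id, token, body):
    """Build, without sending, the POST request that creates a job."""
    url = f"https://{endpoint}/v1/{project_id}/eventrouter/jobs"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={
            "X-Auth-Token": token,               # user token obtained from IAM
            "Content-Type": "application/json",  # assumption, not stated on this page
        },
    )


# Placeholder values for illustration only.
req = build_create_job_request(
    "eg.example.com", "my-project-id", "my-token", {"name": "demo-job"}
)
print(req.get_method(), req.full_url)
```

Passing the returned object to urllib.request.urlopen would send the request. Building and sending are kept separate here so that the URL and headers can be inspected first.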

Table 3 Request body parameters

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| cluster_id | Yes | String | Professional event stream cluster ID. |
| name | Yes | String | Job name. It must start with a letter or digit, contain only letters, digits, periods (.), underscores (_), and hyphens (-), and be at most 128 characters long. |
| description | No | String | Job description. Maximum: 256 characters. |
| scene_type | Yes | String | Scenario type. DISASTER_BACKUP: synchronization task. |
| source_config | No | EventRouterJobSource object | Source configuration of the professional event stream job. |
| sink_config | No | EventRouterJobSink object | Target configuration of the professional event stream job. |
| advance_config | No | EventRouterAdvanceConfig object | Advanced configuration of the professional event stream job. |
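The naming and length constraints in Table 3 can be checked on the client before the request is sent. The following is a sketch only: the helper name check_job_fields is hypothetical, and it assumes that "letters" means ASCII letters, which this page does not specify.

```python
import re

# First character: letter or digit. Remaining characters: letters, digits,
# periods, underscores, or hyphens. Total length: at most 128.
# Assumption: "letters" means ASCII letters only.
JOB_NAME_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]{0,127}")


def check_job_fields(name, description=""):
    """Return the names of the fields that break the documented limits."""
    problems = []
    if not JOB_NAME_RE.fullmatch(name):
        problems.append("name")
    if len(description) > 256:
        problems.append("description")
    return problems


print(check_job_fields("eventRoute-qapbtr95"))  # no problems expected
print(check_job_fields("-starts-with-hyphen"))  # the name is rejected
```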

Table 4 EventRouterJobSource

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| source_kafka | No | KafkaParameters object | Kafka configuration. |
| source_rocketmq | No | RocketmqParameters object | RocketMQ configuration. |
| source_dcs | No | DcsParameters object | DCS configuration. |

Table 5 EventRouterJobSink

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| sink_kafka | No | KafkaParameters object | Kafka configuration. |
| sink_rocketmq | No | RocketmqParameters object | RocketMQ configuration. |
| sink_dcs | No | DcsParameters object | DCS configuration. |

Table 6 KafkaParameters

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| config_type | Yes | String | Instance type. CLOUD: DMS for Kafka instance. |
| region | Yes | String | Region ID. |
| project_id | Yes | String | Resource space ID. |
| instance_id | No | String | Kafka instance ID. This parameter is mandatory when config_type is set to CLOUD. |
| cluster_alias | Yes | String | Kafka cluster alias. |
| address | No | String | Kafka address. |
| authenticate_mode | No | String | Authentication mode for Kafka. |
| sasl_mechanism | No | String | SASL authentication mechanism. |
| user_name | No | String | Username. This parameter is mandatory when SASL_SSL is enabled. |
| password | No | String | Password. This parameter is mandatory when SASL_SSL is enabled. |
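Several KafkaParameters fields are mandatory only under certain conditions. The sketch below lists the fields that are still missing from a given configuration. The helper name missing_kafka_fields is hypothetical, and the code assumes that "SASL_SSL is enabled" means authenticate_mode is set to SASL_SSL, which this page does not state explicitly.

```python
def missing_kafka_fields(params):
    """Return the mandatory KafkaParameters fields that are absent or empty."""
    required = ["config_type", "region", "project_id", "cluster_alias"]
    # instance_id is mandatory for DMS for Kafka (CLOUD) instances.
    if params.get("config_type") == "CLOUD":
        required.append("instance_id")
    # Assumption: SASL_SSL is selected through authenticate_mode.
    if params.get("authenticate_mode") == "SASL_SSL":
        required += ["user_name", "password"]
    return [field for field in required if not params.get(field)]


source_kafka = {
    "config_type": "CLOUD",
    "region": "roma-dev-1",
    "project_id": "cb13a5c409fe40599271f44bbea5a2ad",
    "cluster_alias": "aa",
    "authenticate_mode": "PLAINTEXT",
}
print(missing_kafka_fields(source_kafka))  # instance_id is still missing
```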

Table 7 RocketmqParameters

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| config_type | Yes | String | Instance type. CLOUD: DMS for RocketMQ instance. |
| region | Yes | String | Region ID. |
| project_id | Yes | String | Resource space ID. |
| instance_id | No | String | RocketMQ instance ID. This parameter is mandatory when config_type is set to CLOUD. |
| cluster_alias | Yes | String | RocketMQ cluster alias. |
| address | No | String | RocketMQ address. |
| ssl_open | No | String | Whether to enable RocketMQ SSL. true: enable. false: disable. |
| acl_open | No | Boolean | Whether to enable RocketMQ ACL. true: enable. false: disable. |
| user_name | No | String | Username. This parameter is mandatory when ACL is enabled. |
| password | No | String | Password. This parameter is mandatory when ACL is enabled. |

Table 8 DcsParameters

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| config_type | Yes | String | Instance type. CLOUD: DCS instance. |
| region | Yes | String | Region ID. |
| project_id | Yes | String | Resource space ID. |
| instance_id | No | String | DCS instance ID. This parameter is mandatory when config_type is set to CLOUD. |
| address | No | String | DCS address. |
| password | No | String | Password. |

Table 9 EventRouterAdvanceConfig

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| kafka2kafka | No | Kafka2KafkaAdvanceConfig object | Kafka synchronization custom configuration. |
| rocketmq2rocketmq | No | Rocketmq2RocketmqAdvanceConfig object | RocketMQ synchronization custom configuration. |
| dcs2dcs | No | Dcs2DcsAdvanceConfig object | DCS synchronization custom configuration. |

Table 10 Kafka2KafkaAdvanceConfig

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| topics | No | String | Names of the topics to be synchronized, matched exactly. Separate multiple topic names with commas (,). Either this parameter or topic_regex must be specified. |
| topic_regex | No | String | Regular expression that matches the topics to be synchronized. Either this parameter or topics must be specified. |
| replica_cnt | Yes | Integer | Number of replicas, which cannot be greater than the number of brokers. |
| seek_to | Yes | String | Data synchronization point. |
| compress_alg | Yes | String | Compression algorithm. Options: none (no compression), gzip, snappy, lz4, and zstd. |
| is_sync_progress | Yes | Boolean | Whether to synchronize the consumption progress. true: yes. false: no. |
| is_add_header | Yes | Boolean | Whether to add the source header. |
| rename_topic | No | Boolean | Whether to rename a topic. |
| topic_mappings | No | Array of KafkaTopicMapping objects | Topic mapping. |

Table 11 KafkaTopicMapping

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| source_topic | No | String | Source topic. |
| sink_topic | No | String | Target topic. |
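The constraints in Table 10 and Table 11 can be illustrated with a small builder for the kafka2kafka object. This is a sketch under stated assumptions: the helper name build_kafka2kafka and its broker_cnt argument are hypothetical (broker_cnt is a local check only and is not sent), "either" is read as "at least one of topics and topic_regex", and the seek_to value earliest is taken from the example request on this page. rename_topic is left out because its exact semantics are not described here.

```python
KAFKA_COMPRESS_ALGS = {"none", "gzip", "snappy", "lz4", "zstd"}


def build_kafka2kafka(*, topics=None, topic_regex=None, replica_cnt, broker_cnt,
                      seek_to, compress_alg="none", is_sync_progress=False,
                      is_add_header=False, topic_mappings=None):
    """Build a kafka2kafka dict after checking the documented constraints."""
    if not topics and not topic_regex:
        raise ValueError("specify topics or topic_regex")
    if replica_cnt > broker_cnt:
        raise ValueError("replica_cnt cannot exceed the number of brokers")
    if compress_alg not in KAFKA_COMPRESS_ALGS:
        raise ValueError(f"unsupported compress_alg: {compress_alg}")

    config = {
        "replica_cnt": replica_cnt,
        "seek_to": seek_to,
        "compress_alg": compress_alg,
        "is_sync_progress": is_sync_progress,
        "is_add_header": is_add_header,
    }
    if topics:
        # The API expects one comma-separated string, not a list.
        config["topics"] = ",".join(topics)
    if topic_regex:
        config["topic_regex"] = topic_regex
    if topic_mappings:
        # Convert {source: sink} pairs into KafkaTopicMapping objects.
        config["topic_mappings"] = [
            {"source_topic": src, "sink_topic": dst}
            for src, dst in topic_mappings.items()
        ]
    return config


cfg = build_kafka2kafka(
    topics=["t01", "t02"],
    replica_cnt=1,
    broker_cnt=3,
    seek_to="earliest",
    topic_mappings={"t01": "t01-copy"},
)
print(cfg["topics"])
```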

Table 12 Rocketmq2RocketmqAdvanceConfig

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| topics | No | String | Names of the topics to be synchronized, matched exactly. Separate multiple topic names with commas (,). Either this parameter or topic_regex must be specified. |
| topic_regex | No | String | Regular expression that matches the topics to be synchronized. Either this parameter or topics must be specified. |
| is_sync_progress | Yes | Boolean | Whether to synchronize the consumption progress. |
| seek_to | Yes | String | Data synchronization point. Options: first, last, or a custom timestamp (long type). |
| compress_alg | Yes | String | Compression algorithm. Options: none (no compression), gzip, lz4, and zstd. |
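The RocketMQ options in Table 12 differ from the Kafka ones: seek_to accepts first, last, or a timestamp, and snappy is not listed as a compression algorithm. The sketch below checks a rocketmq2rocketmq object against these rules. The helper name check_rocketmq2rocketmq is hypothetical, and because seek_to is typed as String, the code assumes that a custom timestamp is passed as a string of digits.

```python
ROCKETMQ_COMPRESS_ALGS = {"none", "gzip", "lz4", "zstd"}


def check_rocketmq2rocketmq(config):
    """Return a list of problems found in a rocketmq2rocketmq config dict."""
    problems = []
    if not config.get("topics") and not config.get("topic_regex"):
        problems.append("topics or topic_regex is required")

    seek_to = str(config.get("seek_to", ""))
    # Assumption: a custom timestamp is a string made of ASCII digits only.
    is_timestamp = seek_to.isascii() and seek_to.isdigit()
    if seek_to not in ("first", "last") and not is_timestamp:
        problems.append("seek_to must be first, last, or a timestamp")

    if config.get("compress_alg") not in ROCKETMQ_COMPRESS_ALGS:
        problems.append("unsupported compress_alg")
    return problems


print(check_rocketmq2rocketmq(
    {"topics": "t01", "seek_to": "last", "compress_alg": "gzip"}
))
```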

Table 13 Dcs2DcsAdvanceConfig

| Parameter | Mandatory | Type | Description |
| --- | --- | --- | --- |
| synchronize_type | Yes | String | Synchronization type. Default: Full+Incremental. |
| connect_retry_time | Yes | String | Retry interval upon connection failure. Default: 60 seconds. |
| other_retry_time | Yes | String | Retry interval upon other failures. Default: 60 seconds. |
| use_rate_limit | No | Boolean | Whether to limit the incremental synchronization rate. true: yes. false: no. The default rate limit is 5 MB/s. |
| rate_limit | No | String | Rate limit for incremental synchronization. Default: 5 MB/s. |
| use_salve_node | No | Boolean | Whether to use the standby node. true: Use the standby node to synchronize data. false: Use the master node to synchronize data. |

Response Parameters

Status code: 200

Table 14 Response header parameters

| Parameter | Type | Description |
| --- | --- | --- |
| X-Request-Id | String | Request ID. |

Table 15 Response body parameters

| Parameter | Type | Description |
| --- | --- | --- |
| cluster_id | String | Professional event stream cluster ID. |
| job_id | String | Professional event stream job ID. |
| name | String | Job name. It starts with a letter or digit, contains only letters, digits, periods (.), underscores (_), and hyphens (-), and is at most 128 characters long. |
| description | String | Job description. Maximum: 256 characters. |
| scene_type | String | Scenario type. |
| source_type | String | Source type. |
| sink_type | String | Target type. |
| subnet_id | String | Subnet ID. |
| vpc_id | String | VPC ID. |
| source_config | EventRouterJobSource object | Source configuration of the professional event stream job. |
| sink_config | EventRouterJobSink object | Target configuration of the professional event stream job. |
| advance_config | EventRouterAdvanceConfig object | Advanced configuration of the professional event stream job. |
| status | String | Job status. |
| err_code | String | Error code. |
| err_message | String | Error cause. |
| created_time | String | UTC time when the job was created. |
| updated_time | String | UTC time when the job was last updated. |
| sink_status | String | Target connectivity status. |
| sink_status_info | String | Target connectivity status information. |
| source_status | String | Source connectivity status. |
| source_status_info | String | Source connectivity status information. |
| log_group_id | String | User log group ID for the event stream job. |
| log_stream_id | String | User log stream ID for the event stream job. |

Table 16 EventRouterJobSource

| Parameter | Type | Description |
| --- | --- | --- |
| source_kafka | KafkaParameters object | Kafka configuration. |
| source_rocketmq | RocketmqParameters object | RocketMQ configuration. |
| source_dcs | DcsParameters object | DCS configuration. |

Table 17 EventRouterJobSink

| Parameter | Type | Description |
| --- | --- | --- |
| sink_kafka | KafkaParameters object | Kafka configuration. |
| sink_rocketmq | RocketmqParameters object | RocketMQ configuration. |
| sink_dcs | DcsParameters object | DCS configuration. |

Table 18 KafkaParameters

| Parameter | Type | Description |
| --- | --- | --- |
| config_type | String | Instance type. CLOUD: DMS for Kafka instance. |
| region | String | Region ID. |
| project_id | String | Resource space ID. |
| instance_id | String | Kafka instance ID. This parameter is mandatory when config_type is set to CLOUD. |
| cluster_alias | String | Kafka cluster alias. |
| address | String | Kafka address. |
| authenticate_mode | String | Authentication mode for Kafka. |
| sasl_mechanism | String | SASL authentication mechanism. |
| user_name | String | Username. This parameter is mandatory when SASL_SSL is enabled. |
| password | String | Password. This parameter is mandatory when SASL_SSL is enabled. |

Table 19 RocketmqParameters

| Parameter | Type | Description |
| --- | --- | --- |
| config_type | String | Instance type. CLOUD: DMS for RocketMQ instance. |
| region | String | Region ID. |
| project_id | String | Resource space ID. |
| instance_id | String | RocketMQ instance ID. This parameter is mandatory when config_type is set to CLOUD. |
| cluster_alias | String | RocketMQ cluster alias. |
| address | String | RocketMQ address. |
| ssl_open | String | Whether to enable RocketMQ SSL. true: enable. false: disable. |
| acl_open | Boolean | Whether to enable RocketMQ ACL. true: enable. false: disable. |
| user_name | String | Username. This parameter is mandatory when ACL is enabled. |
| password | String | Password. This parameter is mandatory when ACL is enabled. |

Table 20 DcsParameters

| Parameter | Type | Description |
| --- | --- | --- |
| config_type | String | Instance type. CLOUD: DCS instance. |
| region | String | Region ID. |
| project_id | String | Resource space ID. |
| instance_id | String | DCS instance ID. This parameter is mandatory when config_type is set to CLOUD. |
| address | String | DCS address. |
| password | String | Password. |

Table 21 EventRouterAdvanceConfig

| Parameter | Type | Description |
| --- | --- | --- |
| kafka2kafka | Kafka2KafkaAdvanceConfig object | Kafka synchronization custom configuration. |
| rocketmq2rocketmq | Rocketmq2RocketmqAdvanceConfig object | RocketMQ synchronization custom configuration. |
| dcs2dcs | Dcs2DcsAdvanceConfig object | DCS synchronization custom configuration. |

Table 22 Kafka2KafkaAdvanceConfig

| Parameter | Type | Description |
| --- | --- | --- |
| topics | String | Names of the topics to be synchronized, matched exactly. Separate multiple topic names with commas (,). Either this parameter or topic_regex must be specified. |
| topic_regex | String | Regular expression that matches the topics to be synchronized. Either this parameter or topics must be specified. |
| replica_cnt | Integer | Number of replicas, which cannot be greater than the number of brokers. |
| seek_to | String | Data synchronization point. |
| compress_alg | String | Compression algorithm. Options: none (no compression), gzip, snappy, lz4, and zstd. |
| is_sync_progress | Boolean | Whether to synchronize the consumption progress. true: yes. false: no. |
| is_add_header | Boolean | Whether to add the source header. |
| rename_topic | Boolean | Whether to rename a topic. |
| topic_mappings | Array of KafkaTopicMapping objects | Topic mapping. |

Table 23 KafkaTopicMapping

| Parameter | Type | Description |
| --- | --- | --- |
| source_topic | String | Source topic. |
| sink_topic | String | Target topic. |

Table 24 Rocketmq2RocketmqAdvanceConfig

| Parameter | Type | Description |
| --- | --- | --- |
| topics | String | Names of the topics to be synchronized, matched exactly. Separate multiple topic names with commas (,). Either this parameter or topic_regex must be specified. |
| topic_regex | String | Regular expression that matches the topics to be synchronized. Either this parameter or topics must be specified. |
| is_sync_progress | Boolean | Whether to synchronize the consumption progress. |
| seek_to | String | Data synchronization point. Options: first, last, or a custom timestamp (long type). |
| compress_alg | String | Compression algorithm. Options: none (no compression), gzip, lz4, and zstd. |

Table 25 Dcs2DcsAdvanceConfig

| Parameter | Type | Description |
| --- | --- | --- |
| synchronize_type | String | Synchronization type. Default: Full+Incremental. |
| connect_retry_time | String | Retry interval upon connection failure. Default: 60 seconds. |
| other_retry_time | String | Retry interval upon other failures. Default: 60 seconds. |
| use_rate_limit | Boolean | Whether to limit the incremental synchronization rate. true: yes. false: no. The default rate limit is 5 MB/s. |
| rate_limit | String | Rate limit for incremental synchronization. Default: 5 MB/s. |
| use_salve_node | Boolean | Whether to use the standby node. true: Use the standby node to synchronize data. false: Use the master node to synchronize data. |

Example Requests

Create a professional event stream job named eventRoute-qapbtr95 that synchronizes topic t01 from one DMS for Kafka instance to another.

POST https://{eg_endpoint}/v1/{project_id}/eventrouter/jobs

{
  "cluster_id": "fac59513-6c18-4266-bc81-5d412dfc1925",
  "name": "eventRoute-qapbtr95",
  "scene_type": "DISASTER_BACKUP",
  "description": "",
  "source_config": {
    "source_kafka": {
      "config_type": "CLOUD",
      "region": "roma-dev-1",
      "project_id": "cb13a5c409fe40599271f44bbea5a2ad",
      "instance_id": "cdecf0b0-35b8-461b-a430-d7cd02d3d712",
      "cluster_alias": "aa",
      "address": "21.22.0.58:9092",
      "user_name": "",
      "password": null,
      "authenticate_mode": "PLAINTEXT",
      "sasl_mechanism": ""
    }
  },
  "sink_config": {
    "sink_kafka": {
      "config_type": "CLOUD",
      "region": "roma-dev-1",
      "project_id": "cb13a5c409fe40599271f44bbea5a2ad",
      "instance_id": "47896f0e-f90a-4528-b407-78ee00362cc9",
      "cluster_alias": "bb",
      "address": "21.22.0.175:9092",
      "user_name": null,
      "password": null,
      "authenticate_mode": "PLAINTEXT",
      "sasl_mechanism": ""
    }
  },
  "advance_config": {
    "kafka2kafka": {
      "topics": "t01",
      "topic_regex": "",
      "task_cnt": null,
      "topic_mappings": [],
      "rename_topic": false,
      "is_add_header": false,
      "is_sync_progress": false,
      "replica_cnt": 1,
      "seek_to": "earliest",
      "compress_alg": "none"
    }
  }
}

Example Responses

Status code: 200

Request succeeded.

{
  "cluster_id" : "fac59513-6c18-4266-bc81-5d412dfc1925",
  "job_id" : "fac59513-6c18-4266-bc81-5d412dfc1925",
  "name" : "eventRoute-ifpcv5u4",
  "description" : "",
  "scene_type" : "DISASTER_BACKUP",
  "source_type" : "KAFKA",
  "source_config" : null,
  "sink_type" : "KAFKA",
  "sink_config" : null,
  "advance_config" : null,
  "status" : "CREATING",
  "err_code" : null,
  "err_message" : null,
  "created_time" : "2024-05-16T11:46:07Z",
  "updated_time" : "2024-05-16T11:46:07Z",
  "vpc_id" : "199f632d-34e7-4915-b4d8-1bc4e2824867",
  "subnet_id" : "0d1799e1-116e-4551-92dc-02dd8a860072"
}
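A client will typically read job_id and status from the response body. The sketch below parses a body like the one above and produces a one-line summary. The helper name summarize_job is hypothetical, and treating a non-null err_code as a failure is an assumption, because this page does not describe when err_code is populated.

```python
import json


def summarize_job(response_text):
    """Return a one-line summary of a create-job response body."""
    job = json.loads(response_text)
    # Assumption: a non-null err_code means that job creation failed.
    if job.get("err_code"):
        return f"job {job['job_id']} failed: {job['err_code']} ({job.get('err_message')})"
    return f"job {job['job_id']} is {job['status']}"


example = (
    '{"job_id": "fac59513-6c18-4266-bc81-5d412dfc1925", '
    '"status": "CREATING", "err_code": null, "err_message": null}'
)
print(summarize_job(example))
# prints: job fac59513-6c18-4266-bc81-5d412dfc1925 is CREATING
```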

Status Codes

| Status Code | Description |
| --- | --- |
| 200 | Request succeeded. |

Error Codes

See Error Codes.