更新时间:2024-03-27 GMT+08:00

批量设置同步策略

功能介绍

  • 批量设置同步策略,包括冲突策略、过滤DROP Datase、对象同步范围。
  • 设置kafka同步策略

调试

您可以在API Explorer中调试该接口,支持自动认证鉴权。API Explorer可以自动生成SDK代码示例,并提供SDK代码示例调试功能。

接口约束

  • 任务创建成功之后,任务状态为CONFIGURATION,并且与源库和目标库测试连接通过、修改任务接口调用成功后才能调用。
  • 支持设置Kafka同步策略的有:PostgreSQL-Kafka同步,Oracle-Kafka同步,GaussDB-Kafka同步,GaussDB(for MySQL)-Kafka,MySQL-Kafka。
  • GaussDB(for MySQL)-Kafka,MySQL-Kafka支持任务状态为INCRE_TRANSFER_STARTED时修改Kafka策略配置,修改配置后需等任务状态为INCRE_TRANSFER_STARTED时再进行编辑同步对象操作。

URI

POST /v3/{project_id}/jobs/batch-sync-policy

表1 路径参数

参数

是否必选

参数类型

描述

project_id

String

租户在某一Region下的Project ID。

获取方法请参见获取项目ID

请求参数

表2 请求Header参数

参数

是否必选

参数类型

描述

Content-Type

String

指定类型为application/json。

缺省值:application/json

X-Auth-Token

String

从IAM服务获取的用户Token。

X-Language

String

请求语言类型。

缺省值:en-us

取值:

  • en-us
  • zh-cn
表3 请求Body参数

参数

是否必选

参数类型

描述

jobs

Array of objects

批量设置同步策略请求列表。

详情请参见表4

表4 jobs字段数据结构说明

参数

是否必选

参数类型

描述

job_id

String

任务ID。

conflict_policy

String

冲突策略。

取值:

  • ignore:忽略。
  • overwrite:覆盖。
  • stop:报错。

filter_ddl_policy

String

过滤DDL策略。

取值:drop_database

ddl_trans

Boolean

同步增量是否同步DDL。

index_trans

Boolean

同步增量是否同步索引。

topic_policy

String

同步Topic策略,目标库为Kafka时必填。

GaussDB分布式版到Kafka同步取值:

  • 0:集中投递到一个Topic。
  • 1:按库名-schema-表名自动生成Topic名字。
  • 2:按库名自动生成Topic名字。
  • 3:按库名-schema自动生成Topic名字。
  • 4:按库名-dn序号自动生成Topic名字。

GaussDB主备版到Kafka同步、PostgreSQL到Kafka同步取值:

  • 0:集中投递到一个Topic。
  • 1:按库名-schema-表名自动生成Topic名字。
  • 2:按库名自动生成Topic名字。
  • 3:按库名-schema自动生成Topic名字。

Oracle到Kafka同步取值:

  • 0:集中投递到一个Topic。
  • 1:按schema-表名自动生成Topic。
  • 3:按schema自动生成Topic。

MySQL到Kafka同步取值:

  • 0:集中投递到一个Topic。
  • 1:自动生成Topic。

GaussDB(for MySQL)到Kafka同步取值:

  • 0:集中投递到一个Topic。
  • 1:自动生成Topic。

topic

String

Topic名称,topic_policy为0时必填,确保topic已存在。

partition_policy

String

同步到kafka partition策略。目标库为Kafka时必填。

  • 0:按库名.schema.表名的hash值投递到不同Partition。
  • 1:全部投递到Partition 0。
  • 2:按主键的hash值投递到不同Partition。
  • 3:按库名.schema的hash值投递到不同Partition。
  • 4: 按库名.dn序号的hash值投递到不同Partition(仅GaussDB分布式版到Kafka支持选择)。
  • 5:按非主键列的hash值投递到不同Partition。

当topic_policy取0时,可以取0、1、2、3、4、5;当topic_policy取1时,可以取1、2、5;当topic_policy取2时,可以取0、1、3、4;当topic_policy取3时,可以取0、1;当topic_policy取4时,可以取0、1、3。

kafka_data_format

String

投送到kafka的数据格式,取值:

  • json
  • avro
  • json_c

不填默认为json。

说明:
  • MySQL到Kafka、GaussDB(for MySQL)到Kafka同步支持json,json_c。
  • 其他支持json,avro。

topic_name_format

String

Topic名字格式,topic_policy为1,2,3,时需要。

PostgreSQL到Kafka同步、GaussDB主备版到Kafka同步取值:

  • 当topic_policy取1时,Topic名字格式支持database、schema两个变量,其他字符当做常量。分别用$database$代替数据库名,$schema$代替模式名,不填默认为$database$-$schema$
  • 当topic_policy取2时,Topic名字格式支持database一个变量,其他字符都当做常量,不填默认为$database$
  • 当topic_policy取3时,Topic名字格式支持database、schema和tablename三个变量,其他字符当做常量。分别用$database$代替数据库名,$schema$代替模式名,$tablename$代替表名,不填默认为$database$-$schema$-$tablename$

Oracle到Kafka同步取值:

  • 当topic_policy取1时,Topic名字格式支持schema和tablename两个变量,其他字符都当做常量。分别用$schema$代替模式名,$tablename$代替表名。不填默认为$schema$-$tablename$。
  • 当topic_policy取3时,Topic名字格式支持schema变量,其他字符都当做常量。用$schema$代替模式名。不填默认为$schema$

MySQL到Kafka、GaussDB(for MySQL)到Kafka同步取值:

  • 当topic_policy取1时,Topic名字格式支持database和tablename两个变量,其他字符都当做常量。分别用$database$代替数据库名,$tablename$代替表名。不填默认为$database$-$tablename$。

partitions_num

String

Partition个数,取值1-2147483647之间,topic_policy为1,2,3,时需要,不填默认为1。

replication_factor

String

副本个数,取值1-32767之间,topic_policy为1,2,3,时需要,不填默认为1。

is_fill_materialized_view

Boolean

PostgreSQL全量阶段是否填充物化视图,不填默认为false。

export_snapshot

Boolean

PostgreSQL全量阶段是否使用快照模式导出,不填默认为false。

slot_name

String

复制槽名称,gaussdbv5ha-to-kafka主备任务必填。

file_and_position

String

  • MySQL为源通过show master status命令获取源库位点,根据提示分别填写File:Position。例如:mysql-bin.000277:805。文件名只能为1-60个字符且不能包含< > & : " ' / \\ 特殊字符,文件编号只能为3-20个数字,binlog事件位置只能为1-20个数字,且总长度不能超过100个字符。格式为:文件名.文件编号:事件位点。
  • MongoDB为源的任务,任务的源库日志从位点开始获取(含当前启动位点),位点需设置在oplog范围以内。非集群通过db.getReplicationInfo()直接获得oplog范围,集群通过db.watch([], {startAtOperationTime: Timestamp(xx, xx)})命令,将启动位点填在xx处,校验位点是否在oplog范围以内。格式为:timestamp:incre。timestamp和incre均为范围在1~2,147,483,647之间的整数。

gtid_set

String

  • MySQL为源的任务需要填写。通过show master status命令获取源库位点,根据提示填写Executed_Gtid_Set(如果源库为MySQL 5.5版本,则不支持使用同步任务)。
  • 不能包含< > & " ' / \\ 特殊字符和中文,且不能超过2048个字符。

ddl_topic

String

存储DDL的topic,Kafka为目标且ddl_trans为true时必填。

取值:目标库已存在的topic名称,确保topic已存在。

响应参数

状态码: 200

表5 响应Body参数

参数

参数类型

描述

count

Integer

总数。

results

Array of objects

批量设置同步策略返回列表。

详情请参见表6

表6 results字段数据结构说明

参数

参数类型

描述

id

String

任务ID。

status

String

状态。取值:

  • success:成功。
  • failed:失败。

error_code

String

错误码。

error_msg

String

错误信息。

请求示例

  • 批量设置同步任务策略,其中增量冲突策略为忽略,同步增量DDL并过滤drop_database操作
    https://{endpoint}/v3/054ba152d480d55b2f5dc0069e7ddef0/jobs/batch-sync-policy
    
    {
        "jobs": [{
    	"conflict_policy": "ignore",
    	"ddl_trans": true,
    	"filter_ddl_policy": "drop_database",
    	"index_trans": true,
    	"job_id": "19557d51-1ee6-4507-97a6-8f69164jb201"
        }]
    }
  • 批量设置MySQL单增量同步任务策略示例:
    https://{endpoint}/v3/054ba152d480d55b2f5dc0069e7ddef0/jobs/batch-sync-policy 
      
     { 
       "jobs": [ 
         { 
           "conflict_policy": "ignore", 
           "ddl_trans": true, 
           "filter_ddl_policy": "drop_database", 
           "index_trans": true, 
           "job_id": "19557d51-1ee6-4507-97a6-8f69164jb201",
           "file_and_position": "mysql-bin.000019:197", 
           "gtid_set":"e4979f26-4bc3-11ee-b279-fa163ef21d64:1-23" 
         } 
       ] 
     }

响应示例

状态码: 200

OK

{
  "results" : [ {
    "id" : "19557d51-1ee6-4507-97a6-8f69164jb201",
    "status" : "success"
  } ],
  "count" : 1
}

状态码

状态码

描述

200

OK

400

Bad Request

错误码

请参见错误码