Updated on 2024-12-01 GMT+08:00

From MySQL to Kafka

Supported Source and Destination Databases

Table 1 Supported databases

Source DB

Destination DB

  • On-premises MySQL databases
  • MySQL databases on an ECS
  • Kafka

Suggestions

  • When a task is being started or in the full synchronization phase, do not perform DDL operations on the source database. Otherwise, the task may be abnormal.
  • To keep data consistency before and after the synchronization, ensure that no data is written to the destination database during the synchronization.
  • The success of database synchronization depends on environment and manual operations. To ensure a smooth synchronization, perform a synchronization trial before you start the synchronization to help you detect and resolve problems in advance.
  • It is recommended that you start a task during off-peak hours to minimize the impact of synchronization on your services. If you have to synchronize data during peak hours, you can select Yes for Flow Control to adjust the synchronization speed.
    • If network bandwidth is not limited, the query rate of the source database increases by about 50 MB/s during full synchronization, and two to four CPUs are occupied.
    • Tables to be synchronized without a primary key may be locked for 3s.
    • When DRS concurrently reads data from a database, it will use about 6 to 10 sessions. The impact of the connections on services must be considered.
    • If you read a table, especially a large table, during the full synchronization, the exclusive lock on that table may be blocked.
  • For more information about the impact of DRS on databases, see How Does DRS Affect the Source and Destination Databases?

Precautions

Before creating a synchronization task, read the following notes:

  • You are advised to create an independent database account for DRS task connection to prevent task failures caused by database account password modification.
  • After changing the account passwords for the source and destination databases, modify the connection information of the DRS task by referring to Modifying Connection Information to prevent automatic retry after a task failure. Automatic retry will lock the database accounts.
Table 2 Precautions

Type

Restrictions

Database permissions

  • The source database user must have the following permissions: SELECT, LOCK TABLES, SHOW VIEW, EVENT, REPLICATION SLAVE, and REPLICATION CLIENT. For a full+incremental task, if the source database version is 8.0.2 or later, the XA_RECOVER_ADMIN permission is required to prevent data loss caused by uncommitted XA transactions during startup or task editing.

Synchronization object

  • During full synchronization, tables, primary key indexes, unique indexes, common indexes, stored procedures, views, and functions can be synchronized, but events and triggers cannot be synchronized. During incremental synchronization, only table data and DDLs can be synchronized.
  • Only MyISAM and InnoDB tables can be synchronized.
  • Full and incremental synchronizations do not support invisible columns. Invisible columns can be synchronized since MySQL 8.0.23. For example:
    CREATE TABLE `test11` (
      `id` int NOT NULL,
      `c1` int DEFAULT NULL /*!80023 INVISIBLE */,
      PRIMARY KEY (`id`));

Source database

  • During data synchronization, do not upgrade the source MySQL database across major versions. Otherwise, data may become inconsistent or the synchronization task may fail (data, table structures, and keywords may cause compatibility changes after the cross-version upgrade). You are advised to create a synchronization task again if the source MySQL database is upgraded across major versions.
  • During the incremental synchronization, the binlog of the source MySQL database must be enabled and use the row-based format.
  • If the storage space is sufficient, store the source database binlog for as long as possible. The recommended retention period is three days. If this period is set to 0, the synchronization may fail.
    • If the source database is an on-premises MySQL database, set expire_logs_days to specify the binlog retention period. Set expire_logs_day to a proper value to ensure that the binlog does not expire before data transfer resumes. This ensures that services can be recovered after interruption.
    • If the source database is an RDS for MySQL instance, set the binlog retention period by following the instructions provided in RDS User Guide.
  • GTID must be enabled for the source database. If GTID is not enabled for the source database, primary/standby switchover is not supported. DRS tasks will be interrupted and cannot be restored during a switchover.
  • During an incremental synchronization, the server_id value of the MySQL source database must be set. If the source database version is MySQL 5.6 or earlier, the server_id value ranges from 2 to 4294967296. If the source database is MySQL 5.7, the server_id value ranges from 1 to 4294967296.
  • During an incremental synchronization, if the session variable character_set_client is set to binary, some data may include garbled characters.
  • The database and table names in the source database cannot contain non-ASCII characters, or the following special characters: .'<`>/\"
  • If there is a table containing fields of the longtext or longblob type in the synchronization object, you are advised to create a DRS task with large specifications. Otherwise, capture OOM may occur.

Destination database

  • The destination database is a Kafka database.
  • You are advised to set auto.create.topics.enable of Kafka to false.

Precautions

  • If the DCC does not support instances with 4 vCPUs and 8 GB memory or higher instance specifications, the synchronization task cannot be created.
  • Objects that have dependencies must be synchronized at the same time to avoid synchronization failure. Common dependencies: tables referenced by views, views referenced by views, views and tables referenced by stored procedures/functions/triggers, and tables referenced by primary and foreign keys
  • If a full synchronization task is suspended or resumed due to an exception, there may be duplicate data in the destination Kafka. Use the identifier field in the Kafka data for data deduplication. (The shard ID must be unique.)
  • During incremental synchronization, do not delete the topic for receiving DRS data in Kafka. Otherwise, the task may fail.
  • Cascade operations cannot be performed on tables with foreign keys. If the foreign key index of a table is a common index, the table structure may fail to be created. You are advised to use a unique index.
  • Binlogs cannot be forcibly deleted. Otherwise, the synchronization task fails.
  • The source database does not support the reset master or reset master to command, which may cause DRS task failures or data inconsistency.
  • If the source MySQL database does not support TLS 1.2 or is a self-built database of an earlier version (earlier than 5.6.46 or between 5.7.0 and 5.7.28), you need to submit an O&M application for testing the SSL connection.
  • Before creating a DRS task, if concurrency control rules of SQL statements are configured for the source database, the DRS task may fail.
  • During the synchronization, do not delete or change the username, password, or permission of the source database, or change the port of the destination database.
  • Data inconsistency may occur when the MyISAM table is modified during synchronization.
  • During synchronization of table-level objects, renaming tables is not recommended.
  • During database name mapping, if the objects to be synchronized contain stored procedures, views, and functions, these objects cannot be synchronized in the full synchronization phase.

Procedure

  1. On the Data Synchronization Management page, click Create Synchronization Task.
  2. On the Create Synchronization Instance page, select a region and project, specify the task name, description, and the synchronization instance details, and click Create Now.

    • Task information description
      Figure 1 Synchronization task information
      Table 3 Task information

      Parameter

      Description

      Region

      The region where the replication instance is deployed. You can change the region.

      Project

      The project corresponds to the current region and can be changed.

      Task Name

      The task name must start with a letter and consist of 4 to 50 characters. It can contain only letters, digits, hyphens (-), and underscores (_).

      Description

      The description consists of a maximum of 256 characters and cannot contain special characters !=<>'&"\

    • Synchronization instance details
      Figure 2 Synchronization instance details
      Table 4 Synchronization instance settings

      Parameter

      Description

      Data Flow

      Choose Self-built to self-built.

      Source DB Engine

      Select MySQL.

      Destination DB Engine

      Select Kafka.

      Network Type

      The Public network is used as an example. Available options: VPC, Public network and VPN or Direct Connect

      DRS Task Type

      Type of the DRS task. The value can be Single-AZ or Dual-AZ.

      • Dual-AZ: This architecture provides HA, improving the reliability of DRS tasks. After a dual-AZ task is created, DRS creates two subtasks, each running in the primary and standby AZs. If the subtask in the primary AZ fails, DRS automatically starts the subtask in the standby AZ to continue the synchronization. This deployment is for scenarios where there is a lot of service data, long-term synchronization is required, and there are strict limits on how much service downtime can be tolerated.
      • Single-AZ: Single-node deployment is used. The synchronization task will be created on only one node to save money. This deployment is for scenarios where there is a small amount of service data, short-term synchronization is required, and there is no requirement on service downtime.

      This option is available only in specific scenarios. For details, see Performing a Switchover for a Dual-AZ Task.

      VPC

      Select an available VPC.

      Synchronization Instance Subnet

      Select the subnet where the synchronization instance is located. You can also click View Subnets to go to the network console to view the subnet where the instance resides.

      By default, the DRS instance and the destination DB instance are in the same subnet. You need to select the subnet where the DRS instance resides, and there are available IP addresses for the subnet. To ensure that the synchronization instance is successfully created, only subnets with DHCP enabled are displayed.

      Security Group

      Select a security group. You can use security group rules to allow or deny access to the instance.

      Synchronization Mode

      The synchronization mode supported by a DRS task. Full+Incremental is used as an example. For details about the underlying working principles for full or incremental synchronization, see Product Architecture and Function Principles.

      • Full+Incremental

        This synchronization mode allows you to synchronize data in real time. After a full synchronization initializes the destination database, an incremental synchronization parses logs to ensure data consistency between the source and destination databases.

      • Incremental

        Through log parsing, incremental data generated on the source database is synchronized to the destination database.

      Specify EIP

      This parameter is available when you select Public network for Network Type. Select an EIP to be bound to the DRS instance. DRS will automatically bind the specified EIP to the DRS instance and unbind the EIP after the task is complete. The number of specified EIPs must be the consistent with that of DB instances.

      If DRS Task Type is set to Dual-AZ, you need to specify the primary and standby IP addresses.

      For details about the data transfer fee generated using a public network, see EIP Price Calculator.

    • Task Type
      Figure 3 Task type
      Table 5 Task type information

      Parameter

      Description

      Specifications

      DRS instance specifications. Different specifications have different performance upper limits. For details, see Real-Time Synchronization.

      NOTE:

      DRS allows you to upgrade specifications only for single-AZ synchronization tasks. Task specifications cannot be downgraded. For details, see Changing Specifications.

      AZ

      Select the AZ where you want to create the DRS task. Selecting the one housing the source or destination database can provide better performance.

      If DRS Task Type is set to Dual-AZ, you can specify Primary AZ and Standby AZ.

      Figure 4 AZ
    • Enterprise Project and Tags
      Figure 5 Enterprise Project and Tags

      Table 6 Enterprise Project and Tags

      Parameter

      Description

      Enterprise Project

      An enterprise project you would like to use to centrally manage your cloud resources and members. Select an enterprise project from the drop-down list. The default project is default.

      For more information about enterprise project, see Enterprise Management User Guide.

      To customize an enterprise project, click Enterprise in the upper right corner of the console. The Enterprise Project Management Service page is displayed. For details, see Creating an Enterprise Project in Enterprise Management User Guide.

      Tags

      • Tags a task. This configuration is optional. Adding tags helps you better identify and manage your tasks. Each task can have up to 20 tags.
      • If your organization has configured tag policies for DRS, add tags to tasks based on the policies. If a tag does not comply with the policies, task creation may fail. Contact your organization administrator to learn more about tag policies.
      • After a task is created, you can view its tag details on the Tags tab. For details, see Tag Management.

    If a task fails to be created, DRS retains the task for three days by default. After three days, the task automatically stops.

  3. On the Configure Source and Destination Databases page, wait until the synchronization instance is created. Then, specify source and destination database information and click Test Connection for both the source and destination databases to check whether they have been connected to the synchronization instance. After the connection tests are successful, select the check box before the agreement and click Next.

    Figure 6 Source database information
    Table 7 Source database settings

    Parameter

    Description

    IP Address or Domain Name

    The IP address or domain name of the source database.

    Port

    The port of the source database. Range: 1 – 65535

    Database Username

    The username for accessing the source database.

    Database Password

    The password for the database username.

    SSL Connection

    SSL encrypts the connections between the source and destination databases. If SSL is enabled, upload the SSL CA root certificate.

    NOTE:
    • The maximum size of a single certificate file that can be uploaded is 500 KB.
    • If SSL is disabled, your data may be at risk.

    The username and password of the source database are encrypted and stored in DRS and will be cleared after the task is deleted.

    Figure 7 Destination database information
    Table 8 Destination database settings

    Parameter

    Description

    IP Address or Domain Name

    The IP address or domain name of the destination database.

    Security Protocol

    Available options: PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. For details, see Kafka Authentication.

  4. On the Set Synchronization Task page, select the synchronization policy, objects, and data format, and click Next.

    The parameters on the Set Synchronization Task page vary according to the synchronization mode selected in 2. The Full+Incremental synchronization mode is used as an example in Figure 8.

    Figure 8 Synchronization mode

    Table 9 Synchronization Object

    Parameter

    Description

    Flow Control

    You can choose whether to control the flow. Flow Control takes effect in the full phase only.

    • Yes

      You can customize the maximum synchronization speed. During the full synchronization, the synchronization speed of each task (or each subtask in multi-task mode) does not exceed the value of this parameter.

      In addition, you can set the time range based on your service requirements. The traffic rate setting usually includes setting of a rate limiting time period and a traffic rate value. Flow can be controlled all day or during specific time ranges. The default value is Always. A maximum of 10 time ranges can be set, and they cannot overlap.

      The flow rate must be set based on the service scenario and cannot exceed 9,999 MB/s.

      Figure 9 Flow control
    • No
      The synchronization speed is not limited and the outbound bandwidth of the source database is maximally used, which will increase the read burden on the source database. For example, if the outbound bandwidth of the source database is 100 MB/s and 80% bandwidth is used, the I/O consumption on the source database is 80 MB/s.
      NOTE:
      • The flow control mode takes effect only in the full synchronization phase.
      • You can also change the flow control mode after creating a task. For details, see Modifying the Flow Control Mode.

    Filter Large Field

    Indicates whether to use large field filtering to process special fields (blob, mediumblob, longblob, varbinary, mediumtext and longtext) in a synchronization table.

    • Enable

      You need to set Field Filtering Threshold and Replace With. If the size of a field exceeds the threshold, the value is replaced based on a specified character.

      Note that large field filtering is used to replace the value of a field, not the entire DML record. If a DML record contains many large fields, the size of only some of these fields exceeds the filtering threshold, and the accumulated value of other fields that do not exceed the filtering threshold is greater than the value of request.max.size, when data is written to Kafka, the size of the message body in the destination Kafka database may still exceed the upper limit, resulting in a DRS error.
      Figure 10 Setting large field filtering

    • Disable

      Large fields are not filtered.

    Synchronization Object Type

    You can select Table structure or Data for Synchronization Object Type for full synchronization.

    Synchronize DML

    Select the DML operations to be synchronized. By default, all DML operations are selected.

    If you do not select Delete, DELETE statements in the incremental data of the source database will not be synchronized, which may cause a data inconsistency. As a result, there may be a data conflict or the task may fail.

    Start Point

    This option is available if you select Incremental in 2. The logs of the source database are obtained from the position after the start point during an incremental synchronization.

    Run show master status to obtain the start point of the source database and set File, Position, and Executed_Gtid_Set as prompted.

    For details, see How Do I Specify the Start Point for DRS Incremental Synchronization?

    Topic Synchronization Policy

    Topic synchronization policy. You can select A specific topic or Auto-generated topics.

    Topic

    Select the topic to be synchronized to the destination database. This parameter is available when the topic is set to A specified topic.

    Topic Name Format

    Topic name format. This parameter is available when Topic Synchronization Policy is set to Auto-generated topics.

    Due to Kafka restrictions, a topic name can contain only ASCII characters, periods (.), underscores (_), and hyphens (-). If a topic name exceeds the limit, the topic fails to be created and the task is abnormal.

    If a topic name contains a database object name, ensure that the characters in the object name meet the Kafka topic naming requirements.

    Only variables database and tablename are supported. The other characters must be constants. Replace $database$ with the database name and $tablename$ with the table name.

    For example, if this parameter is set to $database$-$tablename$ and the database name is db1, and the table name is tab1, the topic name is db1-tab1. If DDL statements are synchronized, $tablename$ is empty and the topic name is db1.

    Number of Partitions

    This parameter is available when Topic Synchronization Policy is set to Auto-generated topics.

    The number of partitions of a topic. Each topic can have multiple partitions. More partitions can provide higher throughput but consume more resources. Set the number of partitions based on the actual situation of brokers.

    Replication Factor

    This parameter is available when Topic Synchronization Policy is set to Auto-generated topics.

    Number of copies of a topic. Each topic can have multiple copies, and the copies are placed on different brokers in a cluster. The number of copies cannot exceed the number of brokers. Otherwise, the topic fails to be created.

    Synchronize Topic To

    The policy for synchronizing topics to the Kafka partitions.

    • If topics are synchronized to different partitions by hash value of the database and table names, the performance on a single table query can be improved.
    • If topics are synchronized to partition 0, strong consistency can be obtained but write performance is impacted.
    • Partitions are identified by the hash values of the primary key: This mode applies to scenarios where a single table contains a large amount of data, preventing table data from being written to the same partition, so that consumers can obtain data from different partitions concurrently. Data sequence can be preserved only when the primary key value is not changed. For a table without a primary key, if you select Partitions are identified by the hash values of the primary key, topics are synchronized to different partitions based on the hash values of database_name.table_name.

    Data Format in Kafka

    Select the data format to be delivered from MySQL to Kafka.

    • Avro refers to binary encoded format. This option is available only when Synchronization Mode is set to Incremental in 2. Only whitelisted users can use the Avro option. To use this option, submit a service ticket. In the upper right corner of the management console, choose Service Tickets > Create Service Ticket.
    • JSON: JSON message format, which is easy to interpret but takes up more space.
    • JSON-C: A data format that is compatible with multiple batch and stream computing frameworks.

    For details, see Kafka Message Format.

    Synchronization Object

    The left pane displays the source database objects, and the right pane displays the selected objects. You can select Tables, Import object file, or Databases for Synchronization Object as required.

    • If the synchronization objects in source and destination databases have different names, you can map the source object name to the destination one. For details, see Changing Object Names (Mapping Object Names).
    • For details about how to import an object file, see Importing Synchronization Objects.
      • When importing an object file, you can perform either topic mapping or object name mapping.
      • If you perform topic mapping when importing an object file, different tables can be synchronized to different topics in the destination database. If topic mapping is not specified for an object, the object uses the external topic policy. You can modify the mapping when editing the synchronization object.
    NOTE:
    • To quickly select the desired database objects, you can use the search function.
    • If there are changes made to the source databases or objects, click in the upper right corner to update the objects to be synchronized.
    • If an object name contains spaces, the spaces before and after the object name are not displayed. If there are two or more consecutive spaces in the middle of the object name, only one space is displayed.
    • The name of the selected synchronization object cannot contain spaces.

  5. On the Process Data page, select the columns to be processed.

    • If data processing is not required, click Next.
    • If you need to process columns, set processing rules by referring to Processing Data.
    Figure 11 Processing data

  6. On the Check Task page, check the synchronization task.

    • If any check fails, review the cause and rectify the fault. After the fault is rectified, click Check Again.

      For details about how to handle check failures, see Solutions to Failed Check Items in Data Replication Service User Guide.

    • If all check items are successful, click Next.

      You can proceed to the next step only when all checks are successful. If there are any items that require confirmation, view and confirm the details first before proceeding to the next step.

  7. On the displayed page, specify Start Time, Send Notifications, SMN Topic, Delay Threshold (s), and Stop Abnormal Tasks After, confirm that the configured information is correct, select the check box before the agreement, and click Submit to submit the task.

    Figure 12 Task startup settings

    Table 10 Task startup settings

    Parameter

    Description

    Start Time

    Set Start Time to Start upon task creation or Start at a specified time based on site requirements.

    NOTE:

    After a synchronization task is started, the performance of the source and destination databases may be affected. You are advised to start a synchronization task during off-peak hours.

    Send Notifications

    This parameter is optional. After enabled, select a SMN topic. If the status, latency metric, or data of the migration task is abnormal, DRS will send you a notification.

    SMN Topic

    This parameter is available only after you enable Send Notifications and create a topic on the SMN console and add a subscriber.

    For details, see Simple Message Notification User Guide.

    Delay Threshold (s)

    During an incremental synchronization, a synchronization delay indicates a time difference (in seconds) of synchronization between the source and destination database.

    If the synchronization delay exceeds the threshold you specify, DRS will send alarms to the specified recipients. The value ranges from 0 to 3,600. To avoid repeated alarms caused by the fluctuation of delay, an alarm is sent only after the delay has exceeded the threshold for six minutes.

    NOTE:
    • If the delay threshold is set to 0, no notifications will be sent to the recipient.
    • In the early stages of an incremental synchronization, the synchronization delay is long because a large quantity of data is awaiting synchronization. In this case, no notifications will be sent.
    • Before setting the delay threshold, enable Send Notifications.

    Data Exception Notification

    This parameter is optional. After enabled, DRS will send a notification if the task data is abnormal.

    Stop Abnormal Tasks After

    Number of days after which an abnormal task is automatically stopped. The value must range from 14 to 100. The default value is 14.

    NOTE:
    • You can set this parameter only for pay-per-use tasks.
    • Tasks in the abnormal state are still charged. If tasks remain in the abnormal state for a long time, they cannot be resumed. Abnormal tasks run longer than the period you set (unit: day) will automatically stop to avoid unnecessary fees.

  8. After the task is submitted, you can view and manage it on the Data Synchronization Management page.

    • You can view the task status. For more information about task status, see Task Statuses.
    • You can click in the upper right corner to view the latest task status.
    • By default, DRS retains a task in the Configuration state for three days. After three days, DRS automatically deletes background resources, but the task status remains unchanged. When you configure the task again, DRS applies for resources for the task again. In this case, the IP address of the DRS instance changes.
    • For a public network task, DRS needs to delete background resources after you stop the task. The EIP bound to the task cannot be restored to the Unbound state until background resources are deleted.