Updated on 2024-10-23 GMT+08:00

Solution to Slow Processing

If a link of a real-time processing migration job is too slow (that is, its monitoring metrics do not meet the performance specifications of real-time processing migration jobs), troubleshoot the following possible causes:

  • Slow writing at the destination
  • Slow extraction at the source
  • Other causes (contact technical support)

If writing at the destination is too slow, extraction at the source also slows down because of backpressure. Therefore, if a link is slow, check the write speed at the destination first and then check upstream.

Slow Writing at the Destination

  1. Check whether the load on the destination data source, such as GaussDB(DWS) or Doris, has reached its upper limit, that is, whether the CPU, memory, and I/O are under high load.
  2. If the destination load has not reached the upper limit, increase the number of concurrent jobs to accelerate writing.
  3. If the operation in step 2 does not effectively improve performance, check the performance of the source by referring to Slow Extraction at the Source.
  4. If the source performance is normal, optimize parameters by referring to Destination Optimization.
  5. If the preceding operations cannot accelerate the job, contact technical support.

Slow Extraction at the Source

  1. Check whether the load on the source database, such as MySQL, Oracle, or SQL Server, has reached its upper limit, that is, whether the CPU, memory, and I/O are overloaded.
  2. If the source load has not reached the upper limit, increase the number of concurrent jobs to improve the concurrent extraction rate in either of the following cases: the job is a full and incremental migration from MySQL, Oracle, SQL Server, PostgreSQL, or OpenGauss and is still in the full extraction phase, or the job extracts data from Kafka or Hudi and extraction is slow.

    Relational databases such as MySQL, Oracle, SQL Server, PostgreSQL, and OpenGauss are extracted in single-concurrency mode during incremental migration, so increasing the concurrency does not improve extraction performance.

  3. If step 2 cannot improve performance, optimize parameters by referring to Source Optimization.
  4. If the preceding operations cannot accelerate the job, contact technical support.

Source Optimization

  • MySQL optimization
    Table 1 Full migration

    Parameter: scan.incremental.snapshot.backfill.skip
    Type: boolean
    Default Value: true
    Description: Whether to skip reading binlog data. Skipping reading binlog data effectively reduces memory usage. Note that this function provides only an at-least-once guarantee.

    Parameter: scan.incremental.snapshot.chunk.size
    Type: int
    Default Value: 50000
    Description: Shard size, which determines the maximum number of data records in a single shard and the number of shards in the full migration phase. The larger the shard size, the more data records in a single shard and the smaller the number of shards.
    If a table contains a very large number of records, it is split into a large number of shards, and the shard information occupies too much memory. To avoid this issue, increase the shard size to reduce the number of shards (see the sketch after Table 2).
    If scan.incremental.snapshot.backfill.skip is false, the real-time processing migration job caches the data of a single shard, so a larger shard occupies more memory and may cause memory overflow. To avoid this issue, reduce the shard size.

    Parameter: scan.snapshot.fetch.size
    Type: int
    Default Value: 1024
    Description: Maximum number of data records extracted from the MySQL database in a single request during full data extraction. Increasing this value reduces the number of requests sent to the MySQL database and improves performance.

    Parameter: debezium.max.queue.size
    Type: int
    Default Value: 8192
    Description: Maximum number of data records in the data cache queue. If a single data record in the source table is very large (for example, 1 MB), caching too many records causes memory overflow. In this case, reduce the value.

    Parameter: debezium.max.queue.size.in.bytes
    Type: int
    Default Value: 0
    Description: Maximum size, in bytes, of the data cache queue. The default value 0 indicates that the cache queue is limited by the number of data records (debezium.max.queue.size) rather than by data size. If debezium.max.queue.size cannot effectively limit memory usage, you can explicitly set this parameter to limit the size of the cached data.

    Parameter: jdbc.properties.socketTimeout
    Type: int
    Default Value: 300000
    Description: Socket timeout, in milliseconds, for connections to the MySQL database in the full migration phase. The default value is 5 minutes. If the MySQL database is overloaded and the job reports a SocketTimeout exception, increase the value of this parameter.

    Parameter: jdbc.properties.connectTimeout
    Type: int
    Default Value: 60000
    Description: Connection timeout, in milliseconds, for connections to the MySQL database in the full migration phase. The default value is 1 minute. If the MySQL database is overloaded and the job reports a ConnectTimeout exception, increase the value of this parameter.

    Table 2 Incremental migration

    Parameter: debezium.max.queue.size
    Type: int
    Default Value: 8192
    Description: Maximum number of data records in the data cache queue. If a single data record in the source table is very large (for example, 1 MB), caching too many records causes memory overflow. In this case, reduce the value.

    Parameter: debezium.max.queue.size.in.bytes
    Type: int
    Default Value: 0
    Description: Maximum size, in bytes, of the data cache queue. The default value 0 indicates that the cache queue is limited by the number of data records (debezium.max.queue.size) rather than by data size. If debezium.max.queue.size cannot effectively limit memory usage, you can explicitly set this parameter to limit the size of the cached data.
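
    The following minimal Python sketch (not part of the product; the row count and record size are assumed example figures) shows how the parameters in Table 1 interact: scan.incremental.snapshot.chunk.size determines the number of shards tracked in memory, and debezium.max.queue.size bounds the worst-case memory held by the cache queue when debezium.max.queue.size.in.bytes is 0.

        # Back-of-the-envelope estimate for the Table 1 trade-offs; example figures only.

        def estimate(table_rows, chunk_size, queue_records, avg_record_bytes):
            # Shards created in the full migration phase: more rows or a smaller
            # chunk size means more shards to track in memory.
            shards = -(-table_rows // chunk_size)  # ceiling division

            # Worst-case memory held by the cache queue when the limit is by
            # record count only (debezium.max.queue.size.in.bytes = 0).
            queue_bytes = queue_records * avg_record_bytes
            return shards, queue_bytes

        # Assumed example: 500 million rows, default chunk size and queue size,
        # and 1 MB records (the large-record case mentioned in Table 1).
        shards, queue_bytes = estimate(500_000_000, 50_000, 8_192, 1024 * 1024)
        print(shards)               # 10000 shards
        print(queue_bytes / 2**30)  # 8.0 GiB cached in the worst case

    Under these assumptions, either increasing the shard size or lowering debezium.max.queue.size (or setting debezium.max.queue.size.in.bytes) keeps memory usage in check.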

  • Oracle optimization
    Table 3 Full migration

    Parameter: scan.incremental.snapshot.backfill.skip
    Type: boolean
    Default Value: true
    Description: Whether to skip reading redo log data. LogMiner initialization in the Oracle database is slow, so skipping reading redo log data effectively improves full extraction performance and reduces memory usage. Note that this function provides only an at-least-once guarantee.

    Parameter: scan.incremental.snapshot.chunk.size
    Type: int
    Default Value: 50000
    Description: Shard size, which determines the maximum number of data records in a single shard and the number of shards in the full migration phase. The larger the shard size, the more data records in a single shard and the smaller the number of shards.
    If a table contains a very large number of records, it is split into a large number of shards, and the shard information occupies too much memory. To avoid this issue, increase the shard size to reduce the number of shards.
    If scan.incremental.snapshot.backfill.skip is false, the real-time processing migration job caches the data of a single shard, so a larger shard occupies more memory and may cause memory overflow. To avoid this issue, reduce the shard size.

    Parameter: scan.snapshot.fetch.size
    Type: int
    Default Value: 1024
    Description: Maximum number of data records extracted from the Oracle database in a single request during full data extraction. Increasing this value reduces the number of requests sent to the Oracle database and improves performance.

Destination Optimization

  • Hudi optimization
    Real-time processing migration jobs use Flink-based real-time links. The Hudi BLOOM index consumes Flink memory, so the BUCKET index is recommended. The following table lists common optimization parameters for real-time processing migration jobs.
    Table 4 Full migration

    Parameter: hoodie.sink.flush.tasks
    Type: int
    Default Value: 1
    Description: Number of concurrent Hudi flush tasks. The default value is 1, indicating sequential writing. If Hudi commits a large number of FileGroups at a time (for example, when a large amount of historical data in the source table is updated), you can increase the value of this parameter.
    Number of FileGroups flushed by a single thread = Number of FileGroups committed at a time / Number of concurrent jobs
    Recommended values (see the sketch after this table):
    If the number of FileGroups flushed by a single thread is less than or equal to 5, the recommended value is 2.
    If it is less than or equal to 10, the recommended value is 5.
    If it is less than or equal to 25, the recommended value is 10.
    If it is less than or equal to 50, the recommended value is 20.
    If it is greater than 50, the recommended value is 30.
    The more concurrent flush tasks, the higher the memory usage during flushing. Adjust the value based on the memory monitoring of the real-time processing migration job.

    Parameter: hoodie.context.flatmap.parallelism
    Type: int
    Default Value: 1
    Description: When Hudi performs a commit operation, it scans partitions. By default, partitions are scanned one at a time. If a commit involves a large number of partitions, you can increase the value of this parameter to accelerate the commit.
    If the number of partitions committed at a time is less than or equal to 10, the recommended value is 5.
    If it is less than or equal to 25, the recommended value is 10.
    If it is less than or equal to 50, the recommended value is 20.
    If it is greater than 50, the recommended value is 30.

    Parameter: compaction.async.enabled
    Type: boolean
    Default Value: true
    Description: Whether to enable compaction. The default value is true, indicating that compaction is enabled for Hudi. The compaction operation affects the write performance of real-time tasks. If compaction is performed by an external service, you can set this parameter to false to disable compaction in the real-time processing migration job.

    Parameter: compaction.delta_commits
    Type: int
    Default Value: 40
    Description: Frequency at which the real-time processing migration job generates compaction requests. The default value is 40, indicating that a compaction request is generated every 40 commits. Lowering the generation frequency reduces the compaction frequency and improves job performance. If the amount of incremental Hudi data is small, you can increase the value of this parameter.
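
    The following minimal Python sketch (not part of the product) encodes the hoodie.sink.flush.tasks recommendation above; the number of FileGroups committed at a time and the current concurrency are assumed inputs that you would read from job monitoring.

        # Maps the per-thread FileGroup count from Table 4 to the recommended
        # hoodie.sink.flush.tasks value; thresholds are the ones listed above.

        def recommended_flush_tasks(committed_file_groups: int, concurrency: int) -> int:
            # Number of FileGroups flushed by a single thread, per the formula in Table 4.
            per_thread = committed_file_groups / max(concurrency, 1)
            if per_thread <= 5:
                return 2
            if per_thread <= 10:
                return 5
            if per_thread <= 25:
                return 10
            if per_thread <= 50:
                return 20
            return 30

        # Assumed example: 120 FileGroups per commit with the default concurrency of 1.
        print(recommended_flush_tasks(120, 1))  # 30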

  • DWS optimization

    The following table lists the main DWS destination parameters.

    Table 5 Full migration

    Parameter: Write Mode
    Type: enum
    Default Value: UPSERT MODE
    Description: DWS write mode, which can be set in the destination configuration.
    UPSERT MODE: batch update.
    COPY MODE: DWS-dedicated high-performance batch import.
    COPY MODE is recommended for real-time processing migration jobs.

    Parameter: Maximum Data Volume for Batch Write
    Type: int
    Default Value: 50000
    Description: Maximum number of data records written to DWS at a time. You can set this parameter in the destination configuration.
    Data is written whenever Maximum Data Volume for Batch Write or Scheduled Batch Write Interval is reached, whichever comes first (see the sketch after this table).
    Increasing the number of records written at a time reduces the number of DWS requests but may increase the duration of a single request and the amount of cached data, which affects memory usage. Adjust the value based on the DWS specifications and load.

    Parameter: Scheduled Batch Write Interval
    Type: int
    Default Value: 3
    Description: Interval for writing data to DWS. You can set this parameter in the destination configuration.
    When the interval is reached, the cached data is written.
    Increasing the value of this parameter increases the amount of data cached before a single write, but it takes longer for the data to become visible in DWS.

    Parameter: sink.buffer-flush.max-size
    Type: int
    Default Value: 512
    Description: Maximum amount of data, in MB, written to DWS at a time. The default value is 512 MB. You can set this parameter in the advanced settings of the destination configuration.
    When the size of the cached data reaches this limit, the data is written.
    Similar to Maximum Data Volume for Batch Write, increasing the amount of data written at a time reduces the number of DWS requests but may increase the duration of a single request and the amount of cached data, which affects memory usage. Adjust the value based on the DWS specifications and load.
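
    The following minimal Python sketch (a simplified illustration, not the actual connector code) shows the flush-trigger behavior described in Table 5: a buffered batch is written when the record count, the buffered size, or the write interval is reached, whichever comes first. The write_batch() function is a placeholder, the interval is assumed to be in seconds, and a real connector would also flush on a timer rather than only when a record arrives.

        import time

        def write_batch(records):
            # Placeholder for the actual UPSERT MODE / COPY MODE write to DWS.
            print(f"writing {len(records)} records")

        class DwsBatchBuffer:
            def __init__(self, max_records=50_000, max_bytes=512 * 1024 * 1024, interval_s=3):
                self.max_records = max_records   # Maximum Data Volume for Batch Write
                self.max_bytes = max_bytes       # sink.buffer-flush.max-size (512 MB)
                self.interval_s = interval_s     # Scheduled Batch Write Interval (assumed seconds)
                self.records, self.size = [], 0
                self.last_flush = time.monotonic()

            def add(self, record: bytes):
                self.records.append(record)
                self.size += len(record)
                # Flush when any of the three thresholds from Table 5 is reached.
                if (len(self.records) >= self.max_records
                        or self.size >= self.max_bytes
                        or time.monotonic() - self.last_flush >= self.interval_s):
                    self.flush()

            def flush(self):
                if self.records:
                    write_batch(self.records)
                self.records, self.size = [], 0
                self.last_flush = time.monotonic()

        buffer = DwsBatchBuffer()
        buffer.add(b"example row")  # buffered; written once a threshold is reached

    Larger batch limits mean fewer but heavier DWS requests and more data cached in memory, which is the trade-off described in Table 5.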