Updated on 2024-11-08 GMT+08:00

Example Typical Scenario: Migrating Data from Kafka to DLI

This section describes how to use CDM's data synchronization to migrate data from MRS Kafka to DLI.

Prerequisites

  • You have created a DLI SQL queue. For how to create a DLI queue, see Creating a Queue.

    Set Type to For SQL when buying a queue.

  • You have created an MRS security cluster that contains the Kafka component. For how to create an MRS cluster, see Purchasing a Custom Cluster.
    • In this example, the version of the MRS cluster is 3.1.0.
    • You have enabled Kerberos authentication for the MRS cluster.
  • You have created a CDM cluster. For how to create a CDM cluster, see Creating a CDM Cluster.
    • To connect the cluster to an on-premises database as the destination data source, you can use either Internet or Direct Connect. If the Internet is used, make sure that an EIP has been bound to the CDM cluster, the security group of CDM allows outbound traffic from the host where the on-premises data source is located, the host where the data source is located can access the Internet, and the connection port has been enabled in the firewall rules.
    • To successfully connect to cloud services MRS and GaussDB (DWS) as data sources, the following requirements must be met:

      i. If the CDM cluster and the cloud service are in different regions, they must be connected through either the Internet or Direct Connect. If the Internet is used, make sure that an EIP has been bound to the CDM cluster, the host where the data source is located can access the Internet, and the port has been enabled in the firewall rules.

      ii. If the CDM cluster and the cloud service are in the same region, instances in the same VPC, subnet, and security group can communicate with each other by default. If the CDM cluster and the cloud service are in the same VPC but in different subnets or security groups, you must configure routing and security group rules.

      For how to configure routing rules, see Configure routes. For how to configure security group rules, see Security Group Configuration Examples.

      iii. The cloud service instance and the CDM cluster must belong to the same enterprise project. If they do not, change the enterprise project of the workspace.

    In this example, the VPC, subnet, and security group of the CDM cluster match those of the MRS cluster.
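Before creating the CDM links, it can help to confirm that the ZooKeeper and Kafka ports are reachable from a host in the same VPC, subnet, and security group. The following is a minimal Python sketch and is not part of CDM. The helper name port_reachable and the sample address are hypothetical; ports 2181 and 21007 are the ones used by the commands in Step 1.

```python
import socket


def port_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable networks.
        return False


# Usage (hypothetical address; replace with an instance IP from MRS Manager):
# for port in (2181, 21007):
#     print(port, port_reachable("192.0.2.10", port))
```

A successful TCP connection only shows that routing and security group rules allow the traffic. It does not verify Kerberos authentication.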

Step 1: Prepare Data

  • Create a Kafka topic for the MRS cluster and send messages to the topic.
    1. Log in to MRS Manager by referring to Accessing FusionInsight Manager.
    2. Click System and choose Permission > User. On the displayed page, set the following parameters:
      1. Username: Enter a username. In this example, enter testuser2.
      2. User Type: Select Human-Machine.
      3. Password and Confirm Password: Enter a password for the new user and enter it again to confirm.
      4. User Group and Primary Group: Select kafkaadmin.
      5. Role: Select Manager_viewer.
        Figure 1 Creating a Kafka user
    3. On MRS Manager, choose Cluster > Name of the desired cluster > Service > ZooKeeper > Instance. On the displayed page, obtain the IP address of the ZooKeeper instance.
    4. On MRS Manager, choose Cluster > Name of the desired cluster > Service > Kafka > Instance. On the displayed page, obtain the IP address of the Kafka instance.
    5. Download and install the Kafka client by referring to Installing an MRS Client. In this example, the Kafka client is installed in the /opt/kafkaclient directory of the active MRS node.
    6. Go to the client installation directory as user root.

      Example: cd /opt/kafkaclient

    7. Configure environment variables.

      source bigdata_env

    8. Authenticate the user created in 2 as Kerberos authentication has been enabled for the cluster:

      kinit Username in 2

      Example: kinit testuser2

    9. Create a Kafka topic named kafkatopic.
      kafka-topics.sh --create --zookeeper IP address 1 of the node where the ZooKeeper role is:2181,IP address 2 of the node where the ZooKeeper role is:2181,IP address 3 of the node where the ZooKeeper role is:2181/kafka --replication-factor 1 --partitions 1 --topic kafkatopic

      In this command, replace each "IP address N of the node where the ZooKeeper role is" placeholder with the IP address of a ZooKeeper instance obtained in 3.

    10. Send a test message to kafkatopic.
      kafka-console-producer.sh --broker-list IP address 1 of the node where the Kafka role is:21007,IP address 2 of the node where the Kafka role is:21007,IP address 3 of the node where the Kafka role is:21007 --topic kafkatopic --producer.config /opt/kafkaclient/Kafka/kafka/config/producer.properties

      In this command, replace each "IP address N of the node where the Kafka role is" placeholder with the IP address of a Kafka instance obtained in 4.

      The content of the test message is as follows:
      {"PageViews":5, "UserID":"4324182021466249494", "Duration":146,"Sign":-1}
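The address lists in the two commands above are comma-separated host:port strings, with the /kafka path appended to the ZooKeeper list. The following Python sketch shows one way to assemble them from the IP addresses obtained in 3 and 4. The helper names and the sample IP addresses are hypothetical; the ports, path, and topic name are taken from the commands in this section.

```python
def zookeeper_connect(ips, port=2181, chroot="/kafka"):
    """Join ZooKeeper instance IPs as host:port,host:port plus the chroot path."""
    return ",".join(f"{ip}:{port}" for ip in ips) + chroot


def broker_list(ips, port=21007):
    """Join Kafka instance IPs as host:port,host:port."""
    return ",".join(f"{ip}:{port}" for ip in ips)


# Hypothetical addresses standing in for the values obtained in 3 and 4.
zk_ips = ["192.0.2.11", "192.0.2.12", "192.0.2.13"]
kafka_ips = ["192.0.2.21", "192.0.2.22", "192.0.2.23"]

create_topic_cmd = (
    f"kafka-topics.sh --create --zookeeper {zookeeper_connect(zk_ips)} "
    "--replication-factor 1 --partitions 1 --topic kafkatopic"
)
print(create_topic_cmd)
print(broker_list(kafka_ips))
```

The printed strings can be pasted into the client shell after running source bigdata_env and kinit.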
  • Create a database and table on DLI.
    1. Log in to the DLI management console. In the navigation pane on the left, choose SQL Editor. On the displayed page, set Engine to Spark and Queues to the created SQL queue.

      Create a database, for example, testdb. For the syntax to create a DLI database, see Creating a Database.

      create database testdb;
    2. Create a table in the database. For the table creation syntax, see Creating a DLI Table Using the DataSource Syntax.
      CREATE TABLE testdlitable(value STRING);

Step 2: Migrate Data

  1. Create a CDM connection.
    1. Create a connection to link CDM to MRS Kafka.
      1. Log in to the CDM console. In the navigation pane on the left, choose Cluster Management. On the displayed page, locate the created CDM cluster and click Job Management in the Operation column.
      2. On the Job Management page, click the Links tab and click Create Link. On the displayed page, select MRS Kafka and click Next.
        Figure 2 Selecting the MRS Kafka connector
      3. Configure the connection as follows:
        Table 1 MRS Kafka connection configurations

        Parameter | Value
        Name | Name of the MRS Kafka data source, for example, source_kafka.
        Manager IP | Manager IP address, which is automatically filled in after you click Select next to the text box and select the MRS Kafka cluster.
        Username | Name of the MRS Kafka user created in 2.
        Password | Password of the MRS Kafka user.
        Authentication Method | Set it to KERBEROS if Kerberos authentication is enabled for the MRS cluster or to SIMPLE if the MRS cluster is a common cluster. In this example, set it to KERBEROS.

        For more details about the parameters, see Link to Kafka.
        Figure 3 Configuring the MRS Kafka connection
      4. Click Save.
    2. Create a connection to link CDM to DLI.
      1. Log in to the CDM console. In the navigation pane on the left, choose Cluster Management. On the displayed page, locate the created CDM cluster and click Job Management in the Operation column.
      2. On the Job Management page, click the Links tab, and click Create Link. On the displayed page, select Data Lake Insight and click Next.
        Figure 4 Selecting the DLI connector
      3. Set the connection parameters. For details about parameter settings, see Link to DLI.
        Figure 5 Setting connection parameters
      4. Click Save.
  2. Create a CDM migration job.
    1. Log in to the CDM console. In the navigation pane on the left, choose Cluster Management. On the displayed page, locate the created CDM cluster and click Job Management in the Operation column.
    2. On the Job Management page, click the Table/File Migration tab. On the displayed tab, click Create Job.
    3. On the Create Job page, set job parameters.
      Figure 6 Setting CDM job parameters
      1. Job Name: Name of the data migration job, for example, test.
      2. Set the parameters in the Source Job Configuration area as follows:
        Table 2 Source job parameters

        Parameter | Value
        Source Link Name | Select the name of the data source created in 1.a.
        Topics | Names of the topics you want to migrate to DLI. You can select one or more topics. Example: kafkatopic.
        Data Format | Select the message format as needed. In this example, CDC (DRS_JSON) is selected, indicating that the source data will be parsed in DRS_JSON format.
        Offset Parameter | Initial offset when data is pulled from Kafka. In this example, select Earliest. The options are: Latest (maximum offset, meaning that the latest data will be pulled); Earliest (minimum offset, meaning that the earliest data will be pulled); Submitted (the submitted data is pulled); Time Range (data within a time range is pulled).
        Permanent Running | Whether the job runs permanently. In this example, set it to No.
        Pull Data Timeout | Maximum duration, in minutes, of continuous data pulling. In this example, set it to 15.
        Wait Data Timeout | (Optional) Maximum time, in seconds, to wait for data to be read. In this example, leave this parameter blank.
        Consumer Group ID | Consumer group ID. The default Kafka message group ID example-group1 is used.

        For details about parameter settings, see From Apache Kafka.
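The effect of the Offset Parameter options can be illustrated with a small Python sketch of how a Kafka consumer chooses its starting position in one partition. This models standard Kafka consumer semantics and is not CDM's implementation. The function name initial_offset is hypothetical, and it assumes that Submitted corresponds to the offset last committed by the consumer group. Time Range is omitted because it depends on message timestamps.

```python
def initial_offset(mode, earliest, latest, committed=None):
    """Pick the offset a consumer would start reading from in one partition.

    earliest:  offset of the oldest retained message
    latest:    offset that the next produced message will receive
    committed: offset last committed by the consumer group, if any
    """
    mode = mode.lower()
    if mode == "earliest":
        return earliest
    if mode == "latest":
        return latest
    if mode == "submitted":
        if committed is None:
            raise ValueError("no committed offset for this consumer group")
        return committed
    raise ValueError(f"unsupported mode: {mode}")


# A partition holding messages at offsets 0..9; the group has committed offset 4.
for mode in ("Earliest", "Latest", "Submitted"):
    print(mode, initial_offset(mode, earliest=0, latest=10, committed=4))
```

Under these semantics, a consumer that starts at Latest begins after any messages already in the topic, while Earliest also reads messages produced before the job started, such as the test message sent in Step 1.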

      3. Set the parameters in the Destination Job Configuration area as follows:
        Table 3 Destination job parameters

        Parameter | Value
        Destination Link Name | Select the DLI data source connection created in 1.b.
        Resource Queue | Select a created DLI SQL queue.
        Database Name | Select a created DLI database. In this example, database testdb created in Create a database and table on DLI is selected.
        Table Name | Select the name of a table in the database. In this example, table testdlitable created in Create a database and table on DLI is selected.
        Clear data before import | Whether to clear data in the destination table before data import. In this example, set it to No. If set to Yes, data in the destination table will be cleared before the task is started.

        For details about parameter settings, see To DLI.

  3. Click Next. The Map Field page is displayed. CDM automatically matches the source and destination fields.
    • You can drag any unmatched fields to match them.
    • If the type is automatically created at the migration destination, you need to configure the type and name of each field.
    • CDM allows for field conversion during migration. For details, see Field Conversion.
    Figure 7 Field mapping
  4. Click Next and set task parameters. Typically, retain the default values for all parameters.

    In this step, you can configure the following optional features:

    • Retry Upon Failure: If the job fails to be executed, you can determine whether to automatically retry. Retain the default value Never.
    • Group: Select the group to which the job belongs. The default group is DEFAULT. On the Job Management page, jobs can be displayed, started, or exported by group.
    • Scheduled Execution: For how to configure scheduled execution, see Scheduling Job Execution. Retain the default value No.
    • Concurrent Extractors: Enter the number of extractors to be concurrently executed. Retain the default value 1.
    • Write Dirty Data: Set this parameter if data that fails to be processed or is filtered out during job execution needs to be written to OBS. Before writing dirty data, create an OBS link. You can view the data on OBS later. Retain the default value No, meaning dirty data is not recorded.
  5. Click Save and Run. On the Job Management page, you can view the job execution progress and result.
    Figure 8 Job progress and execution result

Step 3: Query Results

Once the migration job is complete, check whether the data in the Kafka topic has been migrated to the testdlitable table. To do so, log in to the DLI management console and choose SQL Editor. On the displayed page, set Engine to Spark, Queues to the created SQL queue, and Databases to the database created in Step 1 (testdb in this example). Then, execute the following query statement:
select * from testdlitable;
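
Because testdlitable has a single STRING column, each returned row is one text value. If that value holds the raw JSON text of a Kafka message (an assumption; the stored content depends on the Data Format selected for the job), its fields can be inspected after copying or exporting the query result. The following is a minimal Python sketch that uses the test message from Step 1; parse_value is a hypothetical helper.

```python
import json


def parse_value(value: str) -> dict:
    """Parse one `value` string into a dict; raise ValueError if it is not a JSON object."""
    try:
        record = json.loads(value)
    except json.JSONDecodeError as exc:
        raise ValueError(f"not valid JSON: {exc}") from exc
    if not isinstance(record, dict):
        raise ValueError("expected a JSON object")
    return record


# The test message sent to kafkatopic in Step 1.
row = '{"PageViews":5, "UserID":"4324182021466249494", "Duration":146,"Sign":-1}'
record = parse_value(row)
print(record["UserID"], record["PageViews"], record["Duration"], record["Sign"])
# prints: 4324182021466249494 5 146 -1
```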