Updated on 2023-07-14 GMT+08:00

Restoring Kafka Metadata

Scenario

Kafka data needs to be recovered in the following scenarios: data is modified or deleted unexpectedly and needs to be restored. After an administrator performs critical data adjustment in ZooKeeper, an exception occurs or the operation has not achieved the expected result. All Kafka modules are faulty and become unavailable. Data is migrated to a new cluster.

System administrators can create a recovery task in FusionInsight Manager to recover Kafka data. Only manual restoration tasks are supported.

  • Data restoration can be performed only when the system version is consistent with that during data backup.
  • To restore Kafka metadata when the service is running properly, you are advised to manually back up the latest Kafka metadata before restoration. Otherwise, the Kafka metadata that is generated after the data backup and before the data restoration will be lost.

Impact on the System

  • After the metadata is restored, the data generated after the data backup and before the data restoration is lost.
  • After the metadata is restored, the offset information stored on ZooKeeper by Kafka consumers is rolled back, resulting in repeated consumption.

Prerequisites

  • If you need to restore data from a remote HDFS, prepare a standby cluster. If the active cluster is deployed in security mode and the active and standby clusters are not managed by the same FusionInsight Manager, mutual trust has been configured. For details, see Configuring Cross-Manager Mutual Trust Between Clusters. If the active cluster is deployed in normal mode, no mutual trust is required.
  • Cross-cluster replication has been configured for the active and standby clusters. For details, see Enabling Cross-Cluster Replication.
  • Time is consistent between the active and standby clusters and the NTP services on the active and standby clusters use the same time source.
  • The Kafka service is disabled first, and then enabled upon data restoration.

Procedure

  1. On FusionInsight Manager, choose O&M > Backup and Restoration > Backup Management.
  2. In the Operation column of a specified task in the task list, choose More > View History to view historical backup task execution records.

    In the displayed window, locate a specified success record and click View in the Backup Path column to view the backup path information of the task and find the following information:

    • Backup Object specifies the data source of the backup data.
    • Backup Path specifies the full path where the backup files are saved.

      Select the correct item, and manually copy the full path of backup files in Backup Path.

  3. On FusionInsight Manager, choose O&M > Backup and Restoration > Restoration Management.
  4. Click Create.
  5. Set Task Name to the name of the restoration task.
  6. Select the cluster to be operated from Recovery Object.
  7. In the Restoration Configuration area, select Kafka.
  8. Set Path Type of Kafka to a backup directory type.

    The settings vary according to backup directory types:

    • LocalDir: indicates that the backup files are stored on the local disk of the active management node.

      If you select LocalDir, you also need to set Source Path to select the backup file to be restored, for example, Version_Data source_Task execution time.tar.gz.

    • LocalHDFS: indicates that the backup files are stored in the HDFS directory of the current cluster.
      If you select LocalHDFS, set the following parameters:
      • Source Path: indicates the full path of the backup file in the HDFS, for example, Backup path/Backup task name_Task creation time/Version_Data source_Task execution time.tar.gz.
      • Source NameService Name: indicates the NameService name that corresponds to the backup directory when a restoration task is executed. The default value is hacluster.
    • RemoteHDFS: indicates that the backup files are stored in the HDFS directory of the standby cluster.
      If you select RemoteHDFS, set the following parameters:
      • Source NameService Name: indicates the NameService name of the backup data cluster. You can enter the built-in NameService name of the remote cluster, for example, haclusterX, haclusterX1, haclusterX2, haclusterX3, or haclusterX4. You can also enter a configured NameService name of the remote cluster.
      • IP Mode: indicates the mode of the target IP address. The system automatically selects the IP address mode based on the cluster network type, for example, IPv4 or IPv6.
      • Source NameNode IP Address: indicates the NameNode service plane IP address of the standby cluster, supporting the active node or standby node.
      • Source Path: indicates the full path of HDFS directory for storing backup data of the standby cluster, for example, Backup path/Backup task name_Data source_Task creation time/Version_Data source_Task execution time.tar.gz.
      • Queue Name: indicates the name of the Yarn queue used for backup task execution. The name must be the same as the name of the queue that is running properly in the cluster.
    • NFS: indicates that backup files are stored in NAS using the NFS protocol.
      If you select NFS, set the following parameters:
      • IP Mode: indicates the mode of the target IP address. The system automatically selects the IP address mode based on the cluster network type, for example, IPv4 or IPv6.
      • Server IP Address: indicates the IP address of the NAS server.
      • Source Path: indicates the full path of the backup file on the NAS server, for example, Backup path/Backup task name_Data source_Task creation time/Version_Data source_Task execution time.tar.gz.
    • CIFS: indicates that backup files are stored in NAS using the CIFS protocol.
      If you select CIFS, set the following parameters:
      • IP Mode: indicates the mode of the target IP address. The system automatically selects the IP address mode based on the cluster network type, for example, IPv4 or IPv6.
      • Server IP Address: indicates the IP address of the NAS server.
      • Port: indicates the port number used to connect to the NAS server over the CIFS protocol. The default value is 445.
      • Username: indicates the username set when the CIFS protocol is configured.
      • Password: indicates the password set when the CIFS protocol is configured.
      • Source Path: indicates the full path of the backup file on the NAS server, for example, Backup path/Backup task name_Data source_Task creation time/Version_Data source_Task execution time.tar.gz.
    • OBS: indicates that backup files are stored in OBS.

      If you select OBS, set the following parameters:

      • Source Path: indicates the full OBS path of a backup file, for example, Backup path/Backup task name_Data source_Task creation time/Version_Data source_Task execution time.tar.gz.

        Only MRS 3.1.0 or later supports saving backup files in OBS.

  9. Click OK.
  10. In the restoration task list, locate a created task and click Start in the Operation column to execute the restoration task.

    • After the restoration is successful, the progress bar is in green.
    • After the restoration is successful, the restoration task cannot be executed again.
    • If the restoration task fails during the first execution, rectify the fault and click Retry to execute the task again.
      • If the Kafka service is deleted after the backup is complete, reinstall the Kafka service, restore its metadata, and restart the Kafka service. It is found that the Broker service cannot be started. In this case, the /var/log/Bigdata/kafka/broker/server.log file contains an error. An error example is as follows:
        ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)kafka.common.InconsistentClusterIdException: The Cluster ID kVSgfurUQFGGpHMTBqBPiw doesn't match stored clusterId Some(0Qftv9yBTAmf2iDPSlIk7g) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. at kafka.server.KafkaServer.startup(KafkaServer.scala:220) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at kafka.Kafka$.main(Kafka.scala:84) at kafka.Kafka.main(Kafka.scala)

        Check the value of log.dirs in the Kafka Broker configuration file ${BIGDATA_HOME}/Fusionsight_Current/*Broker/etc/server.properties. The value is the Kafka data directory. Go to the Kafka data directory and change the value 0Qftv9yBTAmf2iDPSlIk7g of cluster.id in meta.properties to kVSgfurUQFGGpHMTBqBPiw (the latest value in the error log).

      • The preceding modification must be performed on each node where Broker is located. After the modification, restart the Kafka service.