Updated on 2024-11-29 GMT+08:00

Migrating HDFS Data Using HDFS2ES

Scenario

This section describes how to use HDFS2ES to import formatted data from HDFS to Elasticsearch in MapReduce mode when Elasticsearch is running properly.

Prerequisites

  • The cluster to which data is to be migrated is running properly.
  • MapReduce can run properly in the cluster where data is migrated and can communicate with the Elasticsearch cluster.
  • HDFS data has been prepared.
    Requirements on the HDFS data and directory formats are as follows:
    • The HDFS directory structure is as follows: Fixed Base directory/time/data file (or directory), in which time is the data time by default. The HDFS2ES only reads data that is newly written to the HDFS directory.

      For example, /user/FusionInsight_Elasticsearch/Hdfs2Es/input/testall/20190101 indicates that the data in the directory is the data generated on Jan 1, 2019.

    • The format of the imported data is as follows: Fields are separated by customized delimiters if the data files are written by row.

      Example:

      1    Zhang San    Male    18 
      • The delimiter used between fields can be specified by the FieldSplit attribute in the table*.xml configuration file. In the preceding example, the delimiter is a tab key.
      • The value of FieldSplit in the table*.xml file supports only strings of the char type. For example, a tab key can be configured as /t or Unicode"\u0009".
    • The data can be stored in either one of the following modes:
      1. Full write:

        Each time data is written, the latest input data is considered as full data. After being written to Elasticsearch, historical data is deleted and only the latest data is retained.

      2. Incremental write without another table created:

        Data is appended to the same index.

      3. Incremental write with more tables created:

        Indexes are periodically created according to the data time in the directory, and data of the same time is written to the same index.

        When data is written to Elasticsearch, the time in HDFS directory is the data time by default. Set table names based on the table types in the configuration.

        The table name format is ${TABLE_NAME}_${DATE_TYPE}. ${TABLE_NAME} indicates the specified table name, and ${DATE_TYPE} indicates the time when the table is created (table names can be distinguished based on the number of digits). Examples:

        • ${TABLE_NAME}_2019 indicates that the table is created by year (2019).
        • ${TABLE_NAME}_20191 indicates that the table is created by quarter (the first quarter of 2019).
        • ${TABLE_NAME}_201901 indicates that the table is created by month (January 2019).
        • ${TABLE_NAME}_2019001 indicates that the table is created by week (the first week of 2019).
        • ${TABLE_NAME}_20190102 indicates that the table is created by day (January 2, 2019).
        • For ease of understanding, tables in the proceeding description refer to indexes in Elasticsearch.
        • This tool supports any of the preceding storage modes. Configure the XML file in the tableConf directory based on the site requirements.
  • The cluster client has been installed in a directory, for example, /opt/client. The source /opt/client/bigdata_env command has been executed.

Procedure

Modify configuration files

  1. Replace .xml configuration files.

    Go to the installation directory of the Yarn client, copy the core-site.xml, hdfs-site.xml, mapred-site.xml and yarn-site.xml files in the config directory to the Elasticsearch/tools/elasticsearch-data2es/hdfs2es/basicConf directory of the cluster client.

    The XML files are the tool configuration files required for accessing HDFS/MapReduce.

  2. On Manager, choose Cluster > Name of the desired cluster > Cluster Properties to check whether the authentication mode of the cluster is the security mode.

    • If yes, go to 3.
    • If no, go to 8.

  3. Create a user.

    1. On Manager, choose System > Permission > User > Create.
    2. Enter a username, for example, test. Set User Type to Human-Machine. Add user test to the elasticsearch and supergroup user groups, set the primary group to supergroup, and bind the Manager_administrator role to the user to obtain related permissions. In Ranger-based authentication mode, add the Elasticsearch access permission policy for user test to Ranger. For details, see Adding a Ranger Access Permission Policy for Elasticsearch.
    3. Click OK.

  4. Choose System > Permission > User. Select the user and choose More > Download Authentication Credential. Then select the cluster information, and click OK to download the file.
  5. Upload the user.keytab and krb5.conf files obtained after the decompression to the Elasticsearch/tools/elasticsearch-data2es/hdfs2es/basicConf directory of the cluster client.
  6. Log in to the node where the client is deployed as user root.
  7. Run the following commands to change the value of principal in the jaas.conf file in the basicConf directory of the HDFS2ES tool package:

    cd /opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es/basicConf

    vi jaas.conf

    In the following information, test is the username, which is the same as the value of principal in the esParams.properties file. keyTab must be set to the directory of the user.keytab file on the client.
    Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es/basicConf/user.keytab"
    principal="test@<System domain name>"
    useTicketCache=false
    storeKey=true
    debug=true;
    };

    You can log in to Manager, choose System > Permission > Domain and Mutual Trust, and view the value of Local Domain, which is the current system domain name.

  8. Run the following command to modify the esParams.properties parameter configuration file in the hdfs2es/basicConf directory:

    vi esParams.properties

    A configuration example is as follows:

    # Enter the IP address and port number of the Elasticsearch cluster.
    EsNodes=ip1:port1,ip2:port2
    #Username
    principal=test
    # Cluster in security mode: true; Cluster in normal mode: false
    securityMode=true
    # Elasticsearch in security mode: true; Elasticsearch in normal mode: false (The value is determined by the ELASTICSEARCH_SECURITY_ENABLE parameter configured for Elasticsearch on Manager. If the cluster is in normal mode, the value is false.)
    esSecurityMode=true
    # Size of the thread pool, that is, the number of tables whose data can be written to Elasticsearch at a time. The value depends on the amount of data and the number of indexes to be imported.
    threadNum=10
    # Number of records to be written in batches, that is, the amount of data written in each bulk.
    batch=1000
    # Size of a file processed by a job at a time, in MB. If this parameter is left empty, the default value 0 is used. 0 indicates that all data files in the input directory are processed at a time.
    inputFileSize=

    The inputFileSize parameter is used to control the amount of data processed by a job. For example, if a job has three files to be processed and the sizes of the files are 50 MB, 60 MB, and 70 MB, respectively, and this parameter is set to 100 MB, the job is split into two files for processing. That is, the first two files are processed first, and then the third file is processed. (The size of the first job is 110 MB, and the size of the second job is 70 MB.)

  1. Run the following command to configure the indexes to be written to the Elasticsearch in the .xml files according to the configuration template:

    cd /opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es/tableConf

    cat tableAll.xml

    Save the configuration to the tableConf directory. The configuration file must be in .xml format and there is no limit for the file name. After the configuration is complete, delete the .xml files of all configuration templates.
    • Multiple configurations indicate that there are multiple indexes. Each index runs a MapReduce task. Therefore, set the threadNum value in the esParams.properties file based on the current cluster status. To prevent cluster exceptions, do not submit too many tasks at a time.
    • If periodic scanning is required, determine a proper scanning period based on the actual data volume and cluster running status. If the data volume is large, you are advised to import the data at a time for the first time. After the first import is complete, periodic import is used to prevent excessive historical data and cluster pressure.
    • The value of StatusConfig in the table*.xml file is the latest date of the HDFS directory whose data is imported to Elasticsearch. After data is imported using HDFS2ES, the latest date stored in the StatusConfig directory is automatically updated. Only data of HDFS directories whose dates are later than that stored in the StatusConfig directory can be imported to Elasticsearch. For example, if the date of a subdirectory in the StatusConfig directory is 20190102, data in the HDFS directories whose date is later than 20190102 will be imported. To import data in the 20190101 directory of HDFS, delete the 20190102 directory from the StatusConfig directory.
    • Data that fails to be written is written by time in the corresponding failure directory (HDFS directory in the configuration). The data content is in JSON format that can be written in batches using the bulk command. The data that fails to be written is not automatically deleted. If you do not need to permanently save the data, delete it in a timely manner.
    • The written statistics are recorded in logs. You can view the write status of each piece of data in logs/hdfs2es.log.

  2. Run the following command in the hdfs2es/tableConf directory and enter the statement for creating an index based on the configuration template:

    vi createIndex.properties

    Delete the example statement in the configuration and enter the statement for creating an index based on the site requirements. Ensure that the index configured in the XML file is configured with the corresponding index creation statement.

  3. Run the tool to scan HDFS data and write the data to Elasticsearch.

    • Method 1: Periodic scanning, that is, the tool scans the input directory periodically (unit: second) to check whether new data needs to be written.

      cd /opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es

      sh run.sh basicConf/ tableConf/ true 2000

    • Method 2: One-time scanning, that is, the tool runs only once. (If the date of the directory to be scanned is earlier than that of the StatusConfig directory, the task will not be executed.)

      cd /opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es

      sh run.sh basicConf/ tableConf/ false

  4. Run the curl command to view the index and check whether the data is imported.

    • If the security mode is used, run the following command:

      curl -XGET --tlsv1.2 --negotiate -k -u : "https://IP:PORT/my_store1/_search"

    • Normal mode:

      curl -XGET "http://IP:PORT/my_store1/_search"

    For details about how to use the curl command, see Running curl Commands in Linux.

  5. After data is imported, if data fails to be written, view the data in the specified HDFS directory. To import the failed data to the Elasticsearch cluster again, run the following commands:

    cd /opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es

    java -cp ./basicConf/:./../lib/* com.*.fusioninsight.es.tool.hdfs2es.reload.ReloadJob ${HDFS_DIRECTORY} ${BATCH_SIZE}

    • ${HDFS_DIRECTORY} indicates the data directory of secondary import. Ensure that files to be processed, instead of directories, are at the lower level of the directory.
    • ${BATCH_SIZE} indicates the number of data records using a bulk command.
    • You can view the log information about the import process in the Elasticsearch/tools/elasticsearch-data2es/hdfs2es/logs directory.
    • To use the secondary import function, ensure that the configuration in the basicConf directory has been performed according to the preceding steps.