Updated on 2022-12-14 GMT+08:00

Importing Data Using Loader

Scenario

This section describes how to import data from external data sources to MRS.

Generally, you can manually manage data import and export jobs on the Loader UI. To use shell scripts to update and run Loader jobs, you must configure the installed Loader client.

Prerequisites

  • You have obtained the service username and password for creating a Loader job.
  • You have the permission to access the HDFS or OBS directories, HBase tables, and data involved in job execution.
  • You have obtained the username and password used by an external data source (SFTP server or relational database).
  • No disk space alarm is reported, and the available disk space is sufficient for importing and exporting data.
  • When using Loader to import data from SFTP, FTP, and HDFS/OBS, ensure that the input paths and input path subdirectories of the external data sources and the name of the files in these directories do not contain any of the special characters /"';:;.
  • If a task requires the Yarn queue function, the user must be authorized with related Yarn queue permission.
  • The user who configures a task must obtain execution permission on the task and obtain usage permission on the related connection of the task.

Procedure

  1. Check whether data is imported from MRS to a relational database for the first time.

    • If yes, go to 2.
    • If no, go to 3.

  2. Modify the permission on the JAR package of the relational database driver.

    1. Log in to the active and standby management nodes of the Loader service, obtain the JAR package of the relational database driver, and save it to the following directory on the active and standby Loader nodes: ${BIGDATA_HOME}/FusionInsight_Porter_8.1.2.2/install/FusionInsight-Sqoop-1.99.3/FusionInsight-Sqoop-1.99.3/server/webapps/loader/WEB-INF/ext-lib.

      The version 8.1.2.2 is used as an example. Replace it with the actual version number.

    2. Run the following command as user root on the active and standby nodes of the Loader service to modify the permission:

      cd ${BIGDATA_HOME}/FusionInsight_Porter_8.1.2.2/install/FusionInsight-Sqoop-1.99.3/FusionInsight-Sqoop-1.99.3/server/webapps/loader/WEB-INF/ext-lib

      chown omm:wheel JAR package name

      chmod 600 JAR package name

    3. Log in to FusionInsight Manager. Choose Cluster > Name of the desired cluster > Services > Loader > More > Restart Service. Enter the password of the administrator to restart the Loader service.

  3. Access the Loader web UI.

    1. Log in to FusionInsight Manager. For details, see Accessing FusionInsight Manager.
    2. Choose Cluster > Name of the desired cluster > Services > Loader.
    3. Click LoaderServer(Node name, Active). The Loader web UI is displayed.
      Figure 1 Loader web UI

  4. Create a Loader data import job. Click New Job. Select the required job type in 1. Basic Information, and click Next.

    1. Set Name to the job name and Type to Import.
    2. Select a connection for Connection. By default, no connection is created. Click Add to create a connection, and then click Test to test whether the connection is available. Click OK when the system displays a message indicates that the test is successful.
      Data sources need to be connected when MRS exchanges data and files with external data sources. Connection indicates the set of connection parameters for connecting to data sources.
      Table 1 Connection configuration parameters

      Connector

      Parameter

      Description

      generic-jdbc-connector

      JDBC Driver Class

      Name of a JDBC driver class

      JDBC Connection String

      JDBC connection string

      Username

      Username for connecting to the database

      Password

      Password for connecting to the database

      JDBC Connection Properties

      JDBC connection attribute. Click Add to manually add connection attributes.

      • Name: connection attribute name
      • Value: connection attribute value

      ftp-connector

      FTP Server IP Address

      IP address of the FTP server

      FTP Server Port

      Port number of the FTP server

      FTP Username

      Username for accessing the FTP server

      FTP Password

      Password for accessing the FTP server

      FTP Mode

      FTP access mode. Possible values are ACTIVE and PASSIVE. If this parameter is not set, FTP access is in passive mode by default.

      FTP Protocol

      FTP protocol.

      • FTP: indicates the FTP protocol.
      • SSL_EXPLICIT: indicates the explicit SSL protocol.
      • SSL_IMPLICIT: indicates the implicit SSL protocol.
      • TLS_EXPLICIT: indicates the explicit TLS protocol.
      • TLS_IMPLICIT: indicates the implicit TLS protocol.

      If this parameter is not set, the FTP protocol is used by default.

      File Name Encoding Type

      File name and file path encoding format supported by the FTP server. If this parameter is not set, the default format UTF-8 is used.

      hdfs-connector

      -

      -

      oracle-connector

      JDBC Connection String

      Connection string for a user to connect to the database

      Username

      Username for connecting to the database

      Password

      Password for connecting to the database

      Connection Properties

      Connection attributes. Click Add to manually add connection attributes.

      • Name: connection attribute name
      • Value: connection attribute value

      mysql-fastpath-connector

      JDBC Connection String

      JDBC connection string

      Username

      Username for connecting to the database

      Password

      Password for connecting to the database

      Connection Properties

      Connection attributes. Click Add to manually add connection attributes.

      • Name: connection attribute name
      • Value: connection attribute value

      sftp-connector

      SFTP Server IP Address

      IP address of the SFTP server

      SFTP Server Port

      Port number of the SFTP server

      SFTP Username

      Username for accessing the SFTP server

      SFTP Password

      Password for accessing the SFTP server

      SFTP Public Key

      Public key of the SFTP server

      oracle-partition-connector

      JDBC Driver Class

      Name of a JDBC driver class

      JDBC Connection String

      JDBC connection string

      Username

      Username for connecting to the database

      Password

      Password for connecting to the database

      Connection Properties

      Connection attributes. Click Add to manually add connection attributes.

      • Name: connection attribute name
      • Value: connection attribute value
    3. Set Group to the group to which the job belongs. By default, there is no created group. Click Add to create a group and click OK.
    4. Queue indicates that Loader tasks are executed in a specified Yarn queue. The default value is root.default, which indicates that the tasks are executed in the default queue.
    5. Set Priority to the priority of Loader tasks in the specified Yarn queue. The value can be VERY_LOW, LOW, NORMAL, HIGH, or VERY_HIGH. The default value is NORMAL.

  5. In the 2. Input Settings area, set the data source and click Next.

    • When creating or editing a Loader job, you can use macro definitions when configuring parameters such as the SFTP path, HDFS/OBS path, and Where condition of SQL. For details, see Using Macro Definitions in Configuration Items.
    • Loader supports common field data types, such as Char, VarChar, Boolean, Binary, SmallInt, Int, BigInt, Decimal, Float, Double, Date, Time, TimeStamp, and String. The supported types may vary according to the data source. For details about the supported types, expand the field data type drop-down list of the corresponding input operator (such as Table Input) on the Loader GUI. Some database-specific fields may not be supported. For example, Loader does not support the CLOB, XMLType, and BLOB fields in Oracle.
    Table 2 List of input configuration parameters

    Source File Type

    Parameter

    Description

    sftp-connector or ftp-connector

    Input Path

    Input path or name of the source file on an SFTP server. If multiple SFTP server IP addresses are configured for the connector, you can set this parameter to multiple input paths separated with semicolons (;). Ensure that the number of input paths is the same as that of SFTP servers configured for the connector.

    File Split Type

    Indicates whether to split source files by file name or size. The files obtained after the splitting are used as the input files of each Map in the MapReduce task for data import. FILE indicates that each Map processes one or more complete source files. The same source file cannot be allocated to different Maps. When the data is saved to the output directory, the directory structure of the input path is retained. SIZE indicates that each Map processes input files of a certain size. A source file can be split into multiple Maps. The number of files saved when data is saved to the output directory is the same as that of Maps. The file name format is import_part_xxxx, where xxxx is a unique random number generated by the system.

    Filter Type

    File filtering criterion. WILCARD indicates that a wildcard is used in filtering, and REGEX indicates that a regular expression is used in filtering. This parameter is used together with Path Filter and File Filter. The default value is WILDCARD.

    Path Filter

    Wildcard or regular expression for filtering the directories in the input path of the source files. This parameter is used when Filter Type is set. Input Path is not used for filtering. If there are multiple filter conditions, use commas (,) to separate them. If the value is empty, the directories are not filtered.

    File Filter

    Wildcard or regular expression for filtering the file names of the source files. This parameter is used when Filter Type is set. If there are multiple filter conditions, use commas (,) to separate them. The value cannot be left blank.

    Encoding Type

    Source file encoding format, for example, UTF-8. This parameter can be set only in text file import.

    Suffix

    File name extension added to a source file after the source file is imported. If this parameter is empty, no file name extension is added to the source file.

    Compression

    Indicates whether to enable compressed transmission when SFTP is used to export data. true indicates that compression is enabled, and false indicates that compression is disabled.

    hdfs-connector

    Input Path

    Input path of source files in HDFS

    Path Filter

    Wildcard for filtering the directories in the input paths of the source files. Input Path is not used for filtering. If there are multiple filter conditions, use commas (,) to separate them. If the value is empty, the directories are not filtered. The regular expression filtering is not supported.

    File Filter

    Wildcard for filtering the file names of the source files. If there are multiple filter conditions, use commas (,) to separate them. The value cannot be left blank. The regular expression filtering is not supported.

    Encoding Type

    Source file encoding format, for example, UTF-8. This parameter can be set only in text file import.

    Suffix

    File name extension added to a source file after the source file is imported. If this parameter is empty, no file name extension is added to the source file.

    generic-jdbc-connector

    Schema Name

    Database schema name. This parameter exists in the Table name schema.

    Table Name

    Database table name. This parameter exists in the Table name schema.

    SQL Statement

    SQL statement for the Loader to query data to be imported in Table SQL statement mode. The SQL statement requires the query condition WHERE ${CONDITIONS}. Without this condition, the SQL statement cannot be run properly, for example, select * from TABLE WHERE A>B and ${CONDITIONS}. If Table column names is set, the column specified by Table column names will replace the column queried in the SQL statement. This parameter cannot be set when Schema name or Table name is set.

    Table Column Names

    Table columns whose content is to be imported by Loader. Use commas (,) to separate multiple fields.

    Partition Column Name

    Database table column based on which to-be-imported data is determined. This parameter is used for partitioning in a Map job. You are advised to configure the primary key field.

    NOTE:
    • A partition column must have an index. If no index exists, do not specify a partition column. If a partition column without an index is specified, the database server disk I/O will be busy, the access of other services to the database will be affected, and the import will take a long period.
    • In multiple fields with indexes, select the field that has the most discrete value as the partition column. A partition column that is not discrete may result in load imbalance when multiple MapReduce jobs are imported.
    • The sorting rules of partition columns must be case-sensitive. Otherwise, data may be lost during data import.
    • You are not advised to select fields of the float or double type for the partition column. Otherwise, the records containing the minimum and maximum values of the partition column may fail to be imported due to precision issues.

    Nulls in Partition Column

    Indicates whether to process records whose values are null in database table columns. If the value is true, the data whose value is null in the partition column is processed. If the value is false, the data whose value is null in the partition column is not processed.

    Whether to Specify a Partition Column

    Indicates whether to specify a partition column.

    oracle-connector

    Table Name

    Table name.

    Column Name

    Column name.

    Query Condition

    Query condition in an SQL statement

    Splitting Mode

    Data splitting mode. The options are ROWID and PARTITION.

    Table Partition Name

    Name of a table partition. Use commas (,) to separate the names of different partitions.

    Data Block Allocation Mode

    Allocation method of data after being split.

    Read Size

    Amount of data to be read each time.

    mysql-fastpath-connector

    Schema Name

    Database schema name.

    Table Name

    Database table name.

    Query Condition

    Query condition of a specified table.

    Partition Column Name

    Database table column based on which to-be-imported data is determined. This parameter is used for partitioning in a Map job. You are advised to configure the primary key field.

    NOTE:
    • A partition column must have an index. If no index exists, do not specify a partition column. If a partition column without an index is specified, the database server disk I/O will be busy, the access of other services to the database will be affected, and the import will take a long period.
    • In multiple fields with indexes, select the field that has the most discrete value as the partition column. A partition column that is not discrete may result in load imbalance when multiple MapReduce jobs are imported.
    • You are not advised to select fields of the float or double type for the partition column. Otherwise, the records containing the minimum and maximum values of the partition column may fail to be imported due to precision issues.

    Nulls in Partition Column

    Indicates whether to process records whose values are null in database table columns. If the value is true, the data whose value is null in the partition column is processed. If the value is false, the data whose value is null in the partition column is not processed.

    Whether to Specify a Partition Column

    Indicates whether to specify a partition column.

    oracle-partition-connector

    Schema Name

    Database schema name.

    Table Name

    Partition table name.

    Query Condition

    Query condition in an SQL statement.

    Table Column Names

    Table columns whose content is to be imported by Loader. Use commas (,) to separate multiple fields.

  6. In the 3. Convert area, set the conversion operations during data transmission.

    Check whether source data values in the data operation job created by the Loader can be directly used without conversion, including upper and lower case conversion, cutting, merging, and separation.

    • If yes, click Next.
    • If no, perform 6.a to 6.d.
    1. No created conversion step exists by default. Drag an example conversion step on the left to the edit box to create a new conversion step.
    2. Conversion step types must be selected based on service requirements. A complete conversion process includes the following types:
      1. Input type. Only one conversion step can be added. This parameter is mandatory if the task involves HBase or relational databases.
      2. Conversion type, which is an intermediate conversion step. You can add one or more conversion types or do not add any conversion type.
      3. Output type. Only one output type can be added in the last conversion step. This parameter is mandatory if the task involves HBase or relational databases.
        Table 3 Example list

        Type

        Description

        Input Type

        • CSV File Input: CSV file input step for configuring separators to generate multiple fields.
        • Fixed-Width File Input: Text file input step for configuring the length of characters or bytes to be truncated to generate multiple fields.
        • Table Input: relational data input step for configuring specified columns in the database as input fields.
        • HBase Input: HBase table input step for configuring the column definition of an HBase table to a specified field.
        • HTML Input: HTML web page data input step for obtaining the target data of the HTML web page file to the specified field.
        • Hive Input: Hive table input step for defining columns in a Hive table to specified fields.
        • Spark Input: Spark SQL table input step for defining columns in the SparkSQL table to specified fields. Only SparkSQL can access Hive data.

        Conversion Type

        • Long Integer Time Conversion: Configure the conversion between a long integer value and a date.
        • Null Value Conversion: Configure a specified value to replace the null value.
        • Random Value Conversion: Configure new value-added fields as random data fields.
        • Adding a Constant Field: Add a constant to directly generate a constant field.
        • Concatenation and Conversion: Concatenate fields, connect generated fields using connection characters, and convert new fields.
        • Separator Conversion: Configure the generated fields to be separated by separators and convert new fields.
        • Modulo Conversion: Configure the generated fields to be converted into new fields through modulo operation.
        • Cutting Character String: Truncate a generated field based on a specified position to generate a new field.
        • EL Operation Conversion: Calculate field values. Currently, the following operators are supported: md5sum, sha1sum, sha256sum, and sha512sum.
        • Character String Case Conversion: Configure the generated fields to be converted to new fields through case conversion.
        • Reverse String Conversion: Reverse the generated fields to generate new fields.
        • Character String Space Clearing Conversion: Configure the generated fields to clear spaces and convert them to new fields.
        • Row Filtering Conversion: Configure logical conditions to filter out rows that contain triggering conditions.
        • Update Fields: Update the value of a specified field when certain conditions are met.

        Output Type

        • File Output: Configure generated fields to be connected by separators and exported to a file.
        • Table Output: Configure the mapping between output fields and specified columns in the database.
        • HBase Output: Configure the generated fields to the columns of the HBase table.
        • Hive Output: Configure generated fields to a column of a Hive table.
        • Spark Output: Configure generated fields to the columns of SparkSQL tables. Only SparkSQL can access Hive data.

        The edit box allows you to perform the following tasks:

        • Rename: Rename an example.
        • Edit: Edit the step conversion by referring to 6.c.
        • Delete: Delete an example.

          You can also use the shortcut key Del to delete the example.

    3. Click Edit to edit the step conversion information and configure fields and data.

      For details about how to set parameters in the step conversion information, see Operator Help.

      • When sftp-connector or ftp-connector is used to import data, the time type field in the original data must be set to a string during the data conversion so that the time can be accurate to millisecond for data import. The data that has more precise time than millisecond will not be imported.
      • When generic-jdbc-connector is used to import data, it is recommended that the data length of the CHAR or VARCHAR type field be set to -1 during data conversion so that all data can be imported. This prevents the data from being truncated when the actual data length is too long.
      • When generic-jdbc-connector is used to import data, the time type field in the original data must be set to a time type value during the data conversion so that the time can be accurate to second for data import. The data that has more precise time than second will not be imported.
      • When data is imported to a Hive partitioned table, Hive does not scan the newly imported data by default. You need to run the following HQL statement to repair the table so that the newly imported data can be queried:

        MSCK REPAIR TABLE table_name;

      If the conversion step is incorrectly configured, the source data cannot be converted and become dirty data. The dirty data marking rules are as follows:

      • In any input type step, all data becomes dirty data if the number of fields contained in the original data is less than that of configured fields, or the field values in the original data do not match the configured field type.
      • In the CSV File Input step, Validate input field checks whether the input field matches the value type. If the input field and value type of a row do not match, the row is skipped and becomes dirty data.
      • In the Fixed Width File Input step, Fixed Length specifies the field splitting length. If the length is greater than the length of the original field value, data splitting fails and the current row becomes dirty data.
      • In the HBase Input step, if the HBase table name specified by HBase Table Name is incorrect, or no primary key column is configured for Primary Key, all data becomes dirty data.
      • In any conversion step, rows whose conversion fails become dirty data. For example, in the Split Conversion step, if the number of generated fields is less than that of configured fields, or the original data cannot be converted to the String type, the current row becomes dirty data.
      • In the Filter Row Conversion step, rows filtered by filter criteria become dirty data.
      • In the Modulo Conversion step, if the original field value is NULL, the current row becomes dirty data.
      • For jobs that import data to Hive/SparkSQL tables, you must configure the Hive conversion step.
    4. Click Next.

  7. In the 4. Output Settings area, set the destination location for saving data and click Save to save the job or click Save and Run to save and run the job.

    Table 4 List of output configuration parameters

    Storage Type

    Parameter

    Description

    HDFS

    File Type

    Compression format of files imported to HDFS. Select a format from the drop-down list. If you select NONE or do not set this parameter, data is not compressed.

    Compression Format

    Compression format of files imported to HDFS. Select a format from the drop-down list. If you select NONE or do not set this parameter, data is not compressed.

    Output Directory

    Directory for storing data imported into HDFS.

    Operation

    Action during data import. When all data is to be imported from the input path to the destination path, the data is stored in a temporary directory and then copied from the temporary directory to the destination path. After the data is imported successfully, the data is deleted from the temporary directory. One of the following actions can be taken when duplicate file names exist during data transfer:

    • OVERRIDE: overrides the old file.
    • RENAME: renames as new file. For a file without an extension, a string is added to the file name as the extension; for a file with an extension, a string is added to the extension. The string is unique.
    • APPEND: adds the content of the new file to the end of the old file. This action only adds content regardless of whether the file can be used. For example, a text file can be used after this operation, while a compressed file cannot.
    • IGNORE: reserves the old file and does not copy the new file.
    • ERROR: stops the task and reports an error if duplicate file names exist. Transferred files are imported successfully, while files that have duplicate names and files that are not transferred fail to be imported.

    Extractors

    Number of Maps that are started at the same time in a MapReduce task of a data configuration operation. This parameter cannot be set when Extractor Size is set. The value must be less than or equal to 3000.

    Extractor Size

    Size of data processed by Maps that are started in a MapReduce task of a data configuration operation. The unit is MB. The value must be greater than or equal to 100. The recommended value is 1000. This parameter cannot be set when Extractors is set. When a relational database connector is used, Extractor Size is unavailable. You need to set Extractors.

    HBASE_BULKLOAD

    HBase Instance

    HBase service instance that Loader selects from all available HBase service instances in the cluster. If the selected HBase service instance is not added to the cluster, the HBase job cannot be run properly.

    Clear data before import

    Indicates whether to clear data in the original table before importing data. The value true indicates that the clearing operation is performed, and the value false indicates that the clearing operation is not performed. If you do not set this parameter, the original table is not cleared by default.

    Extractors

    Number of Maps that are started at the same time in a MapReduce task of a data configuration operation. The value must be less than or equal to 3000.

    Extractor Size

    HBase does not support this parameter. Please set Extractors.

    HBASE_PUTLIST

    HBase Instance

    HBase service instance that Loader selects from all available HBase service instances in the cluster. If the selected HBase service instance is not added to the cluster, the HBase job cannot be run properly.

    Extractors

    Number of Maps that are started at the same time in a MapReduce task of a data configuration operation. The value must be less than or equal to 3000.

    Extractor Size

    HBase does not support this parameter. Please set Extractors.

    HIVE

    Output Directory

    Directory for storing data imported into Hive.

    Extractors

    Number of Maps that are started at the same time in a MapReduce task of a data configuration operation. This parameter cannot be set when Extractor Size is set. The value must be less than or equal to 3000.

    Extractor Size

    Size of data processed by Maps that are started in a MapReduce task of a data configuration operation. The unit is MB. The value must be greater than or equal to 100. The recommended value is 1000. This parameter cannot be set when Extractors is set. When a relational database connector is used, Extractor Size is unavailable. You need to set Extractors.

    SPARK

    Output Directory

    Only SparkSQL is supported to access Hive data. You can specify the directory for storing data imported to Hive.

    Extractors

    Number of Maps that are started at the same time in a MapReduce task of a data configuration operation. This parameter cannot be set when Extractor Size is set. The value must be less than or equal to 3000.

    Extractor Size

    Size of data processed by Maps that are started in a MapReduce task of a data configuration operation. The unit is MB. The value must be greater than or equal to 100. The recommended value is 1000. This parameter cannot be set when Extractors is set. When a relational database connector is used, Extractor Size is unavailable. You need to set Extractors.

  8. On the Loader web UI, view, start, stop, copy, delete, edit, or view historical information about created jobs.

    Figure 2 Viewing Loader jobs