Development Plan of Accessing a Multi-Component Program

Updated on 2024-12-10 GMT+08:00

Scenario Description

The following example shows how to write a MapReduce job that accesses multiple service components (HDFS, HBase, and Hive). It is intended to help users understand key actions such as authentication and configuration loading.

The example works as follows:

Use an HDFS text file as input data.

log1.txt: Input file

YuanJing,male,10
GuoYijun,male,5

Map phase

  1. Obtain one row of the input data and extract the user name.
  2. Query one piece of data from HBase.
  3. Query one piece of data from Hive.
  4. Combine the data queried from HBase and that from Hive as the output of Map.

Reduce phase

  1. Obtain the last piece of data from the Map output.
  2. Export the data to HBase.
  3. Save the data to HDFS.
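
The Map and Reduce logic above can be sketched in plain Java without a cluster. This is a minimal illustration only: the class name MultiComponentFlowSketch and its helper methods are hypothetical, and the values "123" and 10 stand in for the results that the real job would query from HBase and Hive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, cluster-free sketch of the Map and Reduce logic described above.
final class MultiComponentFlowSketch {

    // Map step 1: take the text before the first comma when the line contains "male",
    // which is the same check the sample mapper uses.
    public static String extractName(String line) {
        if (line.contains("male")) {
            return line.substring(0, line.indexOf(","));
        }
        return "";
    }

    // Map step 4: combine the HBase and Hive lookups into one output value.
    public static String combine(String hbaseData, int hiveData) {
        return "hbase:" + hbaseData + ", hive:" + hiveData;
    }

    // Reduce step 1: keep only the last value seen for a key.
    public static String lastValue(Iterable<String> values) {
        String last = "";
        for (String value : values) {
            last = value;
        }
        return last;
    }

    public static void main(String[] args) {
        String name = extractName("YuanJing,male,10");

        // "123" and 10 are placeholders for the HBase and Hive query results.
        List<String> mapOutputs = new ArrayList<>();
        mapOutputs.add(combine("123", 10));

        System.out.println(name + "\t" + lastValue(mapOutputs));
    }
}
```

Note that a contains("male") check also matches lines whose gender field is "female", so with this logic every row in such a file would yield a name.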

Data Planning

  1. Create an HDFS data file.
    1. On the Linux-based HDFS client, create a text file named data.txt and copy the content of log1.txt to it.
    2. Run the following commands to create the /tmp/examples/multi-components/mapreduce/input/ folder in HDFS, and upload data.txt to it:
      1. On the Linux-based HDFS client, run the hdfs dfs -mkdir -p /tmp/examples/multi-components/mapreduce/input/ command.
      2. On the Linux-based HDFS client, run the hdfs dfs -put data.txt /tmp/examples/multi-components/mapreduce/input/ command.
  2. Create an HBase table and insert data.
    1. Run the hbase shell command on the Linux-based HBase client.
    2. Run the create 'table1', 'cf' command in the HBase shell to create table1 with column family cf.
    3. Run the put 'table1', '1', 'cf:cid', '123' command to insert data whose rowkey is 1, column name is cid, and data value is 123.
    4. Run the quit command to exit.
  3. Create a Hive table and insert data.
    1. Run the beeline command on the Linux-based Hive client.
    2. In the Hive beeline interaction window, run the CREATE TABLE person(name STRING, gender STRING, stayTime INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile; command to create data table person with fields name, gender, and stayTime.
    3. In the Hive beeline interaction window, run the LOAD DATA INPATH '/tmp/examples/multi-components/mapreduce/input/' OVERWRITE INTO TABLE person; command to load data files to the person table.
    4. Run the !q command to exit.
  4. LOAD DATA INPATH moves the data files out of the HDFS input directory into the Hive table, so the input directory is empty after the data is loaded. Perform step 1 again to restore the input data.

Function Description

The example is divided into three parts:

  • The MultiComponentMapper class, which extends the Mapper class, extracts the name from each line of the HDFS source file, queries HBase and Hive, and combines the results.
  • The MultiComponentReducer class, which extends the Reducer class, obtains the last piece of mapped data and outputs it to HBase and HDFS.
  • The main method creates a MapReduce job and submits it to the Hadoop cluster.

Sample Code

The following code snippets are examples only. For the complete code, see the com.huawei.bigdata.mapreduce.examples.MultiComponentExample class.

Example 1: The MultiComponentMapper class overrides the map method of the Mapper class.

private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> {

    Configuration conf;

    @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

      String name = "";
      String line = value.toString();

      // Load the configuration file.
        conf = context.getConfiguration();
        
        setJaasInfo("krb5.conf", "jaas.conf");
        LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB);
        LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);

        // Prepare for a Hive query.
        // Load parameters.
        Properties clientInfo = null;
        InputStream fileInputStream = null;
        try {
            clientInfo = new Properties();
            File propertiesFile = new File(hiveClientProperties);
            fileInputStream = new FileInputStream(propertiesFile);
            clientInfo.load(fileInputStream);
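        } catch (Exception e) {
            // Do not swallow the failure silently; the properties read below would all be null.
            LOG.error("Failed to load the Hive client properties file.", e);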
        } finally {
            if (fileInputStream != null) {
                fileInputStream.close();
            }
        }

        String zkQuorum = clientInfo.getProperty("zk.quorum");
        String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
        String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");

        // Create Hive authentication information.
        // Note:
        // A MapReduce task can access Hive only through JDBC.
        // Hive submits another MapReduce job to execute the query,
        // so querying Hive from within a MapReduce task is not recommended.
        final String driver = "org.apache.hive.jdbc.HiveDriver";

        String sql = "select name,sum(stayTime) as "
                + "stayTime from person where name = ? group by name";

        StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
        // in map or reduce, use 'auth=delegationToken'
        sBuilder
                .append(";serviceDiscoveryMode=")
                .append(serviceDiscoveryMode)
                .append(";zooKeeperNamespace=")
                .append(zooKeeperNamespace)
                .append(";auth=delegationToken;");

        String url = sBuilder.toString();

        try {
            Class.forName(driver);
            hiveConn = DriverManager.getConnection(url, "", "");
            statement = hiveConn.prepareStatement(sql);
        } catch (Exception e) {
            LOG.error("Init jdbc driver failed.", e);
        }

        // Create an HBase connection and obtain the table.
        try {
            hbaseConn = ConnectionFactory.createConnection(conf);
            table = hbaseConn.getTable(TableName.valueOf(HBASE_TABLE_NAME));
        } catch (IOException e) {
            LOG.error("Exception occur when connect to HBase", e);
            throw e;
        }

      if (line.contains("male")) {
        name = line.substring(0, line.indexOf(","));
      }
      // 1. Read HBase data.
      String hbaseData = readHBase();

      // 2. Read Hive data. readHive returns the summed stayTime as an int.
      int hiveData = readHive(name);

      // Map outputs a key-value pair, which is a character string combining HBase and Hive data.
      context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));
    }
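
To make the URL construction in Example 1 easier to follow, the sketch below builds the same task-side JDBC URL from a Properties object using only the JDK. The class name HiveUrlSketch, the method buildTaskUrl, and all property values (the addresses, port, and namespace) are hypothetical placeholders; only the property keys and the URL layout come from the sample.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical sketch: builds the HiveServer2 JDBC URL in the same shape as the sample mapper.
final class HiveUrlSketch {

    public static String buildTaskUrl(Properties clientInfo) {
        String zkQuorum = clientInfo.getProperty("zk.quorum");
        String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
        String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");

        // Inside a map or reduce task the sample authenticates with a delegation token.
        return new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/")
                .append(";serviceDiscoveryMode=").append(serviceDiscoveryMode)
                .append(";zooKeeperNamespace=").append(zooKeeperNamespace)
                .append(";auth=delegationToken;")
                .toString();
    }

    public static void main(String[] args) throws IOException {
        // Placeholder values for illustration; use the values from your own client configuration.
        String text = "zk.quorum=192.0.2.1:2181,192.0.2.2:2181\n"
                + "zooKeeperNamespace=hiveserver2\n"
                + "serviceDiscoveryMode=zooKeeper\n";

        Properties clientInfo = new Properties();
        clientInfo.load(new StringReader(text));

        System.out.println(buildTaskUrl(clientInfo));
    }
}
```

Keeping the URL assembly in one small method makes it possible to check the resulting string before a task attempts to connect.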

Example 2: Use the readHBase method to read HBase data.

    private String readHBase() {
      String columnFamily = "cf";
      String hbaseKey = "1";
      String hbaseValue;

      try {
        // Create an HBase Get request instance.
        Get get = new Get(Bytes.toBytes(hbaseKey));
        // Submit the Get request.
        Result result = table.get(get);
        hbaseValue = Bytes.toString(result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes("cid")));

        return hbaseValue;

      } catch (IOException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        if (hbaseConn != null) {
          try {
            hbaseConn.close();
          } catch (Exception e1) {
            LOG.error("Failed to close the connection ", e1);
          }
        }
      }

      return "";
    }

Example 3: Use the readHive method to read Hive data.

    private int readHive(String name) {
      
      ResultSet resultSet = null;
      try {
        statement.setString(1, name);
        resultSet = statement.executeQuery();

        if (resultSet.next()) {
          return resultSet.getInt("stayTime"); 
        }       
      } catch (SQLException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        if (null != resultSet) {
          try {
            resultSet.close();
          } catch (SQLException e) {
            // handle exception
          }
        }
        if (null != statement) {
          try {
            statement.close();
          } catch (SQLException e) {
            // handle exception
          }
        }
        if (null != hiveConn) {
          try {
            hiveConn.close();
          } catch (SQLException e) {
            // handle exception
          }
        }
      }

      return 0;
    }
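
As a rough illustration of what the Hive query behind readHive computes, the following sketch evaluates the same "sum stayTime for one name" logic over in-memory rows. It is not how Hive executes the query; the class StayTimeSumSketch and the method sumStayTime are hypothetical names, and the rows are the two sample lines from log1.txt.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory equivalent of:
//   select name, sum(stayTime) as stayTime from person where name = ? group by name
final class StayTimeSumSketch {

    public static int sumStayTime(List<String> rows, String name) {
        int total = 0;
        for (String row : rows) {
            // Each row has the layout name,gender,stayTime.
            String[] fields = row.split(",");
            if (fields.length == 3 && fields[0].equals(name)) {
                total += Integer.parseInt(fields[2].trim());
            }
        }
        // Like readHive, fall back to 0 when no row matches.
        return total;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("YuanJing,male,10", "GuoYijun,male,5");
        System.out.println(sumStayTime(rows, "YuanJing"));
    }
}
```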

Example 4: The MultiComponentReducer class overrides the reduce method of the Reducer class.

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      
        Text finalValue = new Text("");

        setJaasInfo("krb5.conf", "jaas.conf");
        LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB);
        LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);

        conf = context.getConfiguration();
        try {
            // Create an HBase connection.
            conn = ConnectionFactory.createConnection(conf);
            // Obtain a table.
            table = conn.getTable(TableName.valueOf(HBASE_TABLE_NAME));
        } catch (IOException e) {
            LOG.error("Exception occur when connect to HBase", e);
            throw e;
        }
      
      for (Text value : values) {
        finalValue = value;
      }

      // Export the result to HBase.
      writeHBase(key.toString(), finalValue.toString());

      // Save the result to HDFS.
      context.write(key, finalValue);
    }

Example 5: Use the writeHBase method to output data to HBase.

    private void writeHBase(String rowKey, String data) {
      
      try {
        // Create an HBase Put request instance.
        // Note: this sample always writes to row "1"; the rowKey parameter is not used.
        List<Put> list = new ArrayList<Put>();
        byte[] row = Bytes.toBytes("1");
        Put put = new Put(row);
        byte[] family = Bytes.toBytes("cf");
        byte[] qualifier = Bytes.toBytes("value");
        byte[] value = Bytes.toBytes(data);
        put.addColumn(family, qualifier, value);
        list.add(put);
        // Execute the Put request.
        table.put(list);
      } catch (IOException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        if (conn != null) {
          try {
            conn.close();
          } catch (Exception e1) {
            LOG.error("Failed to close the connection ", e1);
          }
        }
      }

    }

Example 6: Use the main() method to create a job, configure dependencies and authentication information, and submit the job to the Hadoop cluster.

    public static void main(String[] args) throws Exception {
      
      // Clear required directories.
      MultiComponentExample.cleanupBeforeRun();

      // Query the Hive dependency JAR file.
      Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver");
      Class thriftClass = Class.forName("org.apache.thrift.TException");
      Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.TCLIService");
      Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf");
      Class hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport");
      Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException");
      Class hiveShimClass = Class.forName("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge23");

      // Add a Hive running dependency to the job.
      JarFinderUtil
          .addDependencyJars(config, hiveDriverClass, thriftCLIClass, thriftClass, hiveConfClass, hiveTransClass,
              hiveMetaClass, hiveShimClass);

      // Log in to a cluster with Kerberos authentication enabled.
      if ("kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"))) {
          // Security mode
          LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, PRINCIPAL, KEYTAB);
          LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
          System.setProperty("java.security.krb5.conf", KRB);
          LoginUtil.login(PRINCIPAL, KEYTAB, KRB, config);
      }
      // Add a Hive configuration file.
      config.addResource("hive-site.xml");
      // Add an HBase configuration file.
      Configuration conf = HBaseConfiguration.create(config);

      // Instantiate the job.
      Job job = Job.getInstance(conf);
      job.setJarByClass(MultiComponentExample.class);

      // Set the mapper and reducer classes.
      job.setMapperClass(MultiComponentMapper.class);
      job.setReducerClass(MultiComponentReducer.class);

      // Set the input and output paths of the job.
      FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt"));
      FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));

      // Set the output key-value type.
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      // HBase provides a tool class to add the HBase running dependency to the job.
      TableMapReduceUtil.addDependencyJars(job);

      // This operation must be performed in security mode.
      // HBase adds authentication information to the job. The map or reduce task uses the authentication information.
      TableMapReduceUtil.initCredentials(job);

      // Create Hive authentication information.
      Properties clientInfo = null;
      InputStream fileInputStream = null;
      try {
          clientInfo = new Properties();
          File propertiesFile = new File(hiveClientProperties);
          fileInputStream = new FileInputStream(propertiesFile);
          clientInfo.load(fileInputStream);
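      } catch (Exception e) {
          // Do not swallow the failure silently; the properties read below would all be null.
          LOG.error("Failed to load the Hive client properties file.", e);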
      } finally {
          if (fileInputStream != null) {
              fileInputStream.close();
          }
      }
      String zkQuorum = clientInfo.getProperty("zk.quorum");// List of ZooKeeper node IP addresses and ports
      String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
      String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
      String principal = clientInfo.getProperty("principal");
      String auth = clientInfo.getProperty("auth");
      String sasl_qop = clientInfo.getProperty("sasl.qop");
      StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
      sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
          .append(zooKeeperNamespace)
          .append(";sasl.qop=")
          .append(sasl_qop)
          .append(";auth=")
          .append(auth)
          .append(";principal=")
          .append(principal)
          .append(";");
      String url = sBuilder.toString();
      Connection connection = DriverManager.getConnection(url, "", "");
      String tokenStr = ((HiveConnection) connection)
          .getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(), PRINCIPAL);
      connection.close();
      Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>();
      hive2Token.decodeFromUrlString(tokenStr);
      // Add Hive authentication information to a job.
      job.getCredentials().addToken(new Text("hive.server2.delegation.token"), hive2Token);
      job.getCredentials().addToken(new Text(HiveAuthFactory.HS2_CLIENT_TOKEN), hive2Token);

      // Submit the job.
      System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

NOTE:

Replace the zkQuorum value used in the examples with the actual ZooKeeper cluster node information.
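
Because the samples read zk.quorum and the other connection settings with getProperty, a missing key yields null, and appending null to a StringBuilder produces the literal text "null" in the JDBC URL. One possible guard is sketched below; the class ClientInfoCheckSketch and the method require are hypothetical helpers that are not part of the sample project, and the address is a placeholder.

```java
import java.util.Properties;

// Hypothetical helper: fail fast when a required client property is missing or blank.
final class ClientInfoCheckSketch {

    public static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Properties clientInfo = new Properties();
        // Placeholder address; replace it with the actual ZooKeeper node list.
        clientInfo.setProperty("zk.quorum", "192.0.2.1:2181");

        System.out.println(require(clientInfo, "zk.quorum"));

        try {
            require(clientInfo, "zooKeeperNamespace");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing early with the name of the missing key is usually easier to diagnose than a connection error caused by a malformed URL.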
