Updated on 2022-08-16 GMT+08:00

Example Code

Function

The functions of the sample project are as follows:

  • Collect the name information from HDFS source files, then query and combine data from HBase and Hive, using the MultiComponentMapper class inherited from the Mapper abstract class.
  • Obtain the last piece of mapped data and output it to HBase and HDFS, using the MultiComponentReducer class inherited from the Reducer abstract class.
  • The main function creates a MapReduce job and submits the MapReduce job to Hadoop clusters.

Example Code

For details about the code, see the class com.huawei.bigdata.mapreduce.examples.MultiComponentExample.

Example code of the map function of the MultiComponentMapper class, which extends the Mapper abstract class.

  private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> {

    Configuration conf;

    /**
     * Maps one input line to a (name, combined HBase/Hive data) pair.
     *
     * <p>The name is the text before the first comma of the line. It is extracted only when the
     * line contains the substring "male" (note that "female" contains that substring as well).
     * When the line does not contain "male", or contains no comma, the name stays empty and the
     * record is still emitted under the empty key.
     *
     * @param key input key; not used
     * @param value one line of the input file
     * @param context task context, used to obtain the job configuration and to emit the output
     * @throws IOException if a configuration file path cannot be resolved, the Hive client
     *     properties cannot be loaded, or the output cannot be written
     * @throws InterruptedException if emitting the output is interrupted
     */
    @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

      conf = context.getConfiguration();

      // Components that depend on ZooKeeper need the JAAS and krb5 configuration files.
      // No login is performed here; the credentials set up by the main function are used.
      // Both files are shipped with the job by the main function, so they are addressed by
      // bare file name relative to the task's working directory.
      File jaasFile = new File("jaas_mr.conf");
      File krb5File = new File("krb5.conf");
      System.setProperty("java.security.auth.login.config", jaasFile.getCanonicalPath());
      System.setProperty("java.security.krb5.conf", krb5File.getCanonicalPath());
      System.setProperty("zookeeper.sasl.client", "true");

      LOG.info("UGI :" + UserGroupInformation.getCurrentUser());

      String name = "";
      String line = value.toString();
      if (line.contains("male")) {
        // Guard against lines without a comma: substring(0, -1) would throw
        // StringIndexOutOfBoundsException and fail the whole task.
        int commaIndex = line.indexOf(',');
        if (commaIndex >= 0) {
          name = line.substring(0, commaIndex);
        }
      }

      // 1. Read from HBase.
      String hbaseData = readHBase();

      // 2. Read from Hive.
      String hiveData = readHive(name);

      // Emit the key-value pair produced by this map call.
      context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));
    }

Example code of the readHBase function.

    /**
     * Reads the cell {@code cf:cid} of row {@code "1"} from HBase table {@code table1}.
     *
     * @return the cell value converted with {@code Bytes.toString}, or an empty string if an
     *     {@link IOException} occurs while talking to HBase
     */
    private String readHBase() {
      String tableName = "table1";
      String columnFamily = "cf";
      String hbaseKey = "1";

      Configuration hbaseConfig = HBaseConfiguration.create(conf);
      org.apache.hadoop.hbase.client.Connection conn = null;
      Table table = null;
      try {
        // Create an HBase connection and look up the table.
        conn = ConnectionFactory.createConnection(hbaseConfig);
        table = conn.getTable(TableName.valueOf(tableName));

        // Bytes.toBytes is used instead of String.getBytes() so the encoding does not depend
        // on the platform default charset.
        Get get = new Get(Bytes.toBytes(hbaseKey));
        Result result = table.get(get);
        return Bytes.toString(result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes("cid")));
      } catch (IOException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        // Close the table before the connection; the original code leaked the table handle.
        if (table != null) {
          try {
            table.close();
          } catch (Exception e1) {
            LOG.error("Failed to close the table ", e1);
          }
        }
        if (conn != null) {
          try {
            conn.close();
          } catch (Exception e1) {
            LOG.error("Failed to close the connection ", e1);
          }
        }
      }

      return "";
    }

Example of the readHive function.

    /**
     * Queries Hive over JDBC for the given person name and returns the first column of the
     * first result row.
     *
     * <p>Connection settings are read from {@code hiveclient.properties} in the directory named
     * by the {@code user.dir} system property. The connection authenticates with
     * {@code auth=delegationToken}.
     *
     * @param name person name used in the query's {@code where} clause
     * @return the first column of the first row, or an empty string when there is no row or
     *     when the driver cannot be loaded or the query fails
     * @throws IOException if {@code hiveclient.properties} cannot be loaded or closed
     */
    private String readHive(String name) throws IOException  {
      // Load the configuration file.
      Properties clientInfo = null;
      String userdir = System.getProperty("user.dir") + "/";
      InputStream fileInputStream = null;
      try {
        clientInfo = new Properties();
        String hiveclientProp = userdir + "hiveclient.properties";
        File propertiesFile = new File(hiveclientProp);
        fileInputStream = new FileInputStream(propertiesFile);
        clientInfo.load(fileInputStream);
      } catch (Exception e) {
        throw new IOException(e);
      } finally {
        if (fileInputStream != null) {
          fileInputStream.close();
        }
      }
      String zkQuorum = clientInfo.getProperty("zk.quorum");
      String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
      String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
      // Read this carefully:
      // MapReduce can only use Hive through JDBC.
      // Hive will submit another MapReduce job to execute the query,
      // so running Hive from inside MapReduce is not recommended.
      final String driver = "org.apache.hive.jdbc.HiveDriver";

      // The name comes from input data, so it is bound as a statement parameter instead of
      // being concatenated into the SQL text.
      String sql = "select name,sum(stayTime) as stayTime from person where name = ? group by name";

      StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
      // In map or reduce, use 'auth=delegationToken'.
      sBuilder
          .append(";serviceDiscoveryMode=")
          .append(serviceDiscoveryMode)
          .append(";zooKeeperNamespace=")
          .append(zooKeeperNamespace)
          .append(";auth=delegationToken;");
      String url = sBuilder.toString();
      Connection connection = null;
      PreparedStatement statement = null;
      ResultSet resultSet = null;
      try {
        Class.forName(driver);
        connection = DriverManager.getConnection(url, "", "");
        statement = connection.prepareStatement(sql);
        statement.setString(1, name);
        resultSet = statement.executeQuery();

        if (resultSet.next()) {
          // NOTE(review): column 1 is "name", not "stayTime" - confirm this is the intended column.
          return resultSet.getString(1);
        }
      } catch (ClassNotFoundException e) {
        LOG.warn("Exception occur ", e);
      } catch (SQLException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        // Release JDBC resources in reverse order of acquisition. Failures on close are
        // logged rather than silently dropped, and do not affect the returned value.
        if (null != resultSet) {
          try {
            resultSet.close();
          } catch (SQLException e) {
            LOG.warn("Failed to close the result set ", e);
          }
        }
        if (null != statement) {
          try {
            statement.close();
          } catch (SQLException e) {
            LOG.warn("Failed to close the statement ", e);
          }
        }
        if (null != connection) {
          try {
            connection.close();
          } catch (SQLException e) {
            LOG.warn("Failed to close the connection ", e);
          }
        }
      }

      return "";
    }

Example code of the reduce function of the MultiComponentReducer class, which extends the Reducer abstract class.

  private static class MultiComponentReducer extends Reducer<Text, Text, Text, Text> {
    Configuration conf;

    /**
     * Keeps the last value received for a key, writes it to HBase and emits it as job output.
     *
     * @param key the name emitted by the mapper
     * @param values all values mapped to {@code key}; only the last one iterated is kept
     * @param context task context, used to obtain the job configuration and to emit the output
     * @throws IOException if a configuration file path cannot be resolved or the output cannot
     *     be written
     * @throws InterruptedException if emitting the output is interrupted
     */
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      conf = context.getConfiguration();

      // Components that depend on ZooKeeper need the JAAS and krb5 configuration files.
      // No login is performed here; the credentials set up by the main function are used.
      // Both files are shipped with the job by the main function, so they are addressed by
      // bare file name relative to the task's working directory.
      File jaasFile = new File("jaas_mr.conf");
      File krb5File = new File("krb5.conf");
      System.setProperty("java.security.auth.login.config", jaasFile.getCanonicalPath());
      System.setProperty("java.security.krb5.conf", krb5File.getCanonicalPath());
      System.setProperty("zookeeper.sasl.client", "true");

      // Keep only the last value. The content is copied rather than the reference retained,
      // because the Hadoop Reducer documentation states that the framework reuses the value
      // object handed out by the iterator.
      Text finalValue = new Text("");
      for (Text value : values) {
        finalValue.set(value);
      }

      // Write the data to HBase.
      writeHBase(key.toString(), finalValue.toString());

      // Save the result to HDFS through the job output.
      context.write(key, finalValue);
    }

Example of the writeHBase function.

    /**
     * Writes {@code data} to HBase table {@code table1}, in cell {@code cf:value} of the row
     * whose key is {@code "row" + rowKey}.
     *
     * <p>An {@link IOException} raised while writing is logged and not rethrown.
     *
     * @param rowKey suffix of the HBase row key
     * @param data value to store
     */
    private void writeHBase(String rowKey, String data) {
      String tableName = "table1";
      String columnFamily = "cf";

      try {
        LOG.info("UGI read :" + UserGroupInformation.getCurrentUser());
      } catch (IOException e1) {
        // The user lookup is informational only; log the failure and carry on with the write.
        LOG.warn("Failed to get the current user ", e1);
      }

      Configuration hbaseConfig = HBaseConfiguration.create(conf);
      org.apache.hadoop.hbase.client.Connection conn = null;
      Table table = null;
      try {
        // Create an HBase connection and look up the table.
        conn = ConnectionFactory.createConnection(hbaseConfig);
        table = conn.getTable(TableName.valueOf(tableName));

        // Build and execute a single Put.
        Put put = new Put(Bytes.toBytes("row" + rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("value"), Bytes.toBytes(data));
        table.put(put);
      } catch (IOException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        // Close the table before the connection; the original code leaked the table handle.
        if (table != null) {
          try {
            table.close();
          } catch (Exception e1) {
            LOG.error("Failed to close the table ", e1);
          }
        }
        if (conn != null) {
          try {
            conn.close();
          } catch (Exception e1) {
            LOG.error("Failed to close the connection ", e1);
          }
        }
      }

    }

Example code: the main() function creates a job, configures the dependency and permission, and submits the job to Hadoop clusters.

    /**
     * Creates the MapReduce job, ships the security and client configuration files with it,
     * logs in, attaches HBase and Hive credentials, and submits the job. The JVM exits with
     * status 0 when the job succeeds and 1 otherwise.
     *
     * @param args command-line arguments; not used
     * @throws Exception if configuration loading, login, credential setup or job submission fails
     */
    public static void main(String[] args) throws Exception {
      // Load the hiveclient.properties configuration file from the classpath.
      Properties clientInfo = new Properties();
      InputStream propertiesStream =
          MultiComponentExample.class.getClassLoader().getResourceAsStream("hiveclient.properties");
      if (propertiesStream == null) {
        // Fail with a clear message instead of a NullPointerException inside Properties.load.
        throw new IOException("hiveclient.properties was not found on the classpath");
      }
      try {
        clientInfo.load(propertiesStream);
      } catch (Exception e) {
        throw new IOException(e);
      } finally {
        propertiesStream.close();
      }
      String zkQuorum = clientInfo.getProperty("zk.quorum");
      String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
      String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
      String principal = clientInfo.getProperty("principal");
      String auth = clientInfo.getProperty("auth");
      String sasl_qop = clientInfo.getProperty("sasl.qop");
      String hbaseKeytab = MultiComponentExample.class.getClassLoader().getResource("user.keytab").getPath();
      String hbaseJaas = MultiComponentExample.class.getClassLoader().getResource("jaas_mr.conf").getPath();
      String hiveClientProperties = MultiComponentExample.class.getClassLoader().getResource("hiveclient.properties").getPath();
      // A comma-separated list of files.
      String files = "file://" + KEYTAB + "," + "file://" + KRB + "," + "file://" + JAAS;
      files = files + "," + "file://" + hbaseKeytab;
      files = files + "," + "file://" + hbaseJaas;
      files = files + "," + "file://" + hiveClientProperties;
      // This setting asks the job to upload these files to HDFS.
      config.set("tmpfiles", files);

      // Clean control files before the job is submitted.
      MultiComponentExample.cleanupBeforeRun();

      // Security login.
      LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, PRINCIPAL, hbaseKeytab);
      LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
      LoginUtil.login(PRINCIPAL, KEYTAB, KRB, config);

      // Find the dependency jars for Hive.
      Class<?> hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver");
      Class<?> thriftClass = Class.forName("org.apache.thrift.TException");
      Class<?> serviceThriftCLIClass = Class.forName("org.apache.hive.service.rpc.thrift.TCLIService");
      Class<?> hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf");
      Class<?> hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport");
      Class<?> hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException");
      Class<?> hiveShimClass = Class.forName("org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge23");
      Class<?> thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.ThriftCLIService");

      // Add the dependency jars to the job.
      JarFinderUtil.addDependencyJars(config, hiveDriverClass, serviceThriftCLIClass, thriftCLIClass, thriftClass,
          hiveConfClass, hiveTransClass, hiveMetaClass, hiveShimClass);

      // Add the Hive config file.
      config.addResource("hive-site.xml");
      // Add the HBase config file.
      Configuration conf = HBaseConfiguration.create(config);

      // Initialize the job object.
      Job job = Job.getInstance(conf);
      job.setJarByClass(MultiComponentExample.class);

      // Set the mapper and reducer classes.
      job.setMapperClass(MultiComponentMapper.class);
      job.setReducerClass(MultiComponentReducer.class);

      // Set the job input and output.
      FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt"));
      FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));

      // Set the output type of the job.
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      // HBase uses this utility class to add its dependency jars to the MR job.
      TableMapReduceUtil.addDependencyJars(job);

      // This is mandatory when accessing a secure HBase cluster:
      // HBase adds security credentials to the job, to be used in map and reduce.
      TableMapReduceUtil.initCredentials(job);

      // Create the Hive security credentials.
      StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
      sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
          .append(zooKeeperNamespace)
          .append(";sasl.qop=")
          .append(sasl_qop)
          .append(";auth=")
          .append(auth)
          .append(";principal=")
          .append(principal)
          .append(";");
      String url = sBuilder.toString();
      String tokenStr;
      Connection connection = DriverManager.getConnection(url, "", "");
      try {
        tokenStr = ((HiveConnection) connection)
            .getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(), PRINCIPAL);
      } finally {
        // Close the JDBC connection even when fetching the delegation token fails.
        connection.close();
      }
      Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>();
      hive2Token.decodeFromUrlString(tokenStr);
      // Add the Hive security credentials to the job.
      job.getCredentials().addToken(new Text("hive.server2.delegation.token"), hive2Token);
      job.getCredentials().addToken(new Text(HiveAuthConstants.HS2_CLIENT_TOKEN), hive2Token);

      // Submit the job to a remote environment for execution.
      System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

Replace all the zkQuorum objects with the actual information about the ZooKeeper cluster nodes.