更新时间:2024-08-03 GMT+08:00

MapReduce访问多组件样例代码

功能介绍

主要分为三个部分:

  • 从HDFS原文件中抽取name信息,查询HBase、Hive相关数据,并进行数据拼接,通过类MultiComponentMapper继承Mapper抽象类实现。
  • 获取拼接后的数据取最后一条输出到HBase、HDFS,通过类MultiComponentReducer继承Reducer抽象类实现。
  • main方法提供建立一个MapReduce job,并提交MapReduce作业到Hadoop集群。

代码样例

下面代码片段仅为演示,具体代码参见com.huawei.bigdata.mapreduce.examples.MultiComponentExample类:

样例1:类MultiComponentMapper定义Mapper抽象类的map方法。

  private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> {

    Configuration conf;

    @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

      conf = context.getConfiguration();

      String name = "";
      String line = value.toString();
      if (line.contains("male")) {
        // A character string that has been read
        name = line.substring(0, line.indexOf(","));
      }
      // 1. 读取HBase数据
      String hbaseData = readHBase();

      // 2. 读取Hive数据
      String hiveData = readHive(name);

      // Map输出键值对,内容为HBase与Hive数据拼接的字符串
      context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));
    }

样例2:HBase数据读取的readHBase方法。

    private String readHBase() {
      String tableName = "table1";
      String columnFamily = "cf";
      String hbaseKey = "1";
      String hbaseValue;

      Configuration hbaseConfig = HBaseConfiguration.create(conf);
      org.apache.hadoop.hbase.client.Connection conn = null;
      try {
        // 建立HBase连接
        conn = ConnectionFactory.createConnection(hbaseConfig);
        // 获取HBase表
        Table table = conn.getTable(TableName.valueOf(tableName));
        // 创建一个HBase Get操作实例
        Get get = new Get(hbaseKey.getBytes());
        // 提交Get请求
        Result result = table.get(get);
        hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes(), "cid".getBytes()));

        return hbaseValue;

      } catch (IOException e) {
        LOG.warn("Exception occur ", e);
      } finally {
             if (conn != null) {
                    try {
                        conn.close();
                    } catch (Exception e1) {
                        LOG.error("Failed to close the connection ", e1);
                    }
                }


        
      }

      return "";
    }

样例3:Hive数据读取的readHive方法。

    private String readHive(String name) throws IOException {
      //加载配置信息
      Properties clientInfo = null;
      String userdir = System.getProperty("user.dir") + "/";
      InputStream fileInputStream = null;
      try {
        clientInfo = new Properties();
        String hiveclientProp = userdir + "hiveclient.properties";
        File propertiesFile = new File(hiveclientProp);
        fileInputStream = new FileInputStream(propertiesFile);
        clientInfo.load(fileInputStream);
      } catch (Exception e) {
        throw new IOException(e);
      } finally {
        if (fileInputStream != null) {
          fileInputStream.close();
        }
      }
      String zkQuorum = clientInfo.getProperty("zk.quorum");
      String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
      String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
      // 请仔细阅读此内容:
      // MapReduce任务通过JDBC方式访问Hive
      // Hive会将sql查询封装成另一个MapReduce任务并提交
      // 所以不建议在MapReduce作业中调用Hive
      final String driver = "org.apache.hive.jdbc.HiveDriver";
      // 集群zookeeper节点信息

      String sql = "select name,sum(stayTime) as " + "stayTime from person where name = '" + name + "' group by name";

      StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
      sBuilder
          .append(";serviceDiscoveryMode=")
          .append(serviceDiscoveryMode)
          .append(";zooKeeperNamespace=")
          .append(zooKeeperNamespace)
          .append(";");
      String url = sBuilder.toString();
      Connection connection = null;
      PreparedStatement statement = null;
      ResultSet resultSet = null;
      try {
        Class.forName(driver);
        connection = DriverManager.getConnection(url, "", "");
        statement = connection.prepareStatement(sql);
        //执行查询
        resultSet = statement.executeQuery();

        if (resultSet.next()) {
          return resultSet.getString(1);
        }
      } catch (ClassNotFoundException e) {
        LOG.warn("Exception occur ", e);
      } catch (SQLException e) {
        LOG.warn("Exception occur ", e);
      } finally {

                if (null != resultSet) {
                    try {
                        resultSet.close();
                    } catch (SQLException e) {
                        // handle exception
                    }
                }
                if (null != statement) {
                    try {
                        statement.close();
                    } catch (SQLException e) {
                        // handle exception
                    }
                }
                if (null != connection) {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        // handle exception
                    }
                }
      }
      return "";
    }

样例中zkQuorum对象需替换为实际ZooKeeper集群节点信息。

样例4:类MultiComponentReducer定义Reducer抽象类的reduce方法。

  private static class MultiComponentReducer extends Reducer<Text, Text, Text, Text> {
    Configuration conf;

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      conf = context.getConfiguration();

      Text finalValue = new Text("");
      // 取最后一行数据作为最终结果
      for (Text value : values) {
        finalValue = value;
      }

      // 将结果输出到HBase
      writeHBase(key.toString(), finalValue.toString());

      // 将结果保存到HDFS
      context.write(key, finalValue);
    }

样例5:结果输出到HBase的writeHBase方法。

    private void writeHBase(String rowKey, String data) {
      String tableName = "table1";
      String columnFamily = "cf";

      Configuration hbaseConfig = HBaseConfiguration.create(conf);
      org.apache.hadoop.hbase.client.Connection conn = null;
      try {
        // 创建HBase连接
        conn = ConnectionFactory.createConnection(hbaseConfig);
        // 获取HBase表
        Table table = conn.getTable(TableName.valueOf(tableName));

        // 创建一个HBase Put请求实例
        List<Put> list = new ArrayList<Put>();
        byte[] row = Bytes.toBytes("row" + rowKey);
        Put put = new Put(row);
        byte[] family = Bytes.toBytes(columnFamily);
        byte[] qualifier = Bytes.toBytes("value");
        byte[] value = Bytes.toBytes(data);
        put.addColumn(family, qualifier, value);
        list.add(put);
        // 执行Put请求
        table.put(list);
      } catch (IOException e) {
        LOG.warn("Exception occur ", e);
      } finally {
          if (conn != null) {
                    try {
                        conn.close();
                    } catch (Exception e1) {
                        LOG.error("Failed to close the connection ", e1);
                    }
                }
            }
      }

    }

样例6:main()方法创建一个job,配置相关依赖,提交作业到hadoop集群。

public static void main(String[] args) throws Exception {
      String hiveClientProperties = MultiComponentExample.class.getClassLoader().getResource("hiveclient.properties").getPath();
      // 包含配置信息的文件
      String file =  "file://" + hiveClientProperties;
      // 运行时,把配置信息放到HDFS上
      config.set("tmpfiles", file);
      // 提交作业前清理所需目录
      MultiComponentExample.cleanupBeforeRun();

      // 查找Hive运行依赖
      Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver");
      Class thriftClass = Class.forName("org.apache.thrift.TException");
      Class serviceThriftCLIClass = Class.forName("org.apache.hive.service.rpc.thrift.TCLIService");
      Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf");
      Class hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport");
      Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException");
      Class hiveShimClass = Class.forName("org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge23");
      Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.ThriftCLIService");
      Class thriftType = Class.forName("org.apache.hadoop.hive.serde2.thrift.Type");
      // 添加Hive依赖到作业

      JarFinderUtil.addDependencyJars(config, hiveDriverClass, serviceThriftCLIClass, thriftCLIClass, thriftClass,
                hiveConfClass, hiveTransClass, hiveMetaClass, hiveShimClass, thriftType);
      // 添加Hive配置文件
      config.addResource("hive-site.xml");
      // 添加HBase配置文件
      Configuration conf = HBaseConfiguration.create(config);

      // 实例化作业对象
      Job job = Job.getInstance(conf);
      job.setJarByClass(MultiComponentExample.class);

      // 配置mapper&reducer类
      job.setMapperClass(MultiComponentMapper.class);
      job.setReducerClass(MultiComponentReducer.class);

      // 配置数据输入路径和输出路径
      FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt"));
      FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));

      // 设置输出键值类型
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      // HBase提供工具类添加HBase的运行依赖
      TableMapReduceUtil.addDependencyJars(job);

      // 提交作业
      System.exit(job.waitForCompletion(true) ? 0 : 1);

    }