更新时间:2024-08-03 GMT+08:00

MapReduce访问多组件样例程序开发思路

场景说明

该样例以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。

该样例逻辑过程如下。

以HDFS文本文件为输入数据

log1.txt:数据输入文件

YuanJing,male,10
GuoYijun,male,5

Map阶段

  1. 获取输入数据的一行并提取姓名信息。
  2. 查询HBase一条数据。
  3. 查询Hive一条数据。
  4. 将HBase查询结果与Hive查询结果进行拼接作为Map输出。

Reduce阶段

  1. 获取Map输出中的最后一条数据。
  2. 将数据输出到HBase。
  3. 将数据保存到HDFS。

数据规划

  1. 创建HDFS数据文件。
    1. 在Linux系统上新建文本文件,将log1.txt中的内容复制保存到data.txt。
    2. 在HDFS上创建一个文件夹,“/tmp/examples/multi-components/mapreduce/input/”,并上传data.txt到此目录,命令如下。
      1. 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir -p /tmp/examples/multi-components/mapreduce/input/
      2. 在Linux系统HDFS客户端使用命令hdfs dfs -put data.txt /tmp/examples/multi-components/mapreduce/input/
  2. 创建HBase表并插入数据。
    1. 在Linux系统HBase客户端使用命令hbase shell
    2. 在HBase shell交互窗口创建数据表table1,该表有一个列族cf,使用命令create 'table1', 'cf'
    3. 插入一条rowkey为1、列名为cid、数据值为123的数据,使用命令put 'table1', '1', 'cf:cid', '123'
    4. 执行命令quit退出。
  3. 创建Hive表并载入数据。
    1. 在Linux系统Hive客户端使用命令beeline
    2. 在Hive beeline交互窗口创建数据表person,该表有3个字段:name/gender/stayTime,使用命令CREATE TABLE person(name STRING, gender STRING, stayTime INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;
    3. 在Hive beeline交互窗口加载数据文件,LOAD DATA INPATH '/tmp/examples/multi-components/mapreduce/input/' OVERWRITE INTO TABLE person;
    4. 执行命令!q退出。
  4. 由于Hive加载数据将HDFS对应数据目录清空,所以需再次执行1。

功能介绍

该样例主要分为三个部分。

  • 从HDFS原文件中抽取name信息,查询HBase、Hive相关数据,并进行数据拼接,通过类MultiComponentMapper继承Mapper抽象类实现。
  • 获取拼接后的数据取最后一条输出到HBase、HDFS,通过类MultiComponentReducer继承Reducer抽象类实现。
  • main方法提供建立一个MapReduce job,并提交MapReduce作业到Hadoop集群。

代码样例

下面代码片段仅为演示,具体代码请参见com.huawei.bigdata.mapreduce.examples.MultiComponentExample类

样例1:类MultiComponentMapper定义Mapper抽象类的map方法。

private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> {

    Configuration conf;

    @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

      String name = "";
      String line = value.toString();

      //加载配置文件
        conf = context.getConfiguration();
        
        setJaasInfo("krb5.conf", "jaas.conf");
        LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB);
        LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);

        //准备hive query
        //加载parameter
        Properties clientInfo = null;
        InputStream fileInputStream = null;
        try {
            clientInfo = new Properties();
            File propertiesFile = new File(hiveClientProperties);
            fileInputStream = new FileInputStream(propertiesFile);
            clientInfo.load(fileInputStream);
        } catch (Exception e) {
        } finally {
            if (fileInputStream != null) {
                fileInputStream.close();
            }
        }

        String zkQuorum = clientInfo.getProperty("zk.quorum");
        String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
        String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");

        // 创建Hive鉴权信息
        // Read this carefully:
        // MapReduce can only use Hive through JDBC.
        // Hive will submit another MapReduce Job to execute query.
        // So we run Hive in MapReduce is not recommended.
        final String driver = "org.apache.hive.jdbc.HiveDriver";

        String sql = "select name,sum(stayTime) as "
                + "stayTime from person where name = ? group by name";

        StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
        // in map or reduce, use 'auth=delegationToken'
        sBuilder
                .append(";serviceDiscoveryMode=")
                .append(serviceDiscoveryMode)
                .append(";zooKeeperNamespace=")
                .append(zooKeeperNamespace)
                .append(";auth=delegationToken;");

        String url = sBuilder.toString();

        try {
            Class.forName(driver);
            hiveConn = DriverManager.getConnection(url, "", "");
            statement = hiveConn.prepareStatement(sql);
        } catch (Exception e) {
            LOG.error("Init jdbc driver failed.", e);
        }

        //创建hbase连接
        try {
            // Create a HBase connection
            hbaseConn = ConnectionFactory.createConnection(conf);
            // get table
            table = hbaseConn.getTable(TableName.valueOf(HBASE_TABLE_NAME));
        } catch (IOException e) {
            LOG.error("Exception occur when connect to HBase", e);
            throw e;
        }

      if (line.contains("male")) {
        name = line.substring(0, line.indexOf(","));
      }
      // 1. 读取HBase数据
      String hbaseData = readHBase();

      // 2. 读取Hive数据
      String hiveData = readHive(name);

      // Map输出键值对,内容为HBase与Hive数据拼接的字符串
      context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));
    }

样例2:HBase数据读取的readHBase方法。

    private String readHBase() {
      String tableName = "table1";
      String columnFamily = "cf";
      String hbaseKey = "1";
      String hbaseValue;

      Configuration hbaseConfig = HBaseConfiguration.create(conf);
      org.apache.hadoop.hbase.client.Connection conn = null;
      try {

        // 创建一个HBase Get请求实例
        Get get = new Get(hbaseKey.getBytes());
        // 提交Get请求
        Result result = table.get(get);
        hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes(), "cid".getBytes()));

        return hbaseValue;

      } catch (IOException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        if (hbaseConn != null) {
          try {
            hbaseConn.close();
          } catch (Exception e1) {
            LOG.error("Failed to close the connection ", e1);
          }
        }
      }

      return "";
    }

样例3:Hive数据读取的readHive方法。

    private int readHive(String name) {
      
      ResultSet resultSet = null;
      try {
        statement.setString(1, name);
        resultSet = statement.executeQuery();

        if (resultSet.next()) {
          return resultSet.getInt("stayTime"); 
        }       
      } catch (SQLException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        if (null != resultSet) {
          try {
            resultSet.close();
          } catch (SQLException e) {
            // handle exception
          }
        }
        if (null != statement) {
          try {
            statement.close();
          } catch (SQLException e) {
            // handle exception
          }
        }
        if (null != hiveConn) {
          try {
            hiveConn.close();
          } catch (SQLException e) {
            // handle exception
          }
        }
      }

      return 0;
    }

样例4:类MultiComponentReducer定义Reducer抽象类的reduce方法。

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      
        Text finalValue = new Text("");

        setJaasInfo("krb5.conf", "jaas.conf");
        LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB);
        LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);

        conf = context.getConfiguration();
        try {
            // 创建hbase连接
            conn = ConnectionFactory.createConnection(conf);
            // 得到表
            table = conn.getTable(TableName.valueOf(HBASE_TABLE_NAME));
        } catch (IOException e) {
            LOG.error("Exception occur when connect to HBase", e);
            throw e;
        }
      
      for (Text value : values) {
        finalValue = value;
      }

      // 将结果输出到HBase
      writeHBase(key.toString(), finalValue.toString());

      // 将结果保存到HDFS
      context.write(key, finalValue);
    }

样例5:结果输出到HBase的writeHBase方法。

    private void writeHBase(String rowKey, String data) {
      
      try {
        // 创建一个HBase Put请求实例
        List<Put> list = new ArrayList<Put>();
        byte[] row = Bytes.toBytes("1");
        Put put = new Put(row);
        byte[] family = Bytes.toBytes("cf");
        byte[] qualifier = Bytes.toBytes("value");
        byte[] value = Bytes.toBytes(data);
        put.addColumn(family, qualifier, value);
        list.add(put);
        // 执行Put请求
        table.put(list);
      } catch (IOException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        if (conn != null) {
          try {
            conn.close();
          } catch (Exception e1) {
            LOG.error("Failed to close the connection ", e1);
          }
        }
      }

    }

样例6:main()方法创建一个job,配置相关依赖,配置相关鉴权信息,提交作业到hadoop集群。

    public static void main(String[] args) throws Exception {
      
      // 清理所需目录
      MultiComponentExample.cleanupBeforeRun();

      // 查找Hive依赖jar包
      Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver");
      Class thriftClass = Class.forName("org.apache.thrift.TException");
      Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.TCLIService");
      Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf");
      Class hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport");
      Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException");
      Class hiveShimClass = Class.forName("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge23");

      // 添加Hive运行依赖到Job
      JarFinderUtil
          .addDependencyJars(config, hiveDriverClass, thriftCLIClass, thriftClass, hiveConfClass, hiveTransClass,
              hiveMetaClass, hiveShimClass);

      //开启Kerberos认证的安全集群登录
      if("kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"))){
      //security mode
          LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, PRINCIPAL, KEYTAB);
          LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
       System.setProperty("java.security.krb5.conf", KRB);
    LoginUtil.login(PRINCIPAL, KEYTAB, KRB, config);
    } 
      // 添加Hive配置文件
      config.addResource("hive-site.xml");
      // 添加HBase配置文件
      Configuration conf = HBaseConfiguration.create(config);

      // 实例化Job
      Job job = Job.getInstance(conf);
      job.setJarByClass(MultiComponentExample.class);

      // 设置mapper&reducer类
      job.setMapperClass(MultiComponentMapper.class);
      job.setReducerClass(MultiComponentReducer.class);

      //设置Job输入输出路径
      FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt"));
      FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));

      // 设置输出键值类型
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      // HBase提供工具类添加HBase运行依赖到Job
      TableMapReduceUtil.addDependencyJars(job);

      // 安全模式下必须要执行这个操作
      // HBase添加鉴权信息到Job,map或reduce任务将会使用此处的鉴权信息
      TableMapReduceUtil.initCredentials(job);

      // 创建Hive鉴权信息
      Properties clientInfo = null;
      InputStream fileInputStream = null;
      try {
          clientInfo = new Properties();
          File propertiesFile = new File(hiveClientProperties);
          fileInputStream = new FileInputStream(propertiesFile);
          clientInfo.load(fileInputStream);
      } catch (Exception e) {
      } finally {
          if (fileInputStream != null) {
              fileInputStream.close();
          }
      }
      String zkQuorum = clientInfo.getProperty("zk.quorum");//zookeeper节点ip和端口列表
      String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
      String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
      String principal = clientInfo.getProperty("principal");
      String auth = clientInfo.getProperty("auth");
      String sasl_qop = clientInfo.getProperty("sasl.qop");
      StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
      sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
          .append(zooKeeperNamespace)
          .append(";sasl.qop=")
          .append(sasl_qop)
          .append(";auth=")
          .append(auth)
          .append(";principal=")
          .append(principal)
          .append(";");
      String url = sBuilder.toString();
      Connection connection = DriverManager.getConnection(url, "", "");
      String tokenStr = ((HiveConnection) connection)
          .getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(), PRINCIPAL);
      connection.close();
      Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>();
      hive2Token.decodeFromUrlString(tokenStr);
      // 添加Hive鉴权信息到Job
      job.getCredentials().addToken(new Text("hive.server2.delegation.token"), hive2Token);
      job.getCredentials().addToken(new Text(HiveAuthFactory.HS2_CLIENT_TOKEN), hive2Token);

      // 提交作业
      System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

样例中所有zkQuorum对象需替换为实际ZooKeeper集群节点信息。