MapReduce访问多组件样例代码
功能介绍
主要分为三个部分:
- 从HDFS原文件中抽取name信息,查询HBase、Hive相关数据,并进行数据拼接,通过类MultiComponentMapper继承Mapper抽象类实现。
- 获取拼接后的数据,取其中最后一条输出到HBase和HDFS,通过类MultiComponentReducer继承Reducer抽象类实现。
- main方法创建一个MapReduce job,并将该作业提交到Hadoop集群。
代码样例
下面代码片段仅为演示,具体代码请参见com.huawei.bigdata.mapreduce.examples.MultiComponentExample类:
样例1:类MultiComponentMapper定义Mapper抽象类的map方法。
/**
 * Mapper that extracts a name from each input line, reads related data from HBase and Hive,
 * and emits the name together with the concatenated lookup results.
 */
private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> {
    Configuration conf;

    /**
     * Processes one input line.
     *
     * <p>If the line contains "male", the text before the first comma is used as the name;
     * otherwise the name stays empty. The HBase and Hive lookups are performed in every case
     * and the result is written as {@code (name, "hbase:" + hbaseData + ", hive:" + hiveData)}.
     *
     * @param key input key (unused)
     * @param value one line of the input file
     * @param context task context used to obtain the configuration and to emit output
     * @throws IOException if a configuration file path cannot be resolved, the Hive client
     *         properties cannot be loaded, or the output cannot be written
     * @throws InterruptedException if writing the output is interrupted
     */
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();

        // Components that access ZooKeeper need the jaas and krb5 configuration.
        // No repeated login is needed in map; the authentication information
        // configured in the main method is used.
        String krb5 = "krb5.conf";
        String jaas = "jaas_mr.conf";
        // These files are uploaded by the main method.
        File jaasFile = new File(jaas);
        File krb5File = new File(krb5);
        System.setProperty("java.security.auth.login.config", jaasFile.getCanonicalPath());
        System.setProperty("java.security.krb5.conf", krb5File.getCanonicalPath());
        System.setProperty("zookeeper.sasl.client", "true");

        LOG.info("UGI :" + UserGroupInformation.getCurrentUser());

        String name = "";
        String line = value.toString();
        // NOTE(review): contains("male") also matches lines containing "female" - confirm the intended filter.
        if (line.contains("male")) {
            int commaIndex = line.indexOf(",");
            if (commaIndex >= 0) {
                name = line.substring(0, commaIndex);
            } else {
                // Without this guard substring(0, -1) would throw and fail the whole task.
                LOG.warn("Input line contains no ',' separator; using an empty name");
            }
        }

        // 1. Read HBase data.
        String hbaseData = readHBase();

        // 2. Read Hive data.
        String hiveData = readHive(name);

        // The map output value is the concatenation of the HBase and Hive data.
        context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));
    }
private String readHBase() { String tableName = "table1"; String columnFamily = "cf"; String hbaseKey = "1"; String hbaseValue; Configuration hbaseConfig = HBaseConfiguration.create(conf); org.apache.hadoop.hbase.client.Connection conn = null; try { // 建立HBase连接 conn = ConnectionFactory.createConnection(hbaseConfig); // 获取HBase表 Table table = conn.getTable(TableName.valueOf(tableName)); // 创建一个HBase Get请求实例 Get get = new Get(hbaseKey.getBytes()); // 提交Get请求 Result result = table.get(get); hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes(), "cid".getBytes())); return hbaseValue; } catch (IOException e) { LOG.warn("Exception occur ", e); } finally { if (conn != null) { try { conn.close(); } catch (Exception e1) { LOG.error("Failed to close the connection ", e1); } } } return ""; }
样例3:Hive数据读取的readHive方法。
private String readHive(String name) throws IOException { //加载配置文件 Properties clientInfo = null; String userdir = System.getProperty("user.dir") + "/"; InputStream fileInputStream = null; try { clientInfo = new Properties(); String hiveclientProp = userdir + "hiveclient.properties"; File propertiesFile = new File(hiveclientProp); fileInputStream = new FileInputStream(propertiesFile); clientInfo.load(fileInputStream); } catch (Exception e) { throw new IOException(e); } finally { if (fileInputStream != null) { fileInputStream.close(); } } String zkQuorum = clientInfo.getProperty("zk.quorum"); String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace"); String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode"); // 请仔细阅读此内容: // MapReduce任务通过JDBC方式访问Hive // Hive会将sql查询封装成另一个MapReduce任务并提交 // 所以不建议在MapReduce作业中调用Hive final String driver = "org.apache.hive.jdbc.HiveDriver"; String sql = "select name,sum(stayTime) as " + "stayTime from person where name = '" + name + "' group by name"; StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/"); // 在map或reduce中,Hive连接方式使用'auth=delegationToken' sBuilder .append(";serviceDiscoveryMode=") .append(serviceDiscoveryMode) .append(";zooKeeperNamespace=") .append(zooKeeperNamespace) .append(";auth=delegationToken;"); String url = sBuilder.toString(); Connection connection = null; PreparedStatement statement = null; ResultSet resultSet = null; try { Class.forName(driver); connection = DriverManager.getConnection(url, "", ""); statement = connection.prepareStatement(sql); resultSet = statement.executeQuery(); if (resultSet.next()) { return resultSet.getString(1); } } catch (ClassNotFoundException e) { LOG.warn("Exception occur ", e); } catch (SQLException e) { LOG.warn("Exception occur ", e); } finally { if (null != resultSet) { try { resultSet.close(); } catch (SQLException e) { // handle exception } } if (null != statement) { try { statement.close(); } catch (SQLException 
e) { // handle exception } } if (null != connection) { try { connection.close(); } catch (SQLException e) { // handle exception } } } return ""; }
样例4:类MultiComponentReducer定义Reducer抽象类的reduce方法。
/**
 * Reducer that keeps the last value received for each key and writes it to both
 * HBase and the job output.
 */
private static class MultiComponentReducer extends Reducer<Text, Text, Text, Text> {
    Configuration conf;

    /**
     * Takes the last value of the group and outputs it to HBase and to the job output.
     *
     * @param key the name emitted by the mapper
     * @param values all values emitted for this key
     * @param context task context used to obtain the configuration and to emit output
     * @throws IOException if a configuration file path cannot be resolved or the output
     *         cannot be written
     * @throws InterruptedException if writing the output is interrupted
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();

        // Components that access ZooKeeper need the jaas and krb5 configuration.
        // No repeated login is needed in reduce; the authentication information
        // configured in the main method is used.
        String krb5 = "krb5.conf";
        String jaas = "jaas_mr.conf";
        // These files are uploaded by the main method.
        File jaasFile = new File(jaas);
        File krb5File = new File(krb5);
        System.setProperty("java.security.auth.login.config", jaasFile.getCanonicalPath());
        System.setProperty("java.security.krb5.conf", krb5File.getCanonicalPath());
        System.setProperty("zookeeper.sasl.client", "true");

        Text finalValue = new Text("");
        // Copy the contents instead of keeping the reference: the framework may reuse
        // the same Text instance for every element of the iteration.
        for (Text value : values) {
            finalValue.set(value);
        }

        // Output the result to HBase.
        writeHBase(key.toString(), finalValue.toString());

        // Save the result to HDFS.
        context.write(key, finalValue);
    }
样例5:结果输出到HBase的writeHBase方法。
private void writeHBase(String rowKey, String data) { String tableName = "table1"; String columnFamily = "cf"; try { LOG.info("UGI read :" + UserGroupInformation.getCurrentUser()); } catch (IOException e1) { // handler exception } Configuration hbaseConfig = HBaseConfiguration.create(conf); org.apache.hadoop.hbase.client.Connection conn = null; try { // 创建HBase连接 conn = ConnectionFactory.createConnection(hbaseConfig); // 获取HBase表 Table table = conn.getTable(TableName.valueOf(tableName)); // 创建一个HBase Put请求实例 List<Put> list = new ArrayList<Put>(); byte[] row = Bytes.toBytes("row" + rowKey); Put put = new Put(row); byte[] family = Bytes.toBytes(columnFamily); byte[] qualifier = Bytes.toBytes("value"); byte[] value = Bytes.toBytes(data); put.addColumn(family, qualifier, value); list.add(put); // 执行Put请求 table.put(list); } catch (IOException e) { LOG.warn("Exception occur ", e); } finally { if (conn != null) { try { conn.close(); } catch (Exception e1) { LOG.error("Failed to close the connection ", e1); } } } }
样例6:main()方法创建一个job,配置相关依赖和鉴权信息,并提交作业到Hadoop集群。
public static void main(String[] args) throws Exception { //加载hiveclient.properties配置文件 Properties clientInfo = null; try { clientInfo = new Properties(); clientInfo.load(MultiComponentExample.class.getClassLoader().getResourceAsStream("hiveclient.properties")); } catch (Exception e) { throw new IOException(e); } finally { } String zkQuorum = clientInfo.getProperty("zk.quorum"); String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace"); String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode"); String principal = clientInfo.getProperty("principal"); String auth = clientInfo.getProperty("auth"); String sasl_qop = clientInfo.getProperty("sasl.qop"); String hbaseKeytab = MultiComponentExample.class.getClassLoader().getResource("user.keytab").getPath(); String hbaseJaas = MultiComponentExample.class.getClassLoader().getResource("jaas_mr.conf").getPath(); String hiveClientProperties = MultiComponentExample.class.getClassLoader().getResource("hiveclient.properties").getPath(); // 拼接文件列表,以逗号分隔 String files = "file://" + KEYTAB + "," + "file://" + KRB + "," + "file://" + JAAS; files = files + "," + "file://" + hbaseKeytab; files = files + "," + "file://" + hbaseJaas; files = files + "," + "file://" + hiveClientProperties; // tmpfiles属性所涉及文件将会在Job提交时上传到HDFS config.set("tmpfiles", files); // 清理所需目录 MultiComponentExample.cleanupBeforeRun(); // 安全集群login LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, PRINCIPAL, hbaseKeytab); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); LoginUtil.login(PRINCIPAL, KEYTAB, KRB, config); // 查找Hive依赖jar包 Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver"); Class thriftClass = Class.forName("org.apache.thrift.TException"); Class serviceThriftCLIClass = Class.forName("org.apache.hive.service.rpc.thrift.TCLIService"); Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf"); Class hiveTransClass = 
Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport"); Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException"); Class hiveShimClass = Class.forName("org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge23"); Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.ThriftCLIService"); // 添加Hive运行依赖到Job JarFinderUtil.addDependencyJars(config, hiveDriverClass, serviceThriftCLIClass, thriftCLIClass, thriftClass, hiveConfClass, hiveTransClass, hiveMetaClass, hiveShimClass); // 添加Hive配置文件 config.addResource("hive-site.xml"); // 添加HBase配置文件 Configuration conf = HBaseConfiguration.create(config); // 实例化Job Job job = Job.getInstance(conf); job.setJarByClass(MultiComponentExample.class); // 设置mapper&reducer类 job.setMapperClass(MultiComponentMapper.class); job.setReducerClass(MultiComponentReducer.class); //设置Job输入输出路径 FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt")); FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME)); // 设置输出键值类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // HBase提供工具类添加HBase运行依赖到Job TableMapReduceUtil.addDependencyJars(job); // 安全模式下必须要执行这个操作 // HBase添加鉴权信息到Job,map或reduce任务将会使用此处的鉴权信息 TableMapReduceUtil.initCredentials(job); // 创建Hive鉴权信息 StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/"); sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=") .append(zooKeeperNamespace) .append(";sasl.qop=") .append(sasl_qop) .append(";auth=") .append(auth) .append(";principal=") .append(principal) .append(";"); String url = sBuilder.toString(); Connection connection = DriverManager.getConnection(url, "", ""); String tokenStr = ((HiveConnection) connection) .getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(), PRINCIPAL); connection.close(); Token<DelegationTokenIdentifier> hive2Token = new 
Token<DelegationTokenIdentifier>(); hive2Token.decodeFromUrlString(tokenStr); // 添加Hive鉴权信息到Job job.getCredentials().addToken(new Text("hive.server2.delegation.token"), hive2Token); job.getCredentials().addToken(new Text(HiveAuthConstants.HS2_CLIENT_TOKEN), hive2Token); // 提交作业 System.exit(job.waitForCompletion(true) ? 0 : 1); }
样例中所有zkQuorum对象需替换为实际ZooKeeper集群节点信息。