MapReduce访问多组件样例代码
功能介绍
主要分为三个部分:
- 从HDFS原文件中抽取name信息,查询HBase、Hive相关数据,并进行数据拼接,通过类MultiComponentMapper继承Mapper抽象类实现。
- 获取拼接后的数据,取最后一条记录并输出到HBase和HDFS,通过类MultiComponentReducer继承Reducer抽象类实现。
- main方法创建一个MapReduce job,并将该作业提交到Hadoop集群。
代码样例
下面代码片段仅为演示,具体代码参见com.huawei.bigdata.mapreduce.examples.MultiComponentExample类:
样例1:类MultiComponentMapper定义Mapper抽象类的map方法。
/**
 * Mapper that extracts a name from each input line, reads data from HBase and Hive,
 * and emits the name together with the concatenated lookup results.
 */
private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> {
    Configuration conf;

    /**
     * Handles one input line.
     *
     * <p>If the line contains "male", the text before the first comma is used as the name;
     * otherwise the name stays empty. The HBase and Hive lookups are performed in both cases.
     *
     * @param key     input key (not used)
     * @param value   one line of the input file
     * @param context job context used to obtain the configuration and emit output
     * @throws IOException          if the Hive lookup cannot load its configuration or the write fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();

        String name = "";
        String line = value.toString();
        // NOTE(review): "male" is also a substring of "female" - confirm both are meant to match.
        if (line.contains("male")) {
            // The name is everything before the first comma. A matching line without a comma
            // keeps the empty name instead of failing in substring(0, -1).
            int commaIndex = line.indexOf(',');
            if (commaIndex >= 0) {
                name = line.substring(0, commaIndex);
            }
        }

        // 1. Read data from HBase
        String hbaseData = readHBase();

        // 2. Read data from Hive
        String hiveData = readHive(name);

        // Emit the key/value pair; the value is the HBase and Hive data joined into one string
        context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));
    }
样例2:HBase数据读取的readHBase方法。
private String readHBase() { String tableName = "table1"; String columnFamily = "cf"; String hbaseKey = "1"; String hbaseValue; Configuration hbaseConfig = HBaseConfiguration.create(conf); org.apache.hadoop.hbase.client.Connection conn = null; try { // 建立HBase连接 conn = ConnectionFactory.createConnection(hbaseConfig); // 获取HBase表 Table table = conn.getTable(TableName.valueOf(tableName)); // 创建一个HBase Get操作实例 Get get = new Get(hbaseKey.getBytes()); // 提交Get请求 Result result = table.get(get); hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes(), "cid".getBytes())); return hbaseValue; } catch (IOException e) { LOG.warn("Exception occur ", e); } finally { if (conn != null) { try { conn.close(); } catch (Exception e1) { LOG.error("Failed to close the connection ", e1); } } } return ""; }
样例3:Hive数据读取的readHive方法。
private String readHive(String name) throws IOException { //加载配置信息 Properties clientInfo = null; String userdir = System.getProperty("user.dir") + "/"; InputStream fileInputStream = null; try { clientInfo = new Properties(); String hiveclientProp = userdir + "hiveclient.properties"; File propertiesFile = new File(hiveclientProp); fileInputStream = new FileInputStream(propertiesFile); clientInfo.load(fileInputStream); } catch (Exception e) { throw new IOException(e); } finally { if (fileInputStream != null) { fileInputStream.close(); } } String zkQuorum = clientInfo.getProperty("zk.quorum"); String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace"); String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode"); // 请仔细阅读此内容: // MapReduce任务通过JDBC方式访问Hive // Hive会将sql查询封装成另一个MapReduce任务并提交 // 所以不建议在MapReduce作业中调用Hive final String driver = "org.apache.hive.jdbc.HiveDriver"; // 集群zookeeper节点信息 String sql = "select name,sum(stayTime) as " + "stayTime from person where name = '" + name + "' group by name"; StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/"); sBuilder .append(";serviceDiscoveryMode=") .append(serviceDiscoveryMode) .append(";zooKeeperNamespace=") .append(zooKeeperNamespace) .append(";"); String url = sBuilder.toString(); Connection connection = null; PreparedStatement statement = null; ResultSet resultSet = null; try { Class.forName(driver); connection = DriverManager.getConnection(url, "", ""); statement = connection.prepareStatement(sql); //执行查询 resultSet = statement.executeQuery(); if (resultSet.next()) { return resultSet.getString(1); } } catch (ClassNotFoundException e) { LOG.warn("Exception occur ", e); } catch (SQLException e) { LOG.warn("Exception occur ", e); } finally { if (null != resultSet) { try { resultSet.close(); } catch (SQLException e) { // handle exception } } if (null != statement) { try { statement.close(); } catch (SQLException e) { // handle exception } } if (null != 
connection) { try { connection.close(); } catch (SQLException e) { // handle exception } } } return ""; }
样例中zkQuorum对象需替换为实际ZooKeeper集群节点信息。
样例4:类MultiComponentReducer定义Reducer抽象类的reduce方法。
/**
 * Reducer that keeps the last value received for each key and writes it to both HBase and
 * the job output.
 */
private static class MultiComponentReducer extends Reducer<Text, Text, Text, Text> {
    Configuration conf;

    /**
     * Takes the last value of the group as the final result, stores it in HBase and emits it.
     *
     * @param key     the name emitted by the mapper
     * @param values  all values emitted for this key
     * @param context job context used to obtain the configuration and emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if writing the output is interrupted
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();

        Text finalValue = new Text("");
        // Keep the last row as the final result. The content is copied rather than aliased
        // because the framework reuses the value object handed out by the iterator.
        for (Text value : values) {
            finalValue.set(value);
        }

        // Write the result to HBase
        writeHBase(key.toString(), finalValue.toString());

        // Save the result to HDFS
        context.write(key, finalValue);
    }
样例5:结果输出到HBase的writeHBase方法。
private void writeHBase(String rowKey, String data) { String tableName = "table1"; String columnFamily = "cf"; Configuration hbaseConfig = HBaseConfiguration.create(conf); org.apache.hadoop.hbase.client.Connection conn = null; try { // 创建HBase连接 conn = ConnectionFactory.createConnection(hbaseConfig); // 获取HBase表 Table table = conn.getTable(TableName.valueOf(tableName)); // 创建一个HBase Put请求实例 List<Put> list = new ArrayList<Put>(); byte[] row = Bytes.toBytes("row" + rowKey); Put put = new Put(row); byte[] family = Bytes.toBytes(columnFamily); byte[] qualifier = Bytes.toBytes("value"); byte[] value = Bytes.toBytes(data); put.addColumn(family, qualifier, value); list.add(put); // 执行Put请求 table.put(list); } catch (IOException e) { LOG.warn("Exception occur ", e); } finally { if (conn != null) { try { conn.close(); } catch (Exception e1) { LOG.error("Failed to close the connection ", e1); } } } } }
样例6:main()方法创建一个job,配置相关依赖,提交作业到Hadoop集群。
public static void main(String[] args) throws Exception { String hiveClientProperties = MultiComponentExample.class.getClassLoader().getResource("hiveclient.properties").getPath(); // 包含配置信息的文件 String file = "file://" + hiveClientProperties; // 运行时,把配置信息放到HDFS上 config.set("tmpfiles", file); // 提交作业前清理所需目录 MultiComponentExample.cleanupBeforeRun(); // 查找Hive运行依赖 Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver"); Class thriftClass = Class.forName("org.apache.thrift.TException"); Class serviceThriftCLIClass = Class.forName("org.apache.hive.service.rpc.thrift.TCLIService"); Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf"); Class hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport"); Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException"); Class hiveShimClass = Class.forName("org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge23"); Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.ThriftCLIService"); Class thriftType = Class.forName("org.apache.hadoop.hive.serde2.thrift.Type"); // 添加Hive依赖到作业 JarFinderUtil.addDependencyJars(config, hiveDriverClass, serviceThriftCLIClass, thriftCLIClass, thriftClass, hiveConfClass, hiveTransClass, hiveMetaClass, hiveShimClass, thriftType); // 添加Hive配置文件 config.addResource("hive-site.xml"); // 添加HBase配置文件 Configuration conf = HBaseConfiguration.create(config); // 实例化作业对象 Job job = Job.getInstance(conf); job.setJarByClass(MultiComponentExample.class); // 配置mapper&reducer类 job.setMapperClass(MultiComponentMapper.class); job.setReducerClass(MultiComponentReducer.class); // 配置数据输入路径和输出路径 FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt")); FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME)); // 设置输出键值类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // HBase提供工具类添加HBase的运行依赖 
TableMapReduceUtil.addDependencyJars(job); // 提交作业 System.exit(job.waitForCompletion(true) ? 0 : 1); }