MapReduce访问多组件样例程序开发思路
场景说明
该样例以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。
该样例逻辑过程如下。
以HDFS文本文件为输入数据
log1.txt:数据输入文件
YuanJing,male,10 GuoYijun,male,5
Map阶段
- 获取输入数据的一行并提取姓名信息。
- 查询HBase一条数据。
- 查询Hive一条数据。
- 将HBase查询结果与Hive查询结果进行拼接作为Map输出。
Reduce阶段
- 获取Map输出中的最后一条数据。
- 将数据输出到HBase。
- 将数据保存到HDFS。
数据规划
- 创建HDFS数据文件。
- 在Linux系统上新建文本文件,将log1.txt中的内容复制保存到data.txt。
- 在HDFS上创建一个文件夹,“/tmp/examples/multi-components/mapreduce/input/”,并上传data.txt到此目录,命令如下。
- 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir -p /tmp/examples/multi-components/mapreduce/input/
- 在Linux系统HDFS客户端使用命令hdfs dfs -put data.txt /tmp/examples/multi-components/mapreduce/input/
- 创建HBase表并插入数据。
- 在Linux系统HBase客户端使用命令hbase shell。
- 在HBase shell交互窗口创建数据表table1,该表有一个列族cf,使用命令create 'table1', 'cf'。
- 插入一条rowkey为1、列名为cid、数据值为123的数据,使用命令put 'table1', '1', 'cf:cid', '123'。
- 执行命令quit退出。
- 创建Hive表并载入数据。
- 在Linux系统Hive客户端使用命令beeline。
- 在Hive beeline交互窗口创建数据表person,该表有3个字段:name/gender/stayTime,使用命令CREATE TABLE person(name STRING, gender STRING, stayTime INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;。
- 在Hive beeline交互窗口加载数据文件,LOAD DATA INPATH '/tmp/examples/multi-components/mapreduce/input/' OVERWRITE INTO TABLE person;。
- 执行命令!q退出。
- 由于Hive加载数据将HDFS对应数据目录清空,所以需再次执行1。
功能介绍
该样例主要分为三个部分。
- 从HDFS原文件中抽取name信息,查询HBase、Hive相关数据,并进行数据拼接,通过类MultiComponentMapper继承Mapper抽象类实现。
- 获取拼接后的数据取最后一条输出到HBase、HDFS,通过类MultiComponentReducer继承Reducer抽象类实现。
- main方法提供建立一个MapReduce job,并提交MapReduce作业到Hadoop集群。
代码样例
下面代码片段仅为演示,具体代码请参见com.huawei.bigdata.mapreduce.examples.MultiComponentExample类
样例1:类MultiComponentMapper定义Mapper抽象类的map方法。
private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> { Configuration conf; @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String name = ""; String line = value.toString(); //加载配置文件 conf = context.getConfiguration(); setJaasInfo("krb5.conf", "jaas.conf"); LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); //准备hive query //加载parameter Properties clientInfo = null; InputStream fileInputStream = null; try { clientInfo = new Properties(); File propertiesFile = new File(hiveClientProperties); fileInputStream = new FileInputStream(propertiesFile); clientInfo.load(fileInputStream); } catch (Exception e) { } finally { if (fileInputStream != null) { fileInputStream.close(); } } String zkQuorum = clientInfo.getProperty("zk.quorum"); String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace"); String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode"); // 创建Hive鉴权信息 // Read this carefully: // MapReduce can only use Hive through JDBC. // Hive will submit another MapReduce Job to execute query. // So we run Hive in MapReduce is not recommended. final String driver = "org.apache.hive.jdbc.HiveDriver"; String sql = "select name,sum(stayTime) as " + "stayTime from person where name = ? group by name"; StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/"); // in map or reduce, use 'auth=delegationToken' sBuilder .append(";serviceDiscoveryMode=") .append(serviceDiscoveryMode) .append(";zooKeeperNamespace=") .append(zooKeeperNamespace) .append(";auth=delegationToken;"); String url = sBuilder.toString(); try { Class.forName(driver); hiveConn = DriverManager.getConnection(url, "", ""); statement = hiveConn.prepareStatement(sql); } catch (Exception e) { LOG.error("Init jdbc driver failed.", e); } //创建hbase连接 try { // Create a HBase connection hbaseConn = ConnectionFactory.createConnection(conf); // get table table = hbaseConn.getTable(TableName.valueOf(HBASE_TABLE_NAME)); } catch (IOException e) { LOG.error("Exception occur when connect to HBase", e); throw e; } if (line.contains("male")) { name = line.substring(0, line.indexOf(",")); } // 1. 读取HBase数据 String hbaseData = readHBase(); // 2. 读取Hive数据 String hiveData = readHive(name); // Map输出键值对,内容为HBase与Hive数据拼接的字符串 context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData)); }
样例2:HBase数据读取的readHBase方法。
private String readHBase() { String tableName = "table1"; String columnFamily = "cf"; String hbaseKey = "1"; String hbaseValue; Configuration hbaseConfig = HBaseConfiguration.create(conf); org.apache.hadoop.hbase.client.Connection conn = null; try { // 创建一个HBase Get请求实例 Get get = new Get(hbaseKey.getBytes()); // 提交Get请求 Result result = table.get(get); hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes(), "cid".getBytes())); return hbaseValue; } catch (IOException e) { LOG.warn("Exception occur ", e); } finally { if (hbaseConn != null) { try { hbaseConn.close(); } catch (Exception e1) { LOG.error("Failed to close the connection ", e1); } } } return ""; }
样例3:Hive数据读取的readHive方法。
private int readHive(String name) { ResultSet resultSet = null; try { statement.setString(1, name); resultSet = statement.executeQuery(); if (resultSet.next()) { return resultSet.getInt("stayTime"); } } catch (SQLException e) { LOG.warn("Exception occur ", e); } finally { if (null != resultSet) { try { resultSet.close(); } catch (SQLException e) { // handle exception } } if (null != statement) { try { statement.close(); } catch (SQLException e) { // handle exception } } if (null != hiveConn) { try { hiveConn.close(); } catch (SQLException e) { // handle exception } } } return 0; }
样例4:类MultiComponentReducer定义Reducer抽象类的reduce方法。
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text finalValue = new Text(""); setJaasInfo("krb5.conf", "jaas.conf"); LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); conf = context.getConfiguration(); try { // 创建hbase连接 conn = ConnectionFactory.createConnection(conf); // 得到表 table = conn.getTable(TableName.valueOf(HBASE_TABLE_NAME)); } catch (IOException e) { LOG.error("Exception occur when connect to HBase", e); throw e; } for (Text value : values) { finalValue = value; } // 将结果输出到HBase writeHBase(key.toString(), finalValue.toString()); // 将结果保存到HDFS context.write(key, finalValue); }
样例5:结果输出到HBase的writeHBase方法。
private void writeHBase(String rowKey, String data) { try { // 创建一个HBase Put请求实例 List<Put> list = new ArrayList<Put>(); byte[] row = Bytes.toBytes("1"); Put put = new Put(row); byte[] family = Bytes.toBytes("cf"); byte[] qualifier = Bytes.toBytes("value"); byte[] value = Bytes.toBytes(data); put.addColumn(family, qualifier, value); list.add(put); // 执行Put请求 table.put(list); } catch (IOException e) { LOG.warn("Exception occur ", e); } finally { if (conn != null) { try { conn.close(); } catch (Exception e1) { LOG.error("Failed to close the connection ", e1); } } } }
样例6:main()方法创建一个job,配置相关依赖,配置相关鉴权信息,提交作业到hadoop集群。
public static void main(String[] args) throws Exception { // 清理所需目录 MultiComponentExample.cleanupBeforeRun(); // 查找Hive依赖jar包 Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver"); Class thriftClass = Class.forName("org.apache.thrift.TException"); Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.TCLIService"); Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf"); Class hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport"); Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException"); Class hiveShimClass = Class.forName("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge23"); // 添加Hive运行依赖到Job JarFinderUtil .addDependencyJars(config, hiveDriverClass, thriftCLIClass, thriftClass, hiveConfClass, hiveTransClass, hiveMetaClass, hiveShimClass); //开启Kerberos认证的安全集群登录 if("kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"))){ //security mode LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, PRINCIPAL, KEYTAB); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); System.setProperty("java.security.krb5.conf", KRB); LoginUtil.login(PRINCIPAL, KEYTAB, KRB, config); } // 添加Hive配置文件 config.addResource("hive-site.xml"); // 添加HBase配置文件 Configuration conf = HBaseConfiguration.create(config); // 实例化Job Job job = Job.getInstance(conf); job.setJarByClass(MultiComponentExample.class); // 设置mapper&reducer类 job.setMapperClass(MultiComponentMapper.class); job.setReducerClass(MultiComponentReducer.class); //设置Job输入输出路径 FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt")); FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME)); // 设置输出键值类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // HBase提供工具类添加HBase运行依赖到Job TableMapReduceUtil.addDependencyJars(job); // 安全模式下必须要执行这个操作 // HBase添加鉴权信息到Job,map或reduce任务将会使用此处的鉴权信息 TableMapReduceUtil.initCredentials(job); // 创建Hive鉴权信息 Properties clientInfo = null; InputStream fileInputStream = null; try { clientInfo = new Properties(); File propertiesFile = new File(hiveClientProperties); fileInputStream = new FileInputStream(propertiesFile); clientInfo.load(fileInputStream); } catch (Exception e) { } finally { if (fileInputStream != null) { fileInputStream.close(); } } String zkQuorum = clientInfo.getProperty("zk.quorum");//zookeeper节点ip和端口列表 String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace"); String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode"); String principal = clientInfo.getProperty("principal"); String auth = clientInfo.getProperty("auth"); String sasl_qop = clientInfo.getProperty("sasl.qop"); StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/"); sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=") .append(zooKeeperNamespace) .append(";sasl.qop=") .append(sasl_qop) .append(";auth=") .append(auth) .append(";principal=") .append(principal) .append(";"); String url = sBuilder.toString(); Connection connection = DriverManager.getConnection(url, "", ""); String tokenStr = ((HiveConnection) connection) .getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(), PRINCIPAL); connection.close(); Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>(); hive2Token.decodeFromUrlString(tokenStr); // 添加Hive鉴权信息到Job job.getCredentials().addToken(new Text("hive.server2.delegation.token"), hive2Token); job.getCredentials().addToken(new Text(HiveAuthFactory.HS2_CLIENT_TOKEN), hive2Token); // 提交作业 System.exit(job.waitForCompletion(true) ? 0 : 1); }
样例中所有zkQuorum对象需替换为实际ZooKeeper集群节点信息。