更新时间:2024-08-03 GMT+08:00
MapReduce访问多组件样例程序开发思路
场景说明
该样例以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。
该样例逻辑过程如下。
以HDFS文本文件为输入数据
log1.txt:数据输入文件
YuanJing,male,10 GuoYijun,male,5
Map阶段
- 获取输入数据的一行并提取姓名信息。
- 查询HBase一条数据。
- 查询Hive一条数据。
- 将HBase查询结果与Hive查询结果进行拼接作为Map输出。
Reduce阶段
- 获取Map输出中的最后一条数据。
- 将数据输出到HBase。
- 将数据保存到HDFS。
数据规划
- 创建HDFS数据文件。
- 在Linux系统上新建文本文件,将log1.txt中的内容复制保存到data.txt。
- 在HDFS上创建一个文件夹,“/tmp/examples/multi-components/mapreduce/input/”,并上传data.txt到此目录,命令如下。
- 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir -p /tmp/examples/multi-components/mapreduce/input/
- 在Linux系统HDFS客户端使用命令hdfs dfs -put data.txt /tmp/examples/multi-components/mapreduce/input/
- 创建HBase表并插入数据。
- 在Linux系统HBase客户端使用命令hbase shell。
- 在HBase shell交互窗口创建数据表table1,该表有一个列族cf,使用命令create 'table1', 'cf'。
- 插入一条rowkey为1、列名为cid、数据值为123的数据,使用命令put 'table1', '1', 'cf:cid', '123'。
- 执行命令quit退出。
- 创建Hive表并载入数据。
- 在Linux系统Hive客户端使用命令beeline。
- 在Hive beeline交互窗口创建数据表person,该表有3个字段:name/gender/stayTime,使用命令CREATE TABLE person(name STRING, gender STRING, stayTime INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;。
- 在Hive beeline交互窗口加载数据文件,LOAD DATA INPATH '/tmp/examples/multi-components/mapreduce/input/' OVERWRITE INTO TABLE person;。
- 执行命令!q退出。
- 由于Hive加载数据将HDFS对应数据目录清空,所以需再次执行1。
功能介绍
该样例主要分为三个部分。
- 从HDFS原文件中抽取name信息,查询HBase、Hive相关数据,并进行数据拼接,通过类MultiComponentMapper继承Mapper抽象类实现。
- 获取拼接后的数据取最后一条输出到HBase、HDFS,通过类MultiComponentReducer继承Reducer抽象类实现。
- main方法提供建立一个MapReduce job,并提交MapReduce作业到Hadoop集群。
代码样例
下面代码片段仅为演示,具体代码请参见com.huawei.bigdata.mapreduce.examples.MultiComponentExample类
样例1:类MultiComponentMapper定义Mapper抽象类的map方法。
private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> {
Configuration conf;
@Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String name = "";
String line = value.toString();
//加载配置文件
conf = context.getConfiguration();
setJaasInfo("krb5.conf", "jaas.conf");
LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB);
LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
//准备hive query
//加载parameter
Properties clientInfo = null;
InputStream fileInputStream = null;
try {
clientInfo = new Properties();
File propertiesFile = new File(hiveClientProperties);
fileInputStream = new FileInputStream(propertiesFile);
clientInfo.load(fileInputStream);
} catch (Exception e) {
} finally {
if (fileInputStream != null) {
fileInputStream.close();
}
}
String zkQuorum = clientInfo.getProperty("zk.quorum");
String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
// 创建Hive鉴权信息
// Read this carefully:
// MapReduce can only use Hive through JDBC.
// Hive will submit another MapReduce Job to execute query.
// So we run Hive in MapReduce is not recommended.
final String driver = "org.apache.hive.jdbc.HiveDriver";
String sql = "select name,sum(stayTime) as "
+ "stayTime from person where name = ? group by name";
StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
// in map or reduce, use 'auth=delegationToken'
sBuilder
.append(";serviceDiscoveryMode=")
.append(serviceDiscoveryMode)
.append(";zooKeeperNamespace=")
.append(zooKeeperNamespace)
.append(";auth=delegationToken;");
String url = sBuilder.toString();
try {
Class.forName(driver);
hiveConn = DriverManager.getConnection(url, "", "");
statement = hiveConn.prepareStatement(sql);
} catch (Exception e) {
LOG.error("Init jdbc driver failed.", e);
}
//创建hbase连接
try {
// Create a HBase connection
hbaseConn = ConnectionFactory.createConnection(conf);
// get table
table = hbaseConn.getTable(TableName.valueOf(HBASE_TABLE_NAME));
} catch (IOException e) {
LOG.error("Exception occur when connect to HBase", e);
throw e;
}
if (line.contains("male")) {
name = line.substring(0, line.indexOf(","));
}
// 1. 读取HBase数据
String hbaseData = readHBase();
// 2. 读取Hive数据
String hiveData = readHive(name);
// Map输出键值对,内容为HBase与Hive数据拼接的字符串
context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));
}
样例2:HBase数据读取的readHBase方法。
private String readHBase() {
String tableName = "table1";
String columnFamily = "cf";
String hbaseKey = "1";
String hbaseValue;
Configuration hbaseConfig = HBaseConfiguration.create(conf);
org.apache.hadoop.hbase.client.Connection conn = null;
try {
// 创建一个HBase Get请求实例
Get get = new Get(hbaseKey.getBytes());
// 提交Get请求
Result result = table.get(get);
hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes(), "cid".getBytes()));
return hbaseValue;
} catch (IOException e) {
LOG.warn("Exception occur ", e);
} finally {
if (hbaseConn != null) {
try {
hbaseConn.close();
} catch (Exception e1) {
LOG.error("Failed to close the connection ", e1);
}
}
}
return "";
}
样例3:Hive数据读取的readHive方法。
private int readHive(String name) {
ResultSet resultSet = null;
try {
statement.setString(1, name);
resultSet = statement.executeQuery();
if (resultSet.next()) {
return resultSet.getInt("stayTime");
}
} catch (SQLException e) {
LOG.warn("Exception occur ", e);
} finally {
if (null != resultSet) {
try {
resultSet.close();
} catch (SQLException e) {
// handle exception
}
}
if (null != statement) {
try {
statement.close();
} catch (SQLException e) {
// handle exception
}
}
if (null != hiveConn) {
try {
hiveConn.close();
} catch (SQLException e) {
// handle exception
}
}
}
return 0;
}
样例4:类MultiComponentReducer定义Reducer抽象类的reduce方法。
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Text finalValue = new Text("");
setJaasInfo("krb5.conf", "jaas.conf");
LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB);
LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
conf = context.getConfiguration();
try {
// 创建hbase连接
conn = ConnectionFactory.createConnection(conf);
// 得到表
table = conn.getTable(TableName.valueOf(HBASE_TABLE_NAME));
} catch (IOException e) {
LOG.error("Exception occur when connect to HBase", e);
throw e;
}
for (Text value : values) {
finalValue = value;
}
// 将结果输出到HBase
writeHBase(key.toString(), finalValue.toString());
// 将结果保存到HDFS
context.write(key, finalValue);
}
样例5:结果输出到HBase的writeHBase方法。
private void writeHBase(String rowKey, String data) {
try {
// 创建一个HBase Put请求实例
List<Put> list = new ArrayList<Put>();
byte[] row = Bytes.toBytes("1");
Put put = new Put(row);
byte[] family = Bytes.toBytes("cf");
byte[] qualifier = Bytes.toBytes("value");
byte[] value = Bytes.toBytes(data);
put.addColumn(family, qualifier, value);
list.add(put);
// 执行Put请求
table.put(list);
} catch (IOException e) {
LOG.warn("Exception occur ", e);
} finally {
if (conn != null) {
try {
conn.close();
} catch (Exception e1) {
LOG.error("Failed to close the connection ", e1);
}
}
}
}
样例6:main()方法创建一个job,配置相关依赖,配置相关鉴权信息,提交作业到hadoop集群。
public static void main(String[] args) throws Exception {
// 清理所需目录
MultiComponentExample.cleanupBeforeRun();
// 查找Hive依赖jar包
Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver");
Class thriftClass = Class.forName("org.apache.thrift.TException");
Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.TCLIService");
Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf");
Class hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport");
Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException");
Class hiveShimClass = Class.forName("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge23");
// 添加Hive运行依赖到Job
JarFinderUtil
.addDependencyJars(config, hiveDriverClass, thriftCLIClass, thriftClass, hiveConfClass, hiveTransClass,
hiveMetaClass, hiveShimClass);
//开启Kerberos认证的安全集群登录
if("kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"))){
//security mode
LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, PRINCIPAL, KEYTAB);
LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
System.setProperty("java.security.krb5.conf", KRB);
LoginUtil.login(PRINCIPAL, KEYTAB, KRB, config);
}
// 添加Hive配置文件
config.addResource("hive-site.xml");
// 添加HBase配置文件
Configuration conf = HBaseConfiguration.create(config);
// 实例化Job
Job job = Job.getInstance(conf);
job.setJarByClass(MultiComponentExample.class);
// 设置mapper&reducer类
job.setMapperClass(MultiComponentMapper.class);
job.setReducerClass(MultiComponentReducer.class);
//设置Job输入输出路径
FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt"));
FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));
// 设置输出键值类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// HBase提供工具类添加HBase运行依赖到Job
TableMapReduceUtil.addDependencyJars(job);
// 安全模式下必须要执行这个操作
// HBase添加鉴权信息到Job,map或reduce任务将会使用此处的鉴权信息
TableMapReduceUtil.initCredentials(job);
// 创建Hive鉴权信息
Properties clientInfo = null;
InputStream fileInputStream = null;
try {
clientInfo = new Properties();
File propertiesFile = new File(hiveClientProperties);
fileInputStream = new FileInputStream(propertiesFile);
clientInfo.load(fileInputStream);
} catch (Exception e) {
} finally {
if (fileInputStream != null) {
fileInputStream.close();
}
}
String zkQuorum = clientInfo.getProperty("zk.quorum");//zookeeper节点ip和端口列表
String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
String principal = clientInfo.getProperty("principal");
String auth = clientInfo.getProperty("auth");
String sasl_qop = clientInfo.getProperty("sasl.qop");
StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
.append(zooKeeperNamespace)
.append(";sasl.qop=")
.append(sasl_qop)
.append(";auth=")
.append(auth)
.append(";principal=")
.append(principal)
.append(";");
String url = sBuilder.toString();
Connection connection = DriverManager.getConnection(url, "", "");
String tokenStr = ((HiveConnection) connection)
.getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(), PRINCIPAL);
connection.close();
Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>();
hive2Token.decodeFromUrlString(tokenStr);
// 添加Hive鉴权信息到Job
job.getCredentials().addToken(new Text("hive.server2.delegation.token"), hive2Token);
job.getCredentials().addToken(new Text(HiveAuthFactory.HS2_CLIENT_TOKEN), hive2Token);
// 提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
样例中所有zkQuorum对象需替换为实际ZooKeeper集群节点信息。
父主题: 开发MapReduce应用