MapReduce Multi-Component Access Sample Code
Updated: 2024-10-23 GMT+08:00
Function Description
The sample consists of three parts:
- Extract the name information from the original HDFS file, query the related HBase and Hive data, and concatenate the results. This is implemented by the MultiComponentMapper class, which extends the Mapper abstract class.
- Take the last record of the concatenated data and output it to HBase and HDFS. This is implemented by the MultiComponentReducer class, which extends the Reducer abstract class.
- The main method creates a MapReduce job and submits it to the Hadoop cluster.
Sample Code
The following code snippets are for demonstration only. For the complete code, see the com.huawei.bigdata.mapreduce.examples.MultiComponentExample class.
Example 1: The MultiComponentMapper class defines the map method of the Mapper abstract class.
private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> {

    Configuration conf;

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();

        String name = "";
        String line = value.toString();
        if (line.contains("male")) {
            // Extract the name from the character string that has been read.
            name = line.substring(0, line.indexOf(","));
        }
        // 1. Read HBase data.
        String hbaseData = readHBase();

        // 2. Read Hive data.
        String hiveData = readHive(name);

        // Map output key-value pair: the value is the HBase data concatenated with the Hive data.
        context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));
    }
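The map method above assumes that each line of the HDFS input file (data.txt) is a comma-separated record that begins with a name and contains a gender field; together with the Hive query in readHive, this suggests records of the form name,gender,stayTime. A purely hypothetical input file could therefore look as follows (replace it with your actual data):

LiuYang,female,20
YuanJing,male,10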
Example 2: The readHBase method reads data from HBase.
private String readHBase() {
    String tableName = "table1";
    String columnFamily = "cf";
    String hbaseKey = "1";
    String hbaseValue;

    Configuration hbaseConfig = HBaseConfiguration.create(conf);
    org.apache.hadoop.hbase.client.Connection conn = null;
    try {
        // Create an HBase connection.
        conn = ConnectionFactory.createConnection(hbaseConfig);
        // Obtain the HBase table.
        Table table = conn.getTable(TableName.valueOf(tableName));
        // Create an HBase Get operation instance.
        Get get = new Get(hbaseKey.getBytes());
        // Submit the Get request.
        Result result = table.get(get);
        hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes(), "cid".getBytes()));

        return hbaseValue;
    } catch (IOException e) {
        LOG.warn("Exception occur ", e);
    } finally {
        if (conn != null) {
            try {
                conn.close();
            } catch (Exception e1) {
                LOG.error("Failed to close the connection ", e1);
            }
        }
    }
    return "";
}
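The readHBase method assumes that an HBase table named table1 with a column family cf already exists and that the row with key 1 contains a cid column. If the table has not been prepared, the following minimal sketch (not part of the sample) shows one way to create it with the HBase 2.x client API used by MRS 3.x; the table and column family names simply mirror the ones used above, and Admin, TableDescriptorBuilder, and ColumnFamilyDescriptorBuilder come from the org.apache.hadoop.hbase.client package:

try (org.apache.hadoop.hbase.client.Connection conn =
         ConnectionFactory.createConnection(HBaseConfiguration.create());
     Admin admin = conn.getAdmin()) {
    TableName table1 = TableName.valueOf("table1");
    if (!admin.tableExists(table1)) {
        // Create table1 with a single column family named cf.
        TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(table1)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
            .build();
        admin.createTable(descriptor);
    }
}

The test row with the cid column can then be written with a Put, in the same way as in the writeHBase method shown in Example 5.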
Example 3: The readHive method reads data from Hive.
private String readHive(String name) throws IOException {
    // Load the configuration information.
    Properties clientInfo = null;
    String userdir = System.getProperty("user.dir") + "/";
    InputStream fileInputStream = null;
    try {
        clientInfo = new Properties();
        String hiveclientProp = userdir + "hiveclient.properties";
        File propertiesFile = new File(hiveclientProp);
        fileInputStream = new FileInputStream(propertiesFile);
        clientInfo.load(fileInputStream);
    } catch (Exception e) {
        throw new IOException(e);
    } finally {
        if (fileInputStream != null) {
            fileInputStream.close();
        }
    }

    // ZooKeeper node information of the cluster.
    String zkQuorum = clientInfo.getProperty("zk.quorum");
    String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
    String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");

    // Read the following carefully:
    // The MapReduce task accesses Hive through JDBC.
    // Hive wraps the SQL query into another MapReduce job and submits it,
    // so invoking Hive inside a MapReduce job is not recommended.
    final String driver = "org.apache.hive.jdbc.HiveDriver";

    String sql = "select name,sum(stayTime) as stayTime from person where name = ? group by name";

    StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
    sBuilder
        .append(";serviceDiscoveryMode=")
        .append(serviceDiscoveryMode)
        .append(";zooKeeperNamespace=")
        .append(zooKeeperNamespace)
        .append(";");
    String url = sBuilder.toString();

    Connection connection = null;
    PreparedStatement statement = null;
    ResultSet resultSet = null;
    try {
        Class.forName(driver);
        connection = DriverManager.getConnection(url, "", "");
        statement = connection.prepareStatement(sql);
        // Bind the name parameter instead of concatenating it into the SQL statement.
        statement.setString(1, name);
        // Execute the query.
        resultSet = statement.executeQuery();
        if (resultSet.next()) {
            return resultSet.getString(1);
        }
    } catch (ClassNotFoundException e) {
        LOG.warn("Exception occur ", e);
    } catch (SQLException e) {
        LOG.warn("Exception occur ", e);
    } finally {
        if (null != resultSet) {
            try {
                resultSet.close();
            } catch (SQLException e) {
                // Ignore the exception when closing the result set.
            }
        }
        if (null != statement) {
            try {
                statement.close();
            } catch (SQLException e) {
                // Ignore the exception when closing the statement.
            }
        }
        if (null != connection) {
            try {
                connection.close();
            } catch (SQLException e) {
                // Ignore the exception when closing the connection.
            }
        }
    }
    return "";
}
In the sample, replace zkQuorum with the node information of the actual ZooKeeper cluster.
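For reference, a purely illustrative hiveclient.properties could contain entries such as the following; the host names, port, and namespace are placeholders and must be replaced with the values of your own cluster:

zk.quorum=zkhost1:2181,zkhost2:2181,zkhost3:2181
serviceDiscoveryMode=zooKeeper
zooKeeperNamespace=hiveserver2

With these values, readHive would build the JDBC URL jdbc:hive2://zkhost1:2181,zkhost2:2181,zkhost3:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2; and let the Hive JDBC driver discover HiveServer instances through ZooKeeper.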
Example 4: The MultiComponentReducer class defines the reduce method of the Reducer abstract class.
private static class MultiComponentReducer extends Reducer<Text, Text, Text, Text> {

    Configuration conf;

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();

        Text finalValue = new Text("");
        // Use the last record as the final result.
        for (Text value : values) {
            finalValue = value;
        }

        // Output the result to HBase.
        writeHBase(key.toString(), finalValue.toString());

        // Save the result to HDFS.
        context.write(key, finalValue);
    }
Example 5: The writeHBase method outputs data to HBase.
private void writeHBase(String rowKey, String data) {
    String tableName = "table1";
    String columnFamily = "cf";

    Configuration hbaseConfig = HBaseConfiguration.create(conf);
    org.apache.hadoop.hbase.client.Connection conn = null;
    try {
        // Create an HBase connection.
        conn = ConnectionFactory.createConnection(hbaseConfig);
        // Obtain the HBase table.
        Table table = conn.getTable(TableName.valueOf(tableName));

        // Create an HBase Put operation instance.
        List<Put> list = new ArrayList<Put>();
        byte[] row = Bytes.toBytes("row" + rowKey);
        Put put = new Put(row);
        byte[] family = Bytes.toBytes(columnFamily);
        byte[] qualifier = Bytes.toBytes("value");
        byte[] value = Bytes.toBytes(data);
        put.addColumn(family, qualifier, value);
        list.add(put);
        // Submit the Put request.
        table.put(list);
    } catch (IOException e) {
        LOG.warn("Exception occur ", e);
    } finally {
        if (conn != null) {
            try {
                conn.close();
            } catch (Exception e1) {
                LOG.error("Failed to close the connection ", e1);
            }
        }
    }
}
}
Example 6: The main() method creates a job, configures the dependencies, and submits the job to the Hadoop cluster. The config object, baseDir, INPUT_DIR_NAME, and OUTPUT_DIR_NAME variables used below are defined elsewhere in the MultiComponentExample class.
public static void main(String[] args) throws Exception {
    String hiveClientProperties =
        MultiComponentExample.class.getClassLoader().getResource("hiveclient.properties").getPath();

    // File that contains the Hive client configuration information.
    String file = "file://" + hiveClientProperties;

    // At run time, ship the configuration file to HDFS with the job.
    config.set("tmpfiles", file);

    // Clean up the required directories before submitting the job.
    MultiComponentExample.cleanupBeforeRun();

    // Locate the Hive runtime dependencies.
    Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver");
    Class thriftClass = Class.forName("org.apache.thrift.TException");
    Class serviceThriftCLIClass = Class.forName("org.apache.hive.service.rpc.thrift.TCLIService");
    Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf");
    Class hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport");
    Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException");
    Class hiveShimClass = Class.forName("org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge23");
    Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.ThriftCLIService");
    Class thriftType = Class.forName("org.apache.hadoop.hive.serde2.thrift.Type");

    // Add the Hive dependency JARs to the job.
    JarFinderUtil.addDependencyJars(config, hiveDriverClass, serviceThriftCLIClass, thriftCLIClass, thriftClass,
        hiveConfClass, hiveTransClass, hiveMetaClass, hiveShimClass, thriftType);

    // Add the Hive configuration file.
    config.addResource("hive-site.xml");
    // Add the HBase configuration.
    Configuration conf = HBaseConfiguration.create(config);

    // Instantiate the job object.
    Job job = Job.getInstance(conf);
    job.setJarByClass(MultiComponentExample.class);

    // Set the Mapper and Reducer classes.
    job.setMapperClass(MultiComponentMapper.class);
    job.setReducerClass(MultiComponentReducer.class);

    // Set the data input and output paths.
    FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt"));
    FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));

    // Set the output key and value types.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Use the HBase utility class to add the HBase runtime dependencies to the job.
    TableMapReduceUtil.addDependencyJars(job);

    // Submit the job.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
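Once the project has been built into a JAR file and data.txt has been uploaded to the input directory, the job can be submitted from a cluster client in the standard Hadoop way. The JAR file name below is a placeholder; use the name produced by your build:

yarn jar multi-component-example.jar com.huawei.bigdata.mapreduce.examples.MultiComponentExample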
Parent topic: MapReduce Multi-Component Access Sample Programs