更新时间:2024-11-25 GMT+08:00
Java样例代码
功能介绍
在Spark应用中,通过使用Spark调用Hive接口来操作hive表,然后把Hive表的数据经过分析后写到HBase表。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkHivetoHbase。
样例代码获取方式请参考获取MRS应用开发样例工程。
代码样例:
/** * 从hive表读取数据,根据key值去hbase表获取相应记录,把两者数据做操作后,更新到hbase表 */ public class SparkHivetoHbase { public static void main(String[] args) throws Exception { if (args.length < 1) { printUsage(); } // 通过spark接口获取表中的数据 SparkConf conf = new SparkConf().setAppName("SparkHivetoHbase"); JavaSparkContext jsc = new JavaSparkContext(conf); HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(jsc); DataFrame dataFrame = sqlContext.sql("select name, account from person"); // 遍历hive表中的每一个partition, 然后更新到hbase表 // 如果数据条数较少,也可以使用foreach()方法 final String zkQuorum = args[0]; dataFrame.toJavaRDD().foreachPartition( new VoidFunction<Iterator<Row>>() { public void call(Iterator<Row> iterator) throws Exception { hBaseWriter(iterator,zkQuorum); } } ); jsc.stop(); } /** * 在executor端更新hbase表记录 * * @param iterator hive表的partition数据 */ private static void hBaseWriter(Iterator<Row> iterator, String zkQuorum) throws IOException { // 读取hbase String tableName = "table2"; String columnFamily = "cf"; Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.property.clientPort", "24002"); conf.set("hbase.zookeeper.quorum", zkQuorum); Connection connection = null; Table table = null; try { connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf(tableName)); List<Row> table1List = new ArrayList<Row>(); List<Get> rowList = new ArrayList<Get>(); while (iterator.hasNext()) { Row item = iterator.next(); Get get = new Get(item.getString(0).getBytes()); table1List.add(item); rowList.add(get); } // 获取hbase表记录 Result[] resultDataBuffer = table.get(rowList); // 修改hbase表记录 List<Put> putList = new ArrayList<Put>(); for (int i = 0; i < resultDataBuffer.length; i++) { // hive表值 Result resultData = resultDataBuffer[i]; if (!resultData.isEmpty()) { // get hiveValue int hiveValue = table1List.get(i).getInt(1); // 根据列簇和列,获取hbase值 String hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes())); Put put = new Put(table1List.get(i).getString(0).getBytes()); // 计算结果 int resultValue = hiveValue + Integer.valueOf(hbaseValue); // 设置结果到put对象 put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue))); putList.add(put); } } if (putList.size() > 0) { table.put(putList); } } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { try { // 关闭Hbase连接. connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } private static void printUsage() { System.out.println("Usage: {zkQuorum}"); System.exit(1); } }
父主题: 从Hive读取数据再写入HBase