Sample Program for Reading Data from Hive and Writing It to HBase with Spark (Java)
Function Description
In a Spark application, Hive interfaces are invoked through Spark to operate on a Hive table, and the analyzed Hive table data is then written to an HBase table.
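The sample assumes a Hive table person with columns name (string) and account (int), and an HBase table table2 with a column family cf whose column cid stores an integer as a string; these names are taken from the code below. The HBase table must exist before the sample runs. The following is a minimal sketch for creating it with the HBase 2.x Admin API; the class name CreateTable2 is illustrative only, and the cluster's HBase configuration is assumed to be available on the classpath.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

// Illustrative helper class, not part of the sample project.
public class CreateTable2 {
    public static void main(String[] args) throws Exception {
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("table2");
            // Create table2 with a single column family cf if it does not exist yet.
            if (!admin.tableExists(tableName)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
                        .build());
            }
        }
    }
}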
Code Sample
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.SparkHivetoHbase.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Read data from a Hive table, fetch the matching records from an HBase table by key,
 * combine the two values, and update the result in the HBase table.
 */
public class SparkHivetoHbase {
    public static void main(String[] args) throws Exception {
        // Obtain the data of the Hive table through the Spark interface.
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkHivetoHbase")
                .enableHiveSupport()
                .getOrCreate();
        Dataset<Row> dataFrame = spark.sql("select name, account from person");
        // Traverse every partition of the Hive table and update the HBase table.
        // If the amount of data is small, the foreach() method can be used instead.
        dataFrame.toJavaRDD().foreachPartition(
            new VoidFunction<Iterator<Row>>() {
                public void call(Iterator<Row> iterator) throws Exception {
                    hBaseWriter(iterator);
                }
            }
        );
        spark.stop();
    }
    /**
     * Update HBase table records on the executor side.
     *
     * @param iterator partition data of the Hive table
     */
    private static void hBaseWriter(Iterator<Row> iterator) throws IOException {
        // Read the HBase table.
        String tableName = "table2";
        String columnFamily = "cf";
        Configuration conf = HBaseConfiguration.create();
        Connection connection = null;
        Table table = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            table = connection.getTable(TableName.valueOf(tableName));
            List<Row> table1List = new ArrayList<Row>();
            List<Get> rowList = new ArrayList<Get>();
            while (iterator.hasNext()) {
                Row item = iterator.next();
                // Use the Hive column "name" as the HBase row key.
                Get get = new Get(Bytes.toBytes(item.getString(0)));
                table1List.add(item);
                rowList.add(get);
            }
            // Obtain the HBase table records in a batch.
            Result[] resultDataBuffer = table.get(rowList);
            // Modify the HBase table records.
            List<Put> putList = new ArrayList<Put>();
            for (int i = 0; i < resultDataBuffer.length; i++) {
                Result resultData = resultDataBuffer[i];
                if (!resultData.isEmpty()) {
                    // Obtain the value from the Hive table.
                    int hiveValue = table1List.get(i).getInt(1);
                    // Obtain the HBase value by column family and column qualifier.
                    String hbaseValue = Bytes.toString(resultData.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes("cid")));
                    Put put = new Put(Bytes.toBytes(table1List.get(i).getString(0)));
                    // Add the two values.
                    int resultValue = hiveValue + Integer.parseInt(hbaseValue);
                    // Set the result into the Put object.
                    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));
                    putList.add(put);
                }
            }
            if (putList.size() > 0) {
                table.put(putList);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    // Close the HBase connection.
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
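Note that hBaseWriter() is invoked through foreachPartition(), so one HBase connection is opened per partition rather than per record, which keeps connection overhead proportional to the number of partitions. After the job finishes, the result can be spot-checked by reading table2 back. The following is a minimal sketch assuming the same table2/cf/cid layout as above; the class name CheckTable2 and the row key "zhangsan" are hypothetical examples.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative helper class, not part of the sample project.
public class CheckTable2 {
    public static void main(String[] args) throws Exception {
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = connection.getTable(TableName.valueOf("table2"))) {
            // "zhangsan" is a hypothetical row key; replace it with a name value that exists in the person table.
            Result result = table.get(new Get(Bytes.toBytes("zhangsan")));
            String cid = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("cid")));
            System.out.println("cid = " + cid);
        }
    }
}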