Updated: 2024-12-10 GMT+08:00
Java Sample Code
Function Description
In the Spark application, Spark Streaming invokes the Kafka API to obtain messages. Each message is analyzed, the corresponding record is looked up in the HBase table, and the computed result is written back to the HBase table.
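The sample expects an HBase table named table1 with column family cf, where both the row key and the value of column cf:cid are numeric strings: for each Kafka message, the message value is used as a row key, and the sum of the row key and the old cf:cid value is written back to cf:cid. The following is a minimal, hypothetical sketch of how such a table could be prepared with the HBase client API; the class name, ZooKeeper address, and sample row are illustrative assumptions and are not part of the sample project.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PrepareTable1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Assumed ZooKeeper quorum and client port; replace them with the values of your cluster.
        conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
        conf.set("hbase.zookeeper.property.clientPort", "24002");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("table1");
            // Create table1 with column family cf if it does not exist yet.
            if (!admin.tableExists(tableName)) {
                HTableDescriptor desc = new HTableDescriptor(tableName);
                desc.addFamily(new HColumnDescriptor("cf"));
                admin.createTable(desc);
            }
            // Sample row: row key "1" with cf:cid = "100".
            try (Table table = connection.getTable(tableName)) {
                Put put = new Put(Bytes.toBytes("1"));
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes("100"));
                table.put(put);
            }
        }
    }
}
With the sample row above, processing a Kafka message whose value is "1" updates cf:cid from "100" to "101".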
Sample Code
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase.
For details about how to obtain the sample code, see Obtaining an MRS Application Development Sample Project.
Sample code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

/**
 * Run a streaming job: for each message value, read the matching record from HBase table1,
 * combine the two values, and write the result back to HBase table1.
 */
public class SparkOnStreamingToHbase {
    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            printUsage();
        }
        String checkPointDir = args[0];
        String topics = args[1];
        final String brokers = args[2];
        final String zkQuorum = args[3];
        Duration batchDuration = Durations.seconds(5);
        SparkConf sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchDuration);
        // Set the checkpoint directory of the Streaming job.
        if (!"nocp".equals(checkPointDir)) {
            jssc.checkpoint(checkPointDir);
        }
        final String columnFamily = "cf";
        final String zkClientPort = "24002";
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        String[] topicArr = topics.split(",");
        Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));
        // Create a direct Kafka stream from the brokers and topics.
        // Receive data from Kafka and generate the corresponding DStream.
        JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class,
            StringDecoder.class, StringDecoder.class, kafkaParams, topicSet).map(
            new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> tuple2) {
                    // tuple2._1() is the message key and tuple2._2() is the message value.
                    return tuple2._2();
                }
            }
        );
        lines.foreachRDD(
            new Function<JavaRDD<String>, Void>() {
                public Void call(JavaRDD<String> rdd) throws Exception {
                    rdd.foreachPartition(
                        new VoidFunction<Iterator<String>>() {
                            public void call(Iterator<String> iterator) throws Exception {
                                // Write the data of each partition to HBase on the executor.
                                hBaseWriter(iterator, zkClientPort, zkQuorum, columnFamily);
                            }
                        }
                    );
                    return null;
                }
            }
        );
        jssc.start();
        jssc.awaitTermination();
    }
    /**
     * Write data to HBase on the executor side.
     * @param iterator     messages of one partition; each message value is used as a row key
     * @param zkClientPort ZooKeeper client port
     * @param zkQuorum     ZooKeeper quorum address
     * @param columnFamily column family of the target table
     */
    private static void hBaseWriter(Iterator<String> iterator, String zkClientPort, String zkQuorum, String columnFamily) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", zkClientPort);
        conf.set("hbase.zookeeper.quorum", zkQuorum);
        Connection connection = null;
        Table table = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            table = connection.getTable(TableName.valueOf("table1"));
            List<Get> rowList = new ArrayList<Get>();
            while (iterator.hasNext()) {
                Get get = new Get(iterator.next().getBytes());
                rowList.add(get);
            }
            // Read the existing records from table1.
            Result[] resultDataBuffer = table.get(rowList);
            // Build the records to write back to table1.
            List<Put> putList = new ArrayList<Put>();
            for (int i = 0; i < resultDataBuffer.length; i++) {
                String row = new String(rowList.get(i).getRow());
                Result resultData = resultDataBuffer[i];
                if (!resultData.isEmpty()) {
                    // Get the old value by column family and column qualifier.
                    String aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));
                    Put put = new Put(Bytes.toBytes(row));
                    // Calculate the new value: row key plus the old cid value.
                    int resultValue = Integer.valueOf(row) + Integer.valueOf(aCid);
                    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));
                    putList.add(put);
                }
            }
            if (putList.size() > 0) {
                table.put(putList);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    // Close the HBase connection.
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private static void printUsage() {
        System.out.println("Usage: {checkPointDir} {topic} {brokerList} {zkQuorum}");
        System.exit(1);
    }
}
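Because the streaming job treats each Kafka message value as a table1 row key, one way to drive it during testing is to send a few row keys to the configured topic. The following is a minimal, hypothetical producer sketch using the Kafka client API; the class name, broker address, and topic name are illustrative assumptions and should be replaced with the values of your cluster. The streaming job itself is submitted with spark-submit, passing the four arguments in the order printed by printUsage: the checkpoint directory (or "nocp" to disable checkpointing), the comma-separated topics, the broker list, and the ZooKeeper quorum.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class StreamingInputProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker address; replace with the Kafka brokers of your cluster.
        props.put("bootstrap.servers", "broker1:21005");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // Each message value is a table1 row key, for example "1".
            for (int i = 1; i <= 10; i++) {
                producer.send(new ProducerRecord<String, String>("topic1", String.valueOf(i)));
            }
        } finally {
            producer.close();
        }
    }
}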