更新时间:2024-11-25 GMT+08:00
Java样例代码
功能介绍
在Spark应用中,通过使用Streaming调用kafka接口来获取数据,然后把数据经过分析后,找到对应的HBase表记录,再写到HBase表。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase。
样例代码获取方式请参考获取MRS应用开发样例工程。
代码样例:
/** * 运行streaming任务,根据value值从hbase table1表读取数据,把两者数据做操作后,更新到hbase table1表 */ public class SparkOnStreamingToHbase { public static void main(String[] args) throws Exception { if (args.length < 4) { printUsage(); } String checkPointDir = args[0]; String topics = args[1]; final String brokers = args[2]; final String zkQuorum = args[3]; Duration batchDuration = Durations.seconds(5); SparkConf sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchDuration); // 设置Streaming的CheckPoint目录 if (!"nocp".equals(checkPointDir)) { jssc.checkpoint(checkPointDir); } final String columnFamily = "cf"; final String zkClientPort = "24002"; HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", brokers); String[] topicArr = topics.split(","); Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr)); // 通过brokers和topics直接创建kafka stream // 接收Kafka中数据,生成相应DStream JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicSet).map( new Function<Tuple2<String, String>, String>() { public String call(Tuple2<String, String> tuple2) { // map(_._1)是消息的key, map(_._2)是消息的value return tuple2._2(); } } ); lines.foreachRDD( new Function<JavaRDD<String>, Void>() { public Void call(JavaRDD<String> rdd) throws Exception { rdd.foreachPartition( new VoidFunction<Iterator<String>>() { public void call(Iterator<String> iterator) throws Exception { hBaseWriter(iterator, zkClientPort, zkQuorum, columnFamily); } } ); return null; } } ); jssc.start(); jssc.awaitTermination(); } /** * 在executor端写入数据 * @param iterator 消息 * @param zkClientPort * @param zkQuorum * @param columnFamily */ private static void hBaseWriter(Iterator<String> iterator, String zkClientPort, String zkQuorum, String columnFamily) throws IOException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.property.clientPort", zkClientPort); conf.set("hbase.zookeeper.quorum", zkQuorum); Connection connection = null; Table table = null; try { connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf("table1")); List<Get> rowList = new ArrayList<Get>(); while (iterator.hasNext()) { Get get = new Get(iterator.next().getBytes()); rowList.add(get); } // 获取table1的数据 Result[] resultDataBuffer = table.get(rowList); // 设置table1的数据 List<Put> putList = new ArrayList<Put>(); for (int i = 0; i < resultDataBuffer.length; i++) { String row = new String(rowList.get(i).getRow()); Result resultData = resultDataBuffer[i]; if (!resultData.isEmpty()) { // 根据列簇和列,获取旧值 String aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes())); Put put = new Put(Bytes.toBytes(row)); // 计算结果 int resultValue = Integer.valueOf(row) + Integer.valueOf(aCid); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue))); putList.add(put); } } if (putList.size() > 0) { table.put(putList); } } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { try { // 关闭Hbase连接. connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } private static void printUsage() { System.out.println("Usage: {checkPointDir} {topic} {brokerList} {zkQuorum}"); System.exit(1); } }