更新时间:2024-12-10 GMT+08:00

Java样例代码

功能介绍

在Spark应用中,通过Streaming调用Kafka接口获取数据,对数据进行分析后,找到对应的HBase表记录,再将计算结果写回HBase表。

代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase。

样例代码获取方式请参考“获取MRS应用开发样例工程”。

代码样例:

/**
 * Runs a Spark Streaming job that reads values from Kafka, looks up the HBase
 * row in table1 whose key equals each value, adds the value to the row's stored
 * "cid" column, and writes the sum back to table1.
 */
public class SparkOnStreamingToHbase {
  /**
   * Entry point.
   *
   * @param args {checkPointDir} {topic} {brokerList} {zkQuorum}; pass "nocp" as
   *             checkPointDir to run without a checkpoint directory, and a
   *             comma-separated list to subscribe to several topics
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 4) {
      printUsage();
      // printUsage() terminates the JVM; the return makes it explicit that the
      // argument accesses below are never reached with too few arguments.
      return;
    }

    String checkPointDir = args[0];
    String topics = args[1];
    final String brokers = args[2];
    final String zkQuorum = args[3];

    Duration batchDuration = Durations.seconds(5);
    SparkConf sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchDuration);

    // Set the Streaming checkpoint directory unless the caller opted out with "nocp".
    if (!"nocp".equals(checkPointDir)) {
      jssc.checkpoint(checkPointDir);
    }

    final String columnFamily = "cf";
    final String zkClientPort = "24002";
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    String[] topicArr = topics.split(",");
    Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));

    // Create the Kafka stream directly from the brokers and topics, and turn
    // the received records into a DStream of message values.
    JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class,
      StringDecoder.class, StringDecoder.class, kafkaParams, topicSet).map(
      new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
          // _1 is the message key, _2 is the message value.
          return tuple2._2();
        }
      }
    );

    // Write each partition of every batch through its own call to hBaseWriter.
    lines.foreachRDD(
      new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) throws Exception {
          rdd.foreachPartition(
            new VoidFunction<Iterator<String>>() {
              public void call(Iterator<String> iterator) throws Exception {
                hBaseWriter(iterator, zkClientPort, zkQuorum, columnFamily);
              }
            }
          );
          return null;
        }
      }
    );

    jssc.start();
    jssc.awaitTermination();
  }

  /**
   * Writes one partition of messages to HBase table1.
   *
   * <p>Each message is used as a row key. For every row that already exists in
   * table1, the integer value of the row key is added to the integer stored in
   * {@code columnFamily:cid} and the sum is written back to the same cell.
   * Messages that are null or empty, rows that do not exist, and rows whose key
   * or stored value is not an integer are skipped.
   *
   * <p>I/O failures are reported with {@code printStackTrace()} and not
   * rethrown, so a failed write does not fail the calling task.
   *
   * @param iterator     messages of the partition
   * @param zkClientPort value for {@code hbase.zookeeper.property.clientPort}
   * @param zkQuorum     value for {@code hbase.zookeeper.quorum}
   * @param columnFamily column family holding the "cid" column
   */
  private static void hBaseWriter(Iterator<String> iterator, String zkClientPort, String zkQuorum, String columnFamily) throws IOException {
    // Collect the row keys first so an empty partition never opens an HBase connection.
    List<String> rowKeys = new ArrayList<String>();
    List<Get> rowList = new ArrayList<Get>();
    while (iterator.hasNext()) {
      String message = iterator.next();
      // A Kafka record may carry no value, and an empty string is not a valid row key for Get.
      if (message == null || message.isEmpty()) {
        continue;
      }
      rowKeys.add(message);
      // Bytes.toBytes keeps the Get key encoding identical to the Put key encoding below,
      // independent of the JVM's default charset.
      rowList.add(new Get(Bytes.toBytes(message)));
    }
    if (rowList.isEmpty()) {
      return;
    }

    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.property.clientPort", zkClientPort);
    conf.set("hbase.zookeeper.quorum", zkQuorum);

    // Loop-invariant column coordinates.
    byte[] family = Bytes.toBytes(columnFamily);
    byte[] qualifier = Bytes.toBytes("cid");

    Connection connection = null;
    Table table = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(TableName.valueOf("table1"));

      // Read the current table1 data; results are matched to rowKeys by index.
      Result[] resultDataBuffer = table.get(rowList);

      // Build the updates for table1.
      List<Put> putList = new ArrayList<Put>();
      for (int i = 0; i < resultDataBuffer.length; i++) {
        Result resultData = resultDataBuffer[i];
        if (resultData.isEmpty()) {
          // The row does not exist in table1; nothing to update.
          continue;
        }
        String row = rowKeys.get(i);
        // Old value of the cell; null when the row exists but has no cf:cid cell.
        String aCid = Bytes.toString(resultData.getValue(family, qualifier));

        int resultValue;
        try {
          resultValue = Integer.parseInt(row) + Integer.parseInt(aCid);
        } catch (NumberFormatException e) {
          // One malformed record must not abort the rest of the partition.
          System.err.println("Skipping row '" + row + "': row key or stored cid is not an integer (cid=" + aCid + ")");
          continue;
        }

        Put put = new Put(Bytes.toBytes(row));
        put.addColumn(family, qualifier, Bytes.toBytes(String.valueOf(resultValue)));
        putList.add(put);
      }
      if (!putList.isEmpty()) {
        table.put(putList);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }


  /**
   * Prints the expected command-line arguments and terminates the JVM with exit code 1.
   */
  private static void printUsage() {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList} {zkQuorum}");
    System.exit(1);
  }
}