Updated on 2022-09-14 GMT+08:00

Java Sample Code

Function Description

In Spark applications, use Spark Streaming to call Kafka APIs to obtain data and write the data analysis results to an HBase table.

Sample Code

The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase.

/**
 * Run a Streaming job. Read data from HBase table1 based on the value, sum two data records, and update the new data in the HBase table1.
 */
public class SparkOnStreamingToHbase {
  public static void main(String[] args) throws Exception {
    if (args.length < 4) {
      printUsage();
    }

    String checkPointDir = args[0];
    String topics = args[1];
    final String brokers = args[2];
    final String zkQuorum = args[3];

    Duration batchDuration = Durations.seconds(5);
    SparkConf sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchDuration);

    // Set the CheckPoint directory of Streaming.
    if (!"nocp".equals(checkPointDir)) {
      jssc.checkpoint(checkPointDir);
    }

    final String columnFamily = "cf";
    final String zkClientPort = "24002";
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    String[] topicArr = topics.split(",");
    Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));

    // Create a kafka stream by using brokers and topics.
    // Receive data from Kafka and generate the corresponding DStream. 
    JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class,
      StringDecoder.class, StringDecoder.class, kafkaParams, topicSet).map(
      new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
          // map(_._1) is the key of the message, and map(_._2) is the value of the message.
          return tuple2._2();
        }
      }
    );

    lines.foreachRDD(
      new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) throws Exception {
          rdd.foreachPartition(
            new VoidFunction<Iterator<String>>() {
              public void call(Iterator<String> iterator) throws Exception {
                hBaseWriter(iterator, zkClientPort, zkQuorum, columnFamily);
              }
            }
          );
          return null;
        }
      }
    );

    jssc.start();
    jssc.awaitTermination();
  }

  /**
   * Write data to the executor.
   * @param iterator Message
   * @param zkClientPort
   * @param zkQuorum
   * @param columnFamily
   */
  private static void hBaseWriter(Iterator<String> iterator, String zkClientPort, String zkQuorum, String columnFamily) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.property.clientPort", zkClientPort);
    conf.set("hbase.zookeeper.quorum", zkQuorum);
    Connection connection = null;
    Table table = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(TableName.valueOf("table1"));
      List<Get> rowList = new ArrayList<Get>();
      while (iterator.hasNext()) {
        Get get = new Get(iterator.next().getBytes());
        rowList.add(get);
      }
      // Obtain data in table1.
      Result[] resultDataBuffer = table.get(rowList);
      // Set data in table1.
      List<Put> putList = new ArrayList<Put>();
      for (int i = 0; i < resultDataBuffer.length; i++) {
        String row = new String(rowList.get(i).getRow());
        Result resultData = resultDataBuffer[i];
        if (!resultData.isEmpty()) {
          // Obtain the old value based on the column family and column.
          String aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));
          Put put = new Put(Bytes.toBytes(row));
          // Calculate the result.
          int resultValue = Integer.valueOf(row) + Integer.valueOf(aCid);
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));
          putList.add(put);
        }
      }
      if (putList.size() > 0) {
        table.put(putList);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
 

  private static void printUsage() {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList} {zkQuorum}");
    System.exit(1);
  }
}