Updated on 2022-09-14 GMT+08:00

Java Example Code

Function

In Spark applications, use Spark Streaming to call the Kafka interface and obtain word records. Count the records for each word, and write data to Kafka 0-10.

Code Sample (Streaming Read Data from Kafka 0-10)

The following code is an example. For details, see com.huawei.bigdata.spark.examples.KafkaWordCount.

/**
 * Consumes messages from one or more Kafka topics and keeps a running count
 * of how many times each message value has been seen.
 *
 * Command-line arguments, in order:
 * {@code <checkPointDir>} is the Spark Streaming checkpoint directory.
 * {@code <brokers>} is the Kafka bootstrap server list used by the consumer.
 * {@code <topics>} is a comma-separated list of one or more Kafka topics to consume from.
 * {@code <batchTime>} is the Spark Streaming batch duration in seconds.
 */
public class KafkaWordCount
{
  /**
   * Builds the streaming context from the command-line arguments, starts it
   * and blocks until the streaming job terminates.
   *
   * @param args checkPointDir, brokers, topics, batchTime (see class comment)
   */
  public static void main(String[] args) {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <checkPointDir> <brokers> <topics> <batchTime>");
      System.exit(1);
    }

    JavaStreamingContext ssc = createContext(args);
    //The Streaming system starts.
    ssc.start();
    try {
      ssc.awaitTermination();
    } catch (InterruptedException e) {
      // Do not swallow the interrupt: restore the flag so callers can see it.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Creates the streaming context and wires up the Kafka source and the
   * word-count pipeline. The context is returned without being started.
   *
   * @param args checkPointDir, brokers, topics, batchTime (see class comment)
   * @return the configured, not yet started, streaming context
   * @throws NumberFormatException if batchTime is not a valid long
   */
  private static JavaStreamingContext createContext(String[] args) {
    String checkPointDir = args[0];
    String brokers = args[1];
    String topics = args[2];
    String batchTime = args[3];

    // Create a Streaming startup environment. batchTime is in seconds,
    // Duration takes milliseconds.
    SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(Long.parseLong(batchTime) * 1000));

    // Configure the checkpoint directory for the Streaming.
    // It is mandatory here because the stateful updateStateByKey operation
    // used below requires checkpointing.
    ssc.checkpoint(checkPointDir);

    // Get the set of topics used by Kafka (comma-separated on the command line).
    String[] topicArr = topics.split(",");
    Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", brokers);
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("group.id", "DemoConsumer");

    LocationStrategy locationStrategy = LocationStrategies.PreferConsistent();
    ConsumerStrategy<String, String> consumerStrategy = ConsumerStrategies.Subscribe(topicSet, kafkaParams);

    // Create direct kafka stream with brokers and topics
    // Receive data from the Kafka and generate the corresponding DStream
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy);

    // Extract the value (payload) of each Kafka record.
    JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
      @Override
      public String call(ConsumerRecord<String, String> record) throws Exception {
        return record.value();
      }
    });

    // Count each record value within the batch, then fold the per-batch
    // counts into a running total kept as streaming state.
    JavaPairDStream<String, Integer> wordCounts = lines.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        }).updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
          @Override
          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            // Start from the previous total (if any) and add this batch's counts.
            int out = 0;
            if (state.isPresent()) {
              out += state.get();
            }
            for (Integer v : values) {
              out += v;
            }
            return Optional.of(out);
          }
        });

    // print the results
    wordCounts.print();
    return ssc;
  }
}

Example Code (Streaming Write To Kafka 0-10)

The following code segment is only an example. For details, see com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter.

It is advisable to use the new API createDirectStream instead of the old API createStream for application development. The old API still exists, but its performance and stability are worse than those of the new API.

/**
 * Parameter description:
 * <groupId> is the group ID for the consumer.
 * <brokers> is for bootstrapping and the producer will only use
 * <topic> is a kafka topic to consume from.
 */
public class JavaDstreamKafkaWriter {

  public static void main(String[] args) throws InterruptedException {
    if (args.length != 3) {
      System.err.println("Usage: JavaDstreamKafkaWriter <groupId> <brokers> <topic>");
      System.exit(1);
    }

    final String groupId = args[0];
    final String brokers = args[1];
    final String topic = args[2];

    SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter");

    // Populate Kafka properties
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("value.serializer" , "org.apache.kafka.common.serialization.ByteArraySerializer");
    kafkaParams.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer");
    kafkaParams.put("bootstrap.servers", brokers);
    kafkaParams.put("group.id", groupId);
    kafkaParams.put("auto.offset.reset", "smallest");

    // Create Spark Java streaming context
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));

    // Populate data to write to kafka
    List<String> sentData = new ArrayList();
    sentData.add("kafka_writer_test_msg_01");
    sentData.add("kafka_writer_test_msg_02");
    sentData.add("kafka_writer_test_msg_03");

    // Create Java RDD queue
    Queue<JavaRDD<String>> sent = new LinkedList();
    sent.add(ssc.sparkContext().parallelize(sentData));

    // Create java Dstream with the data to be written
    JavaDStream wStream = ssc.queueStream(sent);

    // Write to kafka
    JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka(
        JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(),
        new Function<String, ProducerRecord<String, byte[]>>() {
          public ProducerRecord<String, byte[]> call(String s) throws Exception {
            return new ProducerRecord(topic, s.toString().getBytes());
          }
        });

    ssc.start();
    ssc.awaitTermination();
  }
}