Updated on 2024-08-10 GMT+08:00

Connecting Spark Streaming to Kafka0-10 (Java)

Function

In this project, a Spark Streaming application calls Kafka0-10 APIs to read word records from Kafka or to write data to Kafka. The word records are grouped by word to obtain the number of records of each word.

Sample Code for Streaming to Read Kafka0-10

The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.SecurityKafkaWordCount.

/**
 * Consumes messages from one or more Kafka topics and keeps a running count of each word.
 * Every Kafka record value is treated as one word.
 *
 * <p>Command-line arguments:
 * <ul>
 *   <li>{@code <checkPointDir>} is the Spark Streaming checkpoint directory.</li>
 *   <li>{@code <brokers>} is the Kafka bootstrap server list, used to obtain cluster metadata.</li>
 *   <li>{@code <topics>} is a comma-separated list of one or more Kafka topics to be consumed.</li>
 *   <li>{@code <batchTime>} is the duration (in seconds) of one Spark Streaming batch.</li>
 * </ul>
 */
public class SecurityKafkaWordCount
{
  /**
   * Builds the streaming job, starts it and blocks until it terminates.
   *
   * @param args {@code <checkPointDir> <brokers> <topics> <batchTime>}
   * @throws Exception if the streaming context cannot be created or started
   */
  public static void main(String[] args) throws Exception {
    //Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 4) {
      System.err.println("Usage: SecurityKafkaWordCount <checkPointDir> <brokers> <topics> <batchTime>");
      System.exit(1);
    }

    JavaStreamingContext ssc = createContext(args);

    //Start the Streaming system.
    ssc.start();
    try {
      ssc.awaitTermination();
    } catch (InterruptedException e) {
      //Restore the interrupt flag so that the interruption is not silently lost.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Creates the streaming context and wires the Kafka source to the word-count pipeline.
   *
   * @param args {@code <checkPointDir> <brokers> <topics> <batchTime>}
   * @return the configured streaming context, not yet started
   * @throws Exception if {@code <batchTime>} is not a valid number or the context cannot be created
   */
  private static JavaStreamingContext createContext(String[] args) throws Exception {
    String checkPointDir = args[0];
    String brokers = args[1];
    String topics = args[2];
    String batchTime = args[3];

    //Create a Streaming startup environment. The batch interval is given in seconds.
    SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(Long.parseLong(batchTime) * 1000));

    //Set the CheckPoint directory of Streaming.
    //This is mandatory because updateStateByKey below keeps state across batches.
    ssc.checkpoint(checkPointDir);

    //Obtain the list of topics used by Kafka.
    String[] topicArr = topics.split(",");
    Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));

    //Consumer configuration, including the security (SASL/Kerberos) settings.
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", brokers);
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("group.id", "DemoConsumer");
    kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
    kafkaParams.put("sasl.kerberos.service.name", "kafka");
    //Replace <System domain name> with the actual domain name of the cluster.
    kafkaParams.put("kerberos.domain.name", "hadoop.<System domain name>");

    LocationStrategy locationStrategy = LocationStrategies.PreferConsistent();
    ConsumerStrategy<String, String> consumerStrategy = ConsumerStrategies.Subscribe(topicSet, kafkaParams);

    //Create a direct kafka stream using brokers and topics.
    //Receive data from Kafka and generate the corresponding DStream.
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy);

    //Extract the value of each Kafka record; each value is treated as one word.
    JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
      @Override
      public String call(ConsumerRecord<String, String> record) throws Exception {
        return record.value();
      }
    });

    //Count each word within the batch, then add the batch count to the running total.
    JavaPairDStream<String, Integer> wordCounts = lines.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    }).updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
          @Override
          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            //Start from the previous total (if any) and add the counts of this batch.
            int out = 0;
            if (state.isPresent()) {
              out += state.get();
            }
            for (Integer v : values) {
              out += v;
            }
            return Optional.of(out);
          }
        });

    //Print the result.
    wordCounts.print();
    return ssc;
  }
}

Sample Code for Streaming to Write to Kafka0-10

The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter.

You are advised to use the new API createDirectStream to develop applications instead of the old API createStream. While the old API remains functional, the new API offers improved performance and stability.

/**
 * Parameter description:
 * <groupId> is the customer group ID.
 * <brokers> is the Kafka address for obtaining metadata.
 * <topic> is the topic subscribed in Kafka.
 */
public class JavaDstreamKafkaWriter {

  public static void main(String[] args) throws InterruptedException {
    if (args.length != 3) {
      System.err.println("Usage: JavaDstreamKafkaWriter <groupId> <brokers> <topic>");
      System.exit(1);
    }

    final String groupId = args[0];
    final String brokers = args[1];
    final String topic = args[2];

    SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter");

    // Enter the properties of Kafka.
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("value.serializer" , "org.apache.kafka.common.serialization.ByteArraySerializer");
    kafkaParams.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer");
    kafkaParams.put("bootstrap.servers", brokers);
    kafkaParams.put("group.id", groupId);
    kafkaParams.put("auto.offset.reset", "smallest");

    // Create the context of Java Spark Streaming.
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));

    // Enter data to be written to Kafka.
    List<String> sentData = new ArrayList();
    sentData.add("kafka_writer_test_msg_01");
    sentData.add("kafka_writer_test_msg_02");
    sentData.add("kafka_writer_test_msg_03");

    // Create a Java RDD queue.
    Queue<JavaRDD<String>> sent = new LinkedList();
    sent.add(ssc.sparkContext().parallelize(sentData));

    // Create a Java DStream for writing data.
    JavaDStream wStream = ssc.queueStream(sent);

    // Write data to Kafka.
    JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka(
        JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(),
        new Function<String, ProducerRecord<String, byte[]>>() {
          public ProducerRecord<String, byte[]> call(String s) throws Exception {
            return new ProducerRecord(topic, s.toString().getBytes());
          }
        });

    ssc.start();
    ssc.awaitTermination();
  }
}