Updated on 2022-06-01 GMT+08:00

Java Sample Code

Function Description

Collect, in real time, statistics on female netizens who spend more than half an hour continuously shopping online. The statistics are either printed directly or written to Kafka.

Spark Streaming Write To Print Sample Code

The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint.

    // Parameter description:
    // <batchTime>: Interval at which Streaming processes data in batches. The unit is second.
    // <windowTime>: Time span of the statistics data. The unit is second.
    // <topics>: Topics subscribed to in Kafka. Multiple topics are separated by commas (,).
    // <brokers>: Kafka broker address used for obtaining metadata.
public class FemaleInfoCollectionPrint {
    /**
     * Runs the streaming job: reads comma-separated records from Kafka, keeps the records
     * whose second field is "female", sums the third field per first field over a sliding
     * window, and prints the entries whose sum exceeds 90% of the window length.
     *
     * @param args {@code <batchTime> <windowTime> <topics> <brokers>}; batchTime and
     *             windowTime are integers in seconds, topics is a comma-separated list
     * @throws Exception any failure raised while setting up or running the streaming context
     */
    public static void main(String[] args) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 4) {
            System.err.println("Usage: FemaleInfoCollectionPrint <batchTime> <windowTime> <topics> <brokers>");
            System.exit(1);
        }

        String batchTime = args[0];
        String windowTime = args[1];
        String topics = args[2];
        String brokers = args[3];

        int windowSeconds = Integer.parseInt(windowTime);
        Duration batchDuration = Durations.seconds(Integer.parseInt(batchTime));
        Duration windowDuration = Durations.seconds(windowSeconds);

        // Computed once on the driver so the filter below does not re-parse windowTime per record.
        final double onlineTimeThreshold = 0.9 * windowSeconds;

        SparkConf conf = new SparkConf().setAppName("DataSightStreamingExample");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);

        // Set the CheckPoint directory of Streaming. This parameter is mandatory because the window concept exists.
        jssc.checkpoint("checkpoint");

        // Assemble a Kafka topic list.
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);

        // Create a kafka stream by using brokers and topics.
        // 1. Receive data from Kafka and generate the corresponding DStream (message values only).
        JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet).map(
                new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> message) {
                        return message._2();
                    }
                }
        );

        // 2. Obtain the field attribute of each row.
        //    A row with fewer than three fields or a non-numeric third field makes call() throw.
        JavaDStream<Tuple3<String, String, Integer>> records = lines.map(
                new Function<String, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> call(String line) throws Exception {
                        String[] elems = line.split(",");
                        return new Tuple3<String, String, Integer>(elems[0], elems[1], Integer.parseInt(elems[2]));
                    }
                }
        );

        // 3. Filter data information of the time that female netizens spend online.
        JavaDStream<Tuple2<String, Integer>> femaleRecords = records.filter(
                new Function<Tuple3<String, String, Integer>, Boolean>() {
                    @Override
                    public Boolean call(Tuple3<String, String, Integer> record) throws Exception {
                        return "female".equals(record._2());
                    }
                }
        ).map(
                new Function<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple3<String, String, Integer> record) throws Exception {
                        return new Tuple2<String, Integer>(record._1(), record._3());
                    }
                }
        );

        // 4. Summarize the total time that each female netizen spends online within a time window.
        //    The second function is the inverse of the first; it subtracts the values that
        //    slide out of the window.
        JavaPairDStream<String, Integer> aggregateRecords = JavaPairDStream.fromJavaDStream(femaleRecords)
                .reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer total, Integer entering) throws Exception {
                        return total + entering;
                    }
                }, new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer total, Integer leaving) throws Exception {
                        return total - leaving;
                    }
                }, windowDuration, batchDuration);

        // 5. Filter data about netizens whose consecutive online duration exceeds the threshold.
        JavaPairDStream<String, Integer> upTimeUser = aggregateRecords.filter(
                new Function<Tuple2<String, Integer>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, Integer> userTotal) throws Exception {
                        return userTotal._2() > onlineTimeThreshold;
                    }
                }
        );

        // 6. Print the results.
        upTimeUser.print();

        // 7. Start Streaming.
        jssc.start();
        jssc.awaitTermination();

    }

Spark Streaming Write To Kafka Sample Code

The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter.

  • After Spark is upgraded, the new createDirectStream API is recommended. The old createStream API still exists, but its performance and stability are poor, so you are advised not to use it to develop applications.
  • The sample code exists only in mrs-sample-project-1.6.0.zip.
    // Parameter description:
    // <groupId>: group.id of the consumer.
    // <brokers>: IP address and port number of the broker.
    // <topic>: Topic of Kafka.
public class JavaDstreamKafkaWriter {

  public static void main(String[] args) throws InterruptedException {

    if (args.length != 3) {
      System.err.println("Usage: JavaDstreamKafkaWriter <groupId> <brokers> <topic>");
      System.exit(1);
    }

    final String groupId = args[0];
    final String brokers = args[1];
    final String topic = args[2];

    SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter");

    // Configure Kafka.
    Properties kafkaParams = new Properties();
    kafkaParams.put("metadata.broker.list", brokers);
    kafkaParams.put("group.id", groupId);
    kafkaParams.put("auto.offset.reset", "smallest");

    // Create Java streaming context.
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));

    // Send data to Kafka.
    List<String> sentData = new ArrayList();
    sentData.add("kafka_writer_test_msg_01");
    sentData.add("kafka_writer_test_msg_02");
    sentData.add("kafka_writer_test_msg_03");

   // Create an RDD queue.
    Queue<JavaRDD<String>> sent = new LinkedList();
    sent.add(ssc.sparkContext().parallelize(sentData));

   // Use the written data to create Dstream.
    JavaDStream wStream = ssc.queueStream(sent);

    // Write data to Kafka.
    JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka(kafkaParams,
        new Function<String, KeyedMessage<String, byte[]>>() {
            public KeyedMessage<String, byte[]> call(String s) {
              return new KeyedMessage(topic, s.getBytes());
            }
    });

    ssc.start();
    ssc.awaitTermination();
  }
}