Updated: 2022-07-19 GMT+08:00

Java Sample Code

Function Description

Collect statistics, in real time, on female netizens whose continuous online shopping time exceeds half an hour, and print the results directly or write them to Kafka.
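
Based on the parsing logic in the samples below, each record read from Kafka is assumed to be a comma-separated line in the form name,gender,onlineTime, where onlineTime is in seconds. Two hypothetical records (illustrative only, not part of the original sample):

    LiuYang,female,2060
    GuoYijun,male,580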

Spark Streaming Write To Print Sample Code

The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint:

    // Parameter description:
    // <batchTime>: interval, in seconds, at which Streaming processes data in batches.
    // <windowTime>: time span, in seconds, over which statistics are aggregated.
    // <topics>: Kafka topics to subscribe to; separate multiple topics with commas.
    // <brokers>: Kafka broker address(es) used to obtain metadata.
public class FemaleInfoCollectionPrint {
    public static void main(String[] args) throws Exception {

        String batchTime = args[0];
        final String windowTime = args[1];
        String topics = args[2];
        String brokers = args[3];

        Duration batchDuration = Durations.seconds(Integer.parseInt(batchTime));
        Duration windowDuration = Durations.seconds(Integer.parseInt(windowTime));


        SparkConf conf = new SparkConf().setAppName("DataSightStreamingExample");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);

        // Set the checkpoint directory of Streaming. Because windows are used, this parameter is mandatory.
        jssc.checkpoint("checkpoint");

        // Assemble the list of Kafka topics.
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);

        // Create a Kafka stream directly from the brokers and topics.
        // 1. Receive data from Kafka and generate the corresponding DStream.
        JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc,String.class,String.class,
                StringDecoder.class,  StringDecoder.class, kafkaParams, topicsSet).map(
                new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                }
        );

        // 2. Obtain the field attributes of each row.
        JavaDStream<Tuple3<String, String, Integer>> records = lines.map(
                new Function<String, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> call(String line) throws Exception {
                        String[] elems = line.split(",");
                        return new Tuple3<String, String, Integer>(elems[0], elems[1], Integer.parseInt(elems[2]));
                    }
                }
        );

        // 3. Filter the online time data of female netizens.
        JavaDStream<Tuple2<String, Integer>> femaleRecords = records.filter(new Function<Tuple3<String, String, Integer>, Boolean>() {
            public Boolean call(Tuple3<String, String, Integer> line) throws Exception {
                return line._2().equals("female");
            }
        }).map(new Function<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
            public Tuple2<String, Integer> call(Tuple3<String, String, Integer> record) throws Exception {
                return new Tuple2<String, Integer>(record._1(), record._3());
            }
        });

        // 4. Aggregate the total online time of each female netizen within the time window.
        JavaPairDStream<String, Integer> aggregateRecords = JavaPairDStream.fromJavaDStream(femaleRecords)
                .reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }, new Function2<Integer, Integer, Integer>() {
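                    // Inverse reduce function: subtracts the values of batches that
                    // slide out of the window, so each window result is updated
                    // incrementally. This is why the checkpoint directory is mandatory.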
                    public Integer call(Integer integer, Integer integer2) throws Exception {
                        return integer - integer2;
                    }
                }, windowDuration, batchDuration);


        // 5. Filter the users whose continuous online time exceeds the threshold.
        JavaPairDStream<String, Integer> upTimeUser = aggregateRecords.filter(new Function<Tuple2<String, Integer>, Boolean>() {
            public Boolean call(Tuple2<String, Integer> record) throws Exception {
                return record._2() > 0.9 * Integer.parseInt(windowTime);
            }
        });

        // Print the filtered results.
        upTimeUser.print();

        // 6. Start the Streaming system.
        jssc.start();
        jssc.awaitTermination();

    }
}
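
The four parameters map directly to the command line. A hypothetical submission command (the JAR name, master settings, topic, and broker address below are illustrative placeholders, not part of the original sample), using an 1800-second window to match the half-hour scenario:

    bin/spark-submit --master yarn --deploy-mode client \
        --class com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint \
        SparkStreamingExample.jar 10 1800 shoppingTopic kafka-broker1:9092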

Spark Streaming Write To Kafka Sample Code

The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter:

  • After the Spark version upgrade, the new API createDirectStream is recommended. The old API createStream still exists, but its performance and stability are poor; you are advised not to use it to develop applications.
  • This sample code exists only in mrs-sample-project-1.6.0.zip.
    // Parameter description:
    // <groupId>: group.id of the consumer.
    // <brokers>: IP address and port of the broker.
    // <topic>: Kafka topic.
public class JavaDstreamKafkaWriter {

  public static void main(String[] args) throws InterruptedException {

    if (args.length != 3) {
      System.err.println("Usage: JavaDstreamKafkaWriter <groupId> <brokers> <topic>");
      System.exit(1);
    }

    final String groupId = args[0];
    final String brokers = args[1];
    final String topic = args[2];

    SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter");

    // Configure Kafka.
    Properties kafkaParams = new Properties();
    kafkaParams.put("metadata.broker.list", brokers);
    kafkaParams.put("group.id", groupId);
    kafkaParams.put("auto.offset.reset", "smallest");

    // Create a Java streaming context.
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));

    // Prepare the data to be sent to Kafka.
    List<String> sentData = new ArrayList<>();
    sentData.add("kafka_writer_test_msg_01");
    sentData.add("kafka_writer_test_msg_02");
    sentData.add("kafka_writer_test_msg_03");

    // Create the RDD queue.
    Queue<JavaRDD<String>> sent = new LinkedList<>();
    sent.add(ssc.sparkContext().parallelize(sentData));

    // Create a DStream from the queued data.
    JavaDStream<String> wStream = ssc.queueStream(sent);

    // Write the data to Kafka.
    JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka(kafkaParams,
        new Function<String, KeyedMessage<String, byte[]>>() {
            public KeyedMessage<String, byte[]> call(String s) {
              return new KeyedMessage<String, byte[]>(topic, s.getBytes());
            }
    });

    ssc.start();
    ssc.awaitTermination();
  }
}
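
A hypothetical submission command for this sample (the JAR name, group ID, topic, and broker address are illustrative placeholders):

    bin/spark-submit --master yarn --deploy-mode client \
        --class com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter \
        SparkStreamingExample.jar writerGroup kafka-broker1:9092 writerTopic

The three test messages written above can then be verified with any Kafka consumer subscribed to the topic.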
