Java Sample Code
Function Description
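Collect statistics in real time on female netizens who shop online continuously for more than half an hour, and either print the result directly or write it to Kafka.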
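The rule can be illustrated without Spark. The following is a minimal sketch in plain Java, assuming each record has the three comma-separated fields that the sample parses (name, gender, duration) and assuming the duration is in minutes. The class name, the method name, the 30-minute constant, and the sample records are all hypothetical and not part of the sample project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FemaleDwellSketch {
    // Assumed threshold: report users whose total exceeds 30 minutes.
    static final int THRESHOLD_MINUTES = 30;

    /** Sums the duration per female user and keeps only totals above the threshold. */
    public static Map<String, Integer> longDwellFemales(List<String> lines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            String[] elems = line.split(",");
            // Skip malformed records and records of non-female users.
            if (elems.length != 3 || !"female".equals(elems[1])) {
                continue;
            }
            totals.merge(elems[0], Integer.parseInt(elems[2].trim()), Integer::sum);
        }
        totals.values().removeIf(total -> total <= THRESHOLD_MINUTES);
        return totals;
    }

    public static void main(String[] args) {
        // Made-up records for illustration only.
        List<String> lines = Arrays.asList(
            "userA,female,20", "userB,male,50", "userA,female,20", "userC,female,5");
        System.out.println(longDwellFemales(lines)); // prints {userA=40}
    }
}
```

The streaming samples apply the same filter-then-sum logic, but over a sliding time window instead of a fixed list.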
Spark Streaming Write To Print Sample Code
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint:
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import scala.Tuple3;

// Parameters:
// <batchTime>: Streaming batch interval.
// <windowTime>: time span of the data to aggregate. Both times are in seconds.
// <topics>: Kafka topics to subscribe to, separated by commas.
// <brokers>: Kafka addresses used to obtain metadata.
public class FemaleInfoCollectionPrint {
    public static void main(String[] args) throws Exception {
        String batchTime = args[0];
        final String windowTime = args[1];
        String topics = args[2];
        String brokers = args[3];

        Duration batchDuration = Durations.seconds(Integer.parseInt(batchTime));
        Duration windowDuration = Durations.seconds(Integer.parseInt(windowTime));

        SparkConf conf = new SparkConf().setAppName("DataSightStreamingExample");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);

        // Set the Streaming checkpoint directory. It is required because window operations are used.
        jssc.checkpoint("checkpoint");

        // Assemble the Kafka topic list.
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);

        // Create the Kafka stream directly from brokers and topics.
        // 1. Receive data from Kafka and generate the corresponding DStream.
        JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class,
            StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet).map(
            new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            }
        );

        // 2. Extract the fields of each line.
        JavaDStream<Tuple3<String, String, Integer>> records = lines.map(
            new Function<String, Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> call(String line) throws Exception {
                    String[] elems = line.split(",");
                    return new Tuple3<String, String, Integer>(elems[0], elems[1], Integer.parseInt(elems[2]));
                }
            }
        );

        // 3. Keep only the online-time records of female netizens.
        JavaDStream<Tuple2<String, Integer>> femaleRecords = records.filter(
            new Function<Tuple3<String, String, Integer>, Boolean>() {
                @Override
                public Boolean call(Tuple3<String, String, Integer> line) throws Exception {
                    return line._2().equals("female");
                }
            }
        ).map(
            new Function<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> call(Tuple3<String, String, Integer> record) throws Exception {
                    return new Tuple2<String, Integer>(record._1(), record._3());
                }
            }
        );

        // 4. Sum the online time of each female netizen within one time window.
        JavaPairDStream<String, Integer> aggregateRecords = JavaPairDStream.fromJavaDStream(femaleRecords)
            .reduceByKeyAndWindow(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                },
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) throws Exception {
                        return integer - integer2;
                    }
                },
                windowDuration, batchDuration);

        // 5. Keep the users whose continuous online time exceeds the threshold, and print the result.
        JavaPairDStream<String, Integer> upTimeUser = aggregateRecords.filter(
            new Function<Tuple2<String, Integer>, Boolean>() {
                @Override
                public Boolean call(Tuple2<String, Integer> userTotal) throws Exception {
                    return userTotal._2() > 0.9 * Integer.parseInt(windowTime);
                }
            }
        );
        upTimeUser.print();

        // 6. Start the Streaming system.
        jssc.start();
        jssc.awaitTermination();
    }
}
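Step 4 passes two functions to reduceByKeyAndWindow: the first adds values entering the window, and the second subtracts values leaving it, so the window total is updated incrementally instead of being recomputed for every batch. The following is a minimal sketch of that idea in plain Java, without Spark. The class SlidingSum and its method are hypothetical, and one call to addBatch stands for one batch interval.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SlidingSum {
    private final int windowBatches;                    // window length, counted in batches
    private final Deque<Integer> batches = new ArrayDeque<>();
    private int total = 0;

    public SlidingSum(int windowBatches) {
        this.windowBatches = windowBatches;
    }

    /** Adds the newest batch value and returns the updated window total. */
    public int addBatch(int value) {
        batches.addLast(value);
        total = total + value;                          // plays the role of the reduce function (a + b)
        if (batches.size() > windowBatches) {
            total = total - batches.removeFirst();      // plays the role of the inverse function (a - b)
        }
        return total;
    }

    public static void main(String[] args) {
        SlidingSum sum = new SlidingSum(3);
        for (int v : new int[] {5, 10, 20, 40}) {
            System.out.println(sum.addBatch(v));        // prints 5, 15, 35, 70
        }
    }
}
```

Because the running total depends on earlier batches, state has to be retained between batches, which is consistent with the sample setting a checkpoint directory before using the window operation.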
Spark Streaming Write To Kafka Sample Code
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter:
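- After the Spark version upgrade, the new API createDirectStream is recommended. The old API createStream still exists, but its performance and stability are poor, so do not use it to develop applications.
- This sample code is available only in mrs-sample-project-1.6.0.zip.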
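import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Queue;

import kafka.producer.KeyedMessage;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
// JavaDStreamKafkaWriterFactory is not imported here; see the complete sample for its package.

// Parameters:
// <groupId>: the consumer group.id.
// <brokers>: IP addresses and ports of the brokers.
// <topic>: the Kafka topic.
public class JavaDstreamKafkaWriter {
    public static void main(String[] args) throws InterruptedException {
        if (args.length != 3) {
            System.err.println("Usage: JavaDstreamKafkaWriter <groupId> <brokers> <topic>");
            System.exit(1);
        }

        final String groupId = args[0];
        final String brokers = args[1];
        final String topic = args[2];

        SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter");

        // Configure Kafka.
        Properties kafkaParams = new Properties();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("group.id", groupId);
        kafkaParams.put("auto.offset.reset", "smallest");

        // Create a Java streaming context.
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));

        // Data to send to Kafka.
        List<String> sentData = new ArrayList<String>();
        sentData.add("kafka_writer_test_msg_01");
        sentData.add("kafka_writer_test_msg_02");
        sentData.add("kafka_writer_test_msg_03");

        // Create an RDD queue.
        Queue<JavaRDD<String>> sent = new LinkedList<JavaRDD<String>>();
        sent.add(ssc.sparkContext().parallelize(sentData));

        // Create a DStream from the data to be written.
        JavaDStream<String> wStream = ssc.queueStream(sent);

        // Write to Kafka.
        JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka(kafkaParams,
            new Function<String, KeyedMessage<String, byte[]>>() {
                @Override
                public KeyedMessage<String, byte[]> call(String s) {
                    return new KeyedMessage<String, byte[]>(topic, s.getBytes());
                }
            });

        ssc.start();
        ssc.awaitTermination();
    }
}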
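The sample converts each message with s.getBytes(), which uses the JVM's default charset. One option, shown here as a sketch rather than as part of the sample project, is to encode with an explicit charset so that the bytes do not depend on the node where the code runs. PayloadSketch, encodeAll, and decode are hypothetical names, and the choice of UTF-8 is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PayloadSketch {
    /** Encodes each message as UTF-8 bytes, independent of the JVM default charset. */
    public static List<byte[]> encodeAll(List<String> messages) {
        List<byte[]> payloads = new ArrayList<>();
        for (String message : messages) {
            payloads.add(message.getBytes(StandardCharsets.UTF_8));
        }
        return payloads;
    }

    /** Decodes a payload produced by encodeAll back into a string. */
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<String> sentData = Arrays.asList(
            "kafka_writer_test_msg_01", "kafka_writer_test_msg_02", "kafka_writer_test_msg_03");
        for (byte[] payload : encodeAll(sentData)) {
            System.out.println(payload.length + " bytes: " + decode(payload));
        }
    }
}
```

A consumer reading these messages would need to decode with the same charset for the round trip to be lossless.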