Java Sample Code
Function Description
Collect statistics, in real time, on female netizens whose continuous online shopping time exceeds half an hour, and either print the results directly or write them to Kafka.
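Each record consumed by the samples is a comma-separated line of the form name,gender,online time in seconds; the field parsing in step 2 of the code below assumes exactly this layout. For example (illustrative values only, not part of the shipped sample data):
LiuYang,female,1320
CaiXuyu,male,600
FangBo,female,1620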
Spark Streaming Write To Print Code Sample
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint:
// Parameter description:
// <batchTime>: interval, in seconds, at which Streaming processes each batch.
// <windowTime>: time span, in seconds, of the statistics window.
// <topics>: Kafka topics to subscribe to; separate multiple topics with commas.
// <brokers>: Kafka broker address used to fetch metadata.
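// For example (illustrative values only, not from the sample project):
// batchTime = 5, windowTime = 1800 (half an hour), topics = femaleInfo, brokers = kafkaHost:9092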
public class FemaleInfoCollectionPrint {
    public static void main(String[] args) throws Exception {
        String batchTime = args[0];
        final String windowTime = args[1];
        String topics = args[2];
        String brokers = args[3];

        Duration batchDuration = Durations.seconds(Integer.parseInt(batchTime));
        Duration windowDuration = Durations.seconds(Integer.parseInt(windowTime));

        SparkConf conf = new SparkConf().setAppName("DataSightStreamingExample");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);

        // Set the checkpoint directory of Streaming. This is mandatory because window operations are used.
        jssc.checkpoint("checkpoint");

        // Assemble the list of Kafka topics.
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);

        // Create the Kafka stream directly from the brokers and topics.
        // 1. Receive data from Kafka and generate the corresponding DStream.
        JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class,
            StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet).map(
            new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            }
        );

        // 2. Obtain the field attributes of each line.
        JavaDStream<Tuple3<String, String, Integer>> records = lines.map(
            new Function<String, Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> call(String line) throws Exception {
                    String[] elems = line.split(",");
                    return new Tuple3<String, String, Integer>(elems[0], elems[1], Integer.parseInt(elems[2]));
                }
            }
        );

        // 3. Filter the online-time records of female netizens, keeping (name, time) pairs.
        JavaDStream<Tuple2<String, Integer>> femaleRecords = records.filter(
            new Function<Tuple3<String, String, Integer>, Boolean>() {
                public Boolean call(Tuple3<String, String, Integer> line) throws Exception {
                    return line._2().equals("female");
                }
            }).map(new Function<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
                public Tuple2<String, Integer> call(Tuple3<String, String, Integer> tuple3) throws Exception {
                    return new Tuple2<String, Integer>(tuple3._1(), tuple3._3());
                }
            });

        // 4. Sum up each woman's online time within one time window.
        JavaPairDStream<String, Integer> aggregateRecords = JavaPairDStream.fromJavaDStream(femaleRecords)
            .reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            }, new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer - integer2;
                }
            }, windowDuration, batchDuration);

        // 5. Filter the users whose continuous online time exceeds the threshold, and obtain the results.
        JavaPairDStream<String, Integer> upTimeUser = aggregateRecords.filter(
            new Function<Tuple2<String, Integer>, Boolean>() {
                public Boolean call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    return stringIntegerTuple2._2() > 0.9 * Integer.parseInt(windowTime);
                }
            });
        upTimeUser.print();

        // 6. Start the Streaming job.
        jssc.start();
        jssc.awaitTermination();
    }
}
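With the illustrative values above (windowTime = 1800), step 5 reports a user only when her accumulated online time within the half-hour window exceeds 0.9 × 1800 = 1620 seconds; upTimeUser.print() then writes the qualifying (name, time) pairs to stdout once per batch interval.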
Spark Streaming Write To Kafka Code Sample
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter:
- After the Spark version upgrade, the new createDirectStream interface is recommended (as used in the first sample above). The old createStream interface still exists, but its performance and stability are poor; you are advised not to use it when developing applications.
- This sample code is available only in mrs-sample-project-1.6.0.zip.
// Parameter description:
// <groupId>: the consumer's group.id.
// <brokers>: the broker's IP address and port.
// <topic>: the Kafka topic.
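// For example (illustrative values only): groupId = writerGroup, brokers = kafkaHost:9092, topic = testTopic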
public class JavaDstreamKafkaWriter {
    public static void main(String[] args) throws InterruptedException {
        if (args.length != 3) {
            System.err.println("Usage: JavaDstreamKafkaWriter <groupId> <brokers> <topic>");
            System.exit(1);
        }

        final String groupId = args[0];
        final String brokers = args[1];
        final String topic = args[2];

        SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter");

        // Configure Kafka.
        Properties kafkaParams = new Properties();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("group.id", groupId);
        kafkaParams.put("auto.offset.reset", "smallest");

        // Create a Java streaming context.
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));

        // Data to be sent to Kafka.
        List<String> sentData = new ArrayList<String>();
        sentData.add("kafka_writer_test_msg_01");
        sentData.add("kafka_writer_test_msg_02");
        sentData.add("kafka_writer_test_msg_03");

        // Create an RDD queue.
        Queue<JavaRDD<String>> sent = new LinkedList<JavaRDD<String>>();
        sent.add(ssc.sparkContext().parallelize(sentData));

        // Create a DStream from the queued data.
        JavaDStream<String> wStream = ssc.queueStream(sent);

        // Write the data to Kafka.
        JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka(kafkaParams,
            new Function<String, KeyedMessage<String, byte[]>>() {
                public KeyedMessage<String, byte[]> call(String s) {
                    return new KeyedMessage<String, byte[]>(topic, s.getBytes());
                }
            });

        ssc.start();
        ssc.awaitTermination();
    }
}
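After ssc.start() is called, the three test messages are published to <topic>. Any Kafka consumer subscribed to that topic, for example a stream created with createDirectStream as in the first sample, can be used to verify that the write succeeded.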