文档首页/
MapReduce服务 MRS/
开发指南(普通版_3.x)/
Spark2x开发指南(普通模式)/
开发Spark应用/
Spark Streaming对接Kafka0-10样例程序/
Spark Streaming对接Kafka0-10样例程序(Java)
更新时间:2024-08-05 GMT+08:00
Spark Streaming对接Kafka0-10样例程序(Java)
功能介绍
在Spark应用中,通过使用Streaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数,或将数据写入Kafka0-10。
Streaming读取Kafka0-10代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.KafkaWordCount。
/**
* 从Kafka的一个或多个主题消息。
* <checkPointDir>是Spark Streaming检查点目录。
* <brokers>是用于自举,制作人只会使用它来获取元数据
* <topics>是要消费的一个或多个kafka主题的列表
* <batchTime>是Spark Streaming批次持续时间(以秒为单位)。
*/
public class KafkaWordCount
{
public static void main(String[] args) {
JavaStreamingContext ssc = createContext(args);
//启动Streaming系统。
ssc.start();
try {
ssc.awaitTermination();
} catch (InterruptedException e) {
}
}
private static JavaStreamingContext createContext(String[] args) {
String checkPointDir = args[0];
String brokers = args[1];
String topics = args[2];
String batchTime = args[3];
// 新建一个Streaming启动环境。
SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(Long.parseLong(batchTime) * 1000));
//配置Streaming的CheckPoint目录。
//由于窗口概念的存在,此参数是必需的。
ssc.checkpoint(checkPointDir);
// 获取kafka使用的topic列表。
String[] topicArr = topics.split(",");
Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));
Map<String, Object> kafkaParams = new HashMap();
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("group.id", "DemoConsumer");
LocationStrategy locationStrategy = LocationStrategies.PreferConsistent();
ConsumerStrategy consumerStrategy = ConsumerStrategies.Subscribe(topicSet, kafkaParams);
// 用brokers and topics新建direct kafka stream
//从Kafka接收数据并生成相应的DStream。
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy);
// 获取每行中的字段属性。
JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> tuple2) throws Exception {
return tuple2.value();
}
});
// 汇总计算字数的总时间。
JavaPairDStream<String, Integer> wordCounts = lines.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}).updateStateByKey(
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
int out = 0;
if (state.isPresent()) {
out += state.get();
}
for (Integer v : values) {
out += v;
}
return Optional.of(out);
}
});
// 打印结果
wordCounts.print();
return ssc;
}
}
Streaming Write To Kafka 0-10样例代码
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.DstreamKafkaWriter。
建议使用新的API createDirectStream代替旧的API createStream进行应用程序开发。旧的API仍然可以使用,但新的API性能和稳定性更好。
/**
* 参数解析:
* <groupId>为客户的组编号。
* <brokers>为获取元数据的Kafka地址。
* <topic>为Kafka中订阅的主题。
*/
public class JavaDstreamKafkaWriter {
public static void main(String[] args) throws InterruptedException {
if (args.length != 3) {
System.err.println("Usage: JavaDstreamKafkaWriter <groupId> <brokers> <topic>");
System.exit(1);
}
final String groupId = args[0];
final String brokers = args[1];
final String topic = args[2];
SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter");
// 填写Kafka的properties。
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.serializer" , "org.apache.kafka.common.serialization.ByteArraySerializer");
kafkaParams.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer");
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("group.id", groupId);
kafkaParams.put("auto.offset.reset", "smallest");
// 创建Java Spark Streaming的Context。
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));
// 填写写入Kafka的数据。
List<String> sentData = new ArrayList();
sentData.add("kafka_writer_test_msg_01");
sentData.add("kafka_writer_test_msg_02");
sentData.add("kafka_writer_test_msg_03");
// 创建Java RDD队列。
Queue<JavaRDD<String>> sent = new LinkedList();
sent.add(ssc.sparkContext().parallelize(sentData));
// 创建写数据的Java DStream。
JavaDStream wStream = ssc.queueStream(sent);
// 写入Kafka。
JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka(
JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(),
new Function<String, ProducerRecord<String, byte[]>>() {
public ProducerRecord<String, byte[]> call(String s) throws Exception {
return new ProducerRecord(topic, s.toString().getBytes());
}
});
ssc.start();
ssc.awaitTermination();
}
}