Spark Streaming对接Kafka0-10样例程序(Java)
功能介绍
在Spark应用中,通过使用Streaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数,或将数据写入Kafka0-10。
Streaming读取Kafka0-10代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SecurityKafkaWordCount。
/** *从Kafka的一个或多个主题消息。 * <checkPointDir>是Spark Streaming检查点目录。 * <brokers>是用于自举,制作人只会使用它来获取元数据 * <topics>是要消费的一个或多个kafka主题的列表 * <batchTime>是Spark Streaming批次持续时间(以秒为单位)。 */ public class SecurityKafkaWordCount { public static void main(String[] args) throws Exception { JavaStreamingContext ssc = createContext(args); //启动Streaming系统。 ssc.start(); try { ssc.awaitTermination(); } catch (InterruptedException e) { } } private static JavaStreamingContext createContext(String[] args) throws Exception { String checkPointDir = args[0]; String brokers = args[1]; String topics = args[2]; String batchTime = args[3]; //新建一个Streaming启动环境. SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(Long.parseLong(batchTime) * 1000)); //配置Streaming的CheckPoint目录。 //由于窗口概念的存在,此参数是必需的。 ssc.checkpoint(checkPointDir); //获取kafka使用的topic列表。 String[] topicArr = topics.split(","); Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr)); Map<String, Object> kafkaParams = new HashMap(); kafkaParams.put("bootstrap.servers", brokers); kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("group.id", "DemoConsumer"); kafkaParams.put("security.protocol", "SASL_PLAINTEXT"); kafkaParams.put("sasl.kerberos.service.name", "kafka"); kafkaParams.put("kerberos.domain.name", "hadoop.<系统域名>"); LocationStrategy locationStrategy = LocationStrategies.PreferConsistent(); ConsumerStrategy consumerStrategy = ConsumerStrategies.Subscribe(topicSet, kafkaParams); //用brokers and topics新建direct kafka stream //从Kafka接收数据并生成相应的DStream。 JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy); //获取每行中的字段属性。 JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() { @Override public String call(ConsumerRecord<String, String> tuple2) throws Exception { return tuple2.value(); } }); //汇总计算字数的总时间。 JavaPairDStream<String, Integer> wordCounts = lines.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }).updateStateByKey( new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { int out = 0; if (state.isPresent()) { out += state.get(); } for (Integer v : values) { out += v; } return Optional.of(out); } }); //打印结果 wordCounts.print(); return ssc; } }
Streaming Write To Kafka 0-10样例代码
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.DstreamKafkaWriter。
建议使用新的API createDirectStream代替旧的API createStream进行应用程序开发。旧的API仍然可以使用,但新的API性能和稳定性更好。
/** * 参数解析: * <groupId>为客户的组编号。 * <brokers>为获取元数据的Kafka地址。 * <topic>为Kafka中订阅的主题。 */ public class JavaDstreamKafkaWriter { public static void main(String[] args) throws InterruptedException { if (args.length != 3) { System.err.println("Usage: JavaDstreamKafkaWriter <groupId> <brokers> <topic>"); System.exit(1); } final String groupId = args[0]; final String brokers = args[1]; final String topic = args[2]; SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter"); // 填写Kafka的properties。 Map<String, Object> kafkaParams = new HashMap<String, Object>(); kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("value.serializer" , "org.apache.kafka.common.serialization.ByteArraySerializer"); kafkaParams.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer"); kafkaParams.put("bootstrap.servers", brokers); kafkaParams.put("group.id", groupId); kafkaParams.put("auto.offset.reset", "smallest"); // 创建Java Spark Streaming的Context。 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500)); // 填写写入Kafka的数据。 List<String> sentData = new ArrayList(); sentData.add("kafka_writer_test_msg_01"); sentData.add("kafka_writer_test_msg_02"); sentData.add("kafka_writer_test_msg_03"); // 创建Java RDD队列。 Queue<JavaRDD<String>> sent = new LinkedList(); sent.add(ssc.sparkContext().parallelize(sentData)); // 创建写数据的Java DStream。 JavaDStream wStream = ssc.queueStream(sent); // 写入Kafka。 JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka( JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(), new Function<String, ProducerRecord<String, byte[]>>() { public ProducerRecord<String, byte[]> call(String s) throws Exception { return new ProducerRecord(topic, s.toString().getBytes()); } }); ssc.start(); ssc.awaitTermination(); } }