Connecting Spark Streaming to Kafka0-10 (Java)
Function
The project uses Streaming in Spark applications to call Kafka APIs to obtain word records or write data to Kafka0-10. Word records are classified to obtain the number of records of each word.
Sample Code for Streaming to Read Kafka0-10
The following code snippets are used as an example. For complete codes, see com.huawei.bigdata.spark.examples.SecurityKafkaWordCount.
/** *One or more topic messages from Kafka * <checkPointDir> is the Spark Streaming checkpoint directory. * <brokers> is used for bootstrapping. The producer only uses it to obtain metadata. * <topics> is a list of one or more Kafka topics to be consumed. * <batchTime> is the duration (in seconds) of one Spark Streaming batch. */ public class SecurityKafkaWordCount { public static void main(String[] args) throws Exception { JavaStreamingContext ssc = createContext(args); //Start the Streaming system. ssc.start(); try { ssc.awaitTermination(); } catch (InterruptedException e) { } } private static JavaStreamingContext createContext(String[] args) throws Exception { String checkPointDir = args[0]; String brokers = args[1]; String topics = args[2]; String batchTime = args[3]; //Create a Streaming startup environment. SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(Long.parseLong(batchTime) * 1000)); //Set the CheckPoint directory of Streaming. //This parameter is mandatory because a window concept exists. ssc.checkpoint(checkPointDir); //Obtain the list of topics used by Kafka. String[] topicArr = topics.split(","); Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr)); Map<String, Object> kafkaParams = new HashMap(); kafkaParams.put("bootstrap.servers", brokers); kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("group.id", "DemoConsumer"); kafkaParams.put("security.protocol", "SASL_PLAINTEXT"); kafkaParams.put("sasl.kerberos.service.name", "kafka"); kafkaParams.put("kerberos.domain.name", "hadoop.<System domain name>"); LocationStrategy locationStrategy = LocationStrategies.PreferConsistent(); ConsumerStrategy consumerStrategy = ConsumerStrategies.Subscribe(topicSet, kafkaParams); //Create a direct kafka stream using brokers and topics. //Receive data from Kafka and generate the corresponding DStream. JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy); //Obtain the field attribute of each row. JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() { @Override public String call(ConsumerRecord<String, String> tuple2) throws Exception { return tuple2.value(); } }); //Sum the total time for calculating the number of words. JavaPairDStream<String, Integer> wordCounts = lines.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }).updateStateByKey( new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { int out = 0; if (state.isPresent()) { out += state.get(); } for (Integer v : values) { out += v; } return Optional.of(out); } }); //Print the result. wordCounts.print(); return ssc; } }
Streaming Write To Kafka 0–10 Sample Code
The following code snippets are used as an example. For complete codes, see com.huawei.bigdata.spark.examples.DstreamKafkaWriter.
You are advised to use the new API createDirectStream to develop applications instead of the old API createStream. While the old API remains functional, the new API offers improved performance and stability.
/** * Parameter description: * <groupId> is the customer group ID. * <brokers> is the Kafka address for obtaining metadata. * <topic> is the topic subscribed in Kafka. */ public class JavaDstreamKafkaWriter { public static void main(String[] args) throws InterruptedException { if (args.length != 3) { System.err.println("Usage: JavaDstreamKafkaWriter <groupId> <brokers> <topic>"); System.exit(1); } final String groupId = args[0]; final String brokers = args[1]; final String topic = args[2]; SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter"); // Enter the properties of Kafka. Map<String, Object> kafkaParams = new HashMap<String, Object>(); kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("value.serializer" , "org.apache.kafka.common.serialization.ByteArraySerializer"); kafkaParams.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer"); kafkaParams.put("bootstrap.servers", brokers); kafkaParams.put("group.id", groupId); kafkaParams.put("auto.offset.reset", "smallest"); // Create the context of Java Spark Streaming. JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500)); // Enter data to be written to Kafka. List<String> sentData = new ArrayList(); sentData.add("kafka_writer_test_msg_01"); sentData.add("kafka_writer_test_msg_02"); sentData.add("kafka_writer_test_msg_03"); // Create a Java RDD queue. Queue<JavaRDD<String>> sent = new LinkedList(); sent.add(ssc.sparkContext().parallelize(sentData)); // Create a Java DStream for writing data. JavaDStream wStream = ssc.queueStream(sent); // Write data to Kafka. JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka( JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(), new Function<String, ProducerRecord<String, byte[]>>() { public ProducerRecord<String, byte[]> call(String s) throws Exception { return new ProducerRecord(topic, s.toString().getBytes()); } }); ssc.start(); ssc.awaitTermination(); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot