Updated on 2022-12-14 GMT+08:00

Streaming Task Prints the Same DAG Log Twice

Question

When using Spark Streaming, run the following command to start the program:

spark-submit --master yarn-client --conf spark.logLineage=true --jars $SPARK_HOME/jars/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/jars/streamingClient/kafka_2.11-0.8.2.1.jar,$SPARK_HOME/jars/streamingClient/spark-streaming-kafka-0-8_2.11-2.1.0.jar --class com.xxx.bigdata.spark.examples.FemaleInfoCollectionPrint /opt/female/SparkStreamingJavaExample-1.0.jar <checkpoint> <batchTime> <windowTime> <topics> <brokers>

If there is no Kafka data input, the RDD DAG structure is printed twice in the log for a single batch. The relevant log is as follows:

-------------------------------------------
Time: 1491447950000 ms
-------------------------------------------
17/04/06 11:06:00 INFO SparkContext: RDD's recursive dependencies:  
(2) MapPartitionsRDD[49] at filter at FemaleInfoCollectionPrint.java:111 []
   |  MapPartitionsRDD[48] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |  CoGroupedRDD[47] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |  MapPartitionsRDD[38] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   |  ReliableCheckpointRDD[40] at print at FemaleInfoCollectionPrint.java:123 []
   |  ShuffledRDD[36] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   +-(5) MapPartitionsRDD[35] at map at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[34] at filter at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[33] at map at FemaleInfoCollectionPrint.java:72 []
      |  MapPartitionsRDD[32] at map at FemaleInfoCollectionPrint.java:63 []
      |  KafkaRDD[31] at createDirectStream at FemaleInfoCollectionPrint.java:63 []
   |  ShuffledRDD[46] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   +-(5) MapPartitionsRDD[45] at map at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[44] at filter at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[43] at map at FemaleInfoCollectionPrint.java:72 []
      |  MapPartitionsRDD[42] at map at FemaleInfoCollectionPrint.java:63 []
      |  KafkaRDD[41] at createDirectStream at FemaleInfoCollectionPrint.java:63 []
17/04/06 11:06:00 INFO SparkContext: RDD's recursive dependencies:
(2) MapPartitionsRDD[48] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |       CachedPartitions: 1; MemorySize: 4.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   |  CoGroupedRDD[47] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |  MapPartitionsRDD[38] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   |  ReliableCheckpointRDD[40] at print at FemaleInfoCollectionPrint.java:123 [Memory Serialized 1x Replicated]
   |  ShuffledRDD[36] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   +-(5) MapPartitionsRDD[35] at map at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[34] at filter at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[33] at map at FemaleInfoCollectionPrint.java:72 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[32] at map at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
      |  KafkaRDD[31] at createDirectStream at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
   |  ShuffledRDD[46] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |      CachedPartitions: 1; MemorySize: 4.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   +-(5) MapPartitionsRDD[45] at map at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[44] at filter at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[43] at map at FemaleInfoCollectionPrint.java:72 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[42] at map at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
      |  KafkaRDD[41] at createDirectStream at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
-------------------------------------------
Time: 1491447960000 ms
-------------------------------------------

Answer

The application uses the print operator of the DStream to display results. The print operator calls the take operator of the RDD to perform the underlying computation.

The take operator can trigger the computation several times, processing a group of partitions each time.

In this case, the RDD that take operates on has two partitions by default because of the shuffle operation. Spark computes the first partition first. Because there is no data input, fewer than 10 results are obtained, so a second computation is triggered for the remaining partition. Each computation logs the RDD DAG, so the RDD DAG log is printed twice in one batch.
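
The partition-by-partition behavior described above can be illustrated with a small, Spark-free model. This is only a sketch: the class name TakeJobSimulation, the Result type, and the growth factor of 4 for later rounds are assumptions made for illustration, and the real take implementation in Spark is more elaborate. The model counts how many separate jobs are needed, since each job is what causes one RDD DAG log when spark.logLineage is enabled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Simplified, Spark-free model of how take(num) scans partitions job by job. */
public class TakeJobSimulation {

    /** Outcome of a simulated take: the collected elements and the number of jobs run. */
    public static final class Result {
        public final List<String> elements;
        public final int jobs;

        Result(List<String> elements, int jobs) {
            this.elements = elements;
            this.jobs = jobs;
        }
    }

    /** Scans one partition first, then more partitions per round until num elements are found. */
    public static Result take(List<List<String>> partitions, int num) {
        List<String> buf = new ArrayList<>();
        int jobs = 0;
        int scanned = 0;
        int total = partitions.size();
        while (buf.size() < num && scanned < total) {
            // First round: one partition. Later rounds: more partitions (factor 4 is an assumption).
            int toTry = (scanned == 0) ? 1 : scanned * 4;
            int end = Math.min(scanned + toTry, total);
            jobs++; // every round is a separate job, so the lineage would be logged once per round
            for (int p = scanned; p < end; p++) {
                for (String element : partitions.get(p)) {
                    if (buf.size() < num) {
                        buf.add(element);
                    }
                }
            }
            scanned = end;
        }
        return new Result(buf, jobs);
    }

    public static void main(String[] args) {
        // Two empty partitions, as in a batch without Kafka input.
        List<List<String>> emptyBatch = Arrays.asList(
                Collections.<String>emptyList(), Collections.<String>emptyList());
        System.out.println("jobs for empty batch: " + take(emptyBatch, 10).jobs);
    }
}
```

With two empty partitions the model runs two rounds, which corresponds to the two DAG logs in the batch shown in the question. If the first partition already held enough results, a single round would suffice.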

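To solve this problem, replace the print operator in the code with a foreach-style output that calls collect (for example, collect inside foreachRDD), so that the output of each batch is computed in a single pass over all partitions.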
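
One possible shape of this change is sketched below, under assumptions. BatchPrinter and its format method are hypothetical names, not part of the sample program. The helper only reproduces a print-like layout in plain Java; the wiring to Spark is shown as a comment because it needs the Spark Streaming jars and the application's own stream, here called result as a placeholder for the DStream on which print was called.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that renders one batch in a print-like layout after a single collect. */
public class BatchPrinter {

    /** Formats one batch: a header with the batch time, then one record per line. */
    public static String format(long batchTimeMs, List<?> records) {
        StringBuilder sb = new StringBuilder();
        sb.append("-------------------------------------------\n");
        sb.append("Time: ").append(batchTimeMs).append(" ms\n");
        sb.append("-------------------------------------------\n");
        for (Object record : records) {
            sb.append(record).append('\n');
        }
        return sb.toString();
    }

    // Assumed wiring in the streaming application ("result" is a placeholder name):
    //
    //   result.foreachRDD((rdd, time) ->
    //       System.out.print(BatchPrinter.format(time.milliseconds(), rdd.collect())));
    //
    // collect() computes all partitions in one job, unlike take(), which may need several.

    public static void main(String[] args) {
        // Made-up sample records, only to show the layout.
        System.out.print(format(1491447950000L, Arrays.asList("record-1", "record-2")));
    }
}
```

Unlike print, which shows only the first few records, collect returns the whole batch to the driver, so this approach is best suited to small result sets such as the one in this example.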