Updated: 2024-07-19 GMT+08:00

A Streaming Task Prints the Same DAG Log Twice

Question

When using Spark Streaming, the application is submitted with the following command:

spark-submit --master yarn-client --conf spark.logLineage=true --jars $SPARK_HOME/jars/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/jars/streamingClient/kafka_2.11-0.8.2.1.jar,$SPARK_HOME/jars/streamingClient/spark-streaming-kafka-0-8_2.11-2.1.0.jar --class com..bigdata.spark.examples.FemaleInfoCollectionPrint /opt/female/SparkStreamingJavaExample-1.0.jar <checkpoint> <batchTime> <windowTime> <topics> <brokers>

When there is no Kafka input data, the DAG structure of the RDD is printed twice within a single batch, as shown in the following log:

-------------------------------------------
Time: 1491447950000 ms

-------------------------------------------
17/04/06 11:06:00 INFO SparkContext: RDD's recursive dependencies:  
(2) MapPartitionsRDD[49] at filter at FemaleInfoCollectionPrint.java:111 []
   |  MapPartitionsRDD[48] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |  CoGroupedRDD[47] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |  MapPartitionsRDD[38] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   |  ReliableCheckpointRDD[40] at print at FemaleInfoCollectionPrint.java:123 []
   |  ShuffledRDD[36] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   +-(5) MapPartitionsRDD[35] at map at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[34] at filter at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[33] at map at FemaleInfoCollectionPrint.java:72 []
      |  MapPartitionsRDD[32] at map at FemaleInfoCollectionPrint.java:63 []
      |  KafkaRDD[31] at createDirectStream at FemaleInfoCollectionPrint.java:63 []
   |  ShuffledRDD[46] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   +-(5) MapPartitionsRDD[45] at map at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[44] at filter at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[43] at map at FemaleInfoCollectionPrint.java:72 []
      |  MapPartitionsRDD[42] at map at FemaleInfoCollectionPrint.java:63 []
      |  KafkaRDD[41] at createDirectStream at FemaleInfoCollectionPrint.java:63 []
17/04/06 11:06:00 INFO SparkContext: RDD's recursive dependencies:  (2) MapPartitionsRDD[48] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |       CachedPartitions: 1; MemorySize: 4.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   |  CoGroupedRDD[47] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |  MapPartitionsRDD[38] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   |  ReliableCheckpointRDD[40] at print at FemaleInfoCollectionPrint.java:123 [Memory Serialized 1x Replicated]
   |  ShuffledRDD[36] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   +-(5) MapPartitionsRDD[35] at map at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[34] at filter at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[33] at map at FemaleInfoCollectionPrint.java:72 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[32] at map at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
      |  KafkaRDD[31] at createDirectStream at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
   |  ShuffledRDD[46] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |      CachedPartitions: 1; MemorySize: 4.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   +-(5) MapPartitionsRDD[45] at map at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[44] at filter at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[43] at map at FemaleInfoCollectionPrint.java:72 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[42] at map at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
      |  KafkaRDD[41] at createDirectStream at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
-------------------------------------------
Time: 1491447960000 ms
-------------------------------------------

Answer

The application uses the print operator of DStream to display results. Internally, this operator invokes the take operator of RDD to perform the computation.

The take operator evaluates the RDD partition by partition, so it may trigger computation (that is, launch a job) more than once.

In this case, the shuffle operation leaves the RDD consumed by take with two partitions by default. Spark first computes only the first partition; because there is no input data, it obtains fewer than the 10 records that print needs to display, which triggers a second job over the remaining partition. Each job prints the RDD's DAG, so the structure appears twice in one batch.
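The escalation behavior of take can be illustrated with a minimal sketch in plain Python (not Spark itself, and the scale-up policy here is simplified): scan one partition first, and only scan more when the first pass returns fewer than the requested number of records. Each pass models one Spark job, and each job logs the RDD's lineage when spark.logLineage=true.

```python
def take(partitions, n):
    """Simulate RDD.take(n): returns (records, jobs_triggered).

    First job scans one partition; if that yields fewer than n records,
    a follow-up job scans the remaining partitions (real Spark grows the
    batch by a configurable scale-up factor).
    """
    records = []
    jobs = 0
    scanned = 0
    while len(records) < n and scanned < len(partitions):
        jobs += 1  # each loop iteration models one Spark job
        batch = 1 if jobs == 1 else len(partitions) - scanned
        for p in partitions[scanned:scanned + batch]:
            records.extend(p)
        scanned += batch
    return records[:n], jobs

# Two empty shuffle partitions, as in the no-input case from the logs:
print(take([[], []], 10))   # second pass needed -> DAG printed twice
# A first partition already holding >= 10 records: one pass suffices.
print(take([list(range(12)), []], 10))
```

With empty partitions the loop runs twice, matching the two lineage printouts in the log; with enough data in the first partition, a single job is enough.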

If the print operator in the code is changed to foreach(collect), this issue does not occur, because collect evaluates all partitions in a single job.
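Why the collect-based fix avoids the double printout can be seen in a toy model (plain Python, not Spark): collect always runs exactly one job over all partitions, so the lineage is logged once per batch even when every partition is empty.

```python
def collect(partitions):
    """Simulate RDD.collect: a single job that scans every partition."""
    jobs = 1  # collect never escalates; one job covers all partitions
    records = [r for p in partitions for r in p]
    return records, jobs

print(collect([[], []]))       # empty input: still exactly one job
print(collect([[1, 2], [3]]))  # data present: still exactly one job
```

Unlike take, the number of jobs here does not depend on how much data each partition holds, so the DAG is printed once per batch.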