Streaming任务打印两次相同DAG日志
问题
在使用Spark Streaming时,使用以下命令运行程序:
spark-submit -master yarn-client --conf spark.logLineage=true --jars $SPARK_HOME/jars/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/jars/streamingClient/kafka_2.11-0.8.2.1.jar,$SPARK_HOME/jars/streamingClient/spark-streaming-kafka-0-8_2.11-2.1.0.jar --class com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint /opt/female/SparkStreamingJavaExample-1.0.jar <checkpoint> <batchTime> <windowTime> <topics> <brokers>
在没有Kafka数据输入的情况下,日志中显示的RDD的DAG结构会在一个Batch中打印两次,相关日志如下所示:
------------------------------------------- Time: 1491447950000 ms -------------------------------------------
17/04/06 11:06:00 INFO SparkContext: RDD's recursive dependencies: (2) MapPartitionsRDD[49] at filter at FemaleInfoCollectionPrint.java:111 [] | MapPartitionsRDD[48] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [] | CoGroupedRDD[47] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [] | MapPartitionsRDD[38] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [] | CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B | ReliableCheckpointRDD[40] at print at FemaleInfoCollectionPrint.java:123 [] | ShuffledRDD[36] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [] | CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B +-(5) MapPartitionsRDD[35] at map at FemaleInfoCollectionPrint.java:81 [] | MapPartitionsRDD[34] at filter at FemaleInfoCollectionPrint.java:81 [] | MapPartitionsRDD[33] at map at FemaleInfoCollectionPrint.java:72 [] | MapPartitionsRDD[32] at map at FemaleInfoCollectionPrint.java:63 [] | KafkaRDD[31] at createDirectStream at FemaleInfoCollectionPrint.java:63 [] | ShuffledRDD[46] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [] +-(5) MapPartitionsRDD[45] at map at FemaleInfoCollectionPrint.java:81 [] | MapPartitionsRDD[44] at filter at FemaleInfoCollectionPrint.java:81 [] | MapPartitionsRDD[43] at map at FemaleInfoCollectionPrint.java:72 [] | MapPartitionsRDD[42] at map at FemaleInfoCollectionPrint.java:63 [] | KafkaRDD[41] at createDirectStream at FemaleInfoCollectionPrint.java:63 []
17/04/06 11:06:00 INFO SparkContext: RDD's recursive dependencies: (2) MapPartitionsRDD[48] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated] | CachedPartitions: 1; MemorySize: 4.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B | CoGroupedRDD[47] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated] | MapPartitionsRDD[38] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated] | CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B | ReliableCheckpointRDD[40] at print at FemaleInfoCollectionPrint.java:123 [Memory Serialized 1x Replicated] | ShuffledRDD[36] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated] | CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B +-(5) MapPartitionsRDD[35] at map at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated] | MapPartitionsRDD[34] at filter at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated] | MapPartitionsRDD[33] at map at FemaleInfoCollectionPrint.java:72 [Memory Serialized 1x Replicated] | MapPartitionsRDD[32] at map at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated] | KafkaRDD[31] at createDirectStream at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated] | ShuffledRDD[46] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated] | CachedPartitions: 1; MemorySize: 4.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B +-(5) MapPartitionsRDD[45] at map at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated] | MapPartitionsRDD[44] at filter at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated] | MapPartitionsRDD[43] at map at FemaleInfoCollectionPrint.java:72 [Memory Serialized 1x Replicated] | MapPartitionsRDD[42] at map at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated] | KafkaRDD[41] at createDirectStream at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
------------------------------------------- Time: 1491447960000 ms -------------------------------------------
解答
该应用程序中使用了DStream中的print算子来显示结果,该算子会调用RDD中的take算子来实现底层的计算。
Take算子会以Partition为单位多次触发计算。
在该问题中,由于Shuffle操作,导致take算子默认有两个Partition,Spark首先计算第一个Partition,但由于没有数据输入,导致获取结果不足10个,从而触发第二次计算,因此会出现RDD的DAG结构打印两次的现象。
在代码中将print算子修改为foreach(collect),该问题则不会出现。