Updated: 2023-04-10 GMT+08:00
Streaming Task Prints the Same DAG Log Twice
Question
When using Spark Streaming, run the application with the following command:
spark-submit --master yarn-client --conf spark.logLineage=true --jars $SPARK_HOME/jars/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/jars/streamingClient/kafka_2.11-0.8.2.1.jar,$SPARK_HOME/jars/streamingClient/spark-streaming-kafka-0-8_2.11-2.1.0.jar --class com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint /opt/female/SparkStreamingJavaExample-1.0.jar <checkpoint> <batchTime> <windowTime> <topics> <brokers>
When there is no Kafka input data, the DAG structure of the RDD is printed twice in the log within a single batch. The relevant log is shown below:
------------------------------------------- Time: 1491447950000 ms -------------------------------------------
17/04/06 11:06:00 INFO SparkContext: RDD's recursive dependencies:  
(2) MapPartitionsRDD[49] at filter at FemaleInfoCollectionPrint.java:111 []
   |  MapPartitionsRDD[48] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |  CoGroupedRDD[47] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |  MapPartitionsRDD[38] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   |  ReliableCheckpointRDD[40] at print at FemaleInfoCollectionPrint.java:123 []
   |  ShuffledRDD[36] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   +-(5) MapPartitionsRDD[35] at map at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[34] at filter at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[33] at map at FemaleInfoCollectionPrint.java:72 []
      |  MapPartitionsRDD[32] at map at FemaleInfoCollectionPrint.java:63 []
      |  KafkaRDD[31] at createDirectStream at FemaleInfoCollectionPrint.java:63 []
   |  ShuffledRDD[46] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 []
   +-(5) MapPartitionsRDD[45] at map at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[44] at filter at FemaleInfoCollectionPrint.java:81 []
      |  MapPartitionsRDD[43] at map at FemaleInfoCollectionPrint.java:72 []
      |  MapPartitionsRDD[42] at map at FemaleInfoCollectionPrint.java:63 []
      |  KafkaRDD[41] at createDirectStream at FemaleInfoCollectionPrint.java:63 []
17/04/06 11:06:00 INFO SparkContext: RDD's recursive dependencies:
(2) MapPartitionsRDD[48] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |       CachedPartitions: 1; MemorySize: 4.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   |  CoGroupedRDD[47] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |  MapPartitionsRDD[38] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   |  ReliableCheckpointRDD[40] at print at FemaleInfoCollectionPrint.java:123 [Memory Serialized 1x Replicated]
   |  ShuffledRDD[36] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |      CachedPartitions: 2; MemorySize: 8.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   +-(5) MapPartitionsRDD[35] at map at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[34] at filter at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[33] at map at FemaleInfoCollectionPrint.java:72 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[32] at map at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
      |  KafkaRDD[31] at createDirectStream at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
   |  ShuffledRDD[46] at reduceByKeyAndWindow at FemaleInfoCollectionPrint.java:98 [Memory Serialized 1x Replicated]
   |      CachedPartitions: 1; MemorySize: 4.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   +-(5) MapPartitionsRDD[45] at map at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[44] at filter at FemaleInfoCollectionPrint.java:81 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[43] at map at FemaleInfoCollectionPrint.java:72 [Memory Serialized 1x Replicated]
      |  MapPartitionsRDD[42] at map at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
      |  KafkaRDD[41] at createDirectStream at FemaleInfoCollectionPrint.java:63 [Memory Serialized 1x Replicated]
------------------------------------------- Time: 1491447960000 ms -------------------------------------------
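Answer
The application uses the print operator of DStream to display results. This operator calls the take operator of RDD to perform the underlying computation.
The take operator triggers computation multiple times, one partition (or group of partitions) at a time.
In this case, because of the shuffle operation, the RDD on which take runs has two partitions by default. Spark first computes the first partition, but because there is no input data, fewer than 10 results are returned, which triggers a second computation. With spark.logLineage=true, each computation logs the RDD lineage, so the DAG structure of the RDD is printed twice.
If you change the print operator in the code to foreach(collect), this problem does not occur.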
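The behaviour described above can be illustrated with a minimal sketch in plain Java that has no Spark dependency. It is a simplified model, not Spark's actual implementation: the class `TakeVersusCollectSketch`, the `jobsLaunched` counter, and the growth factor of 4 are assumptions introduced only for illustration. Each simulated job stands for one point at which Spark would log the RDD lineage when `spark.logLineage=true`.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of partition-wise take versus collect; not Spark's actual code. */
public class TakeVersusCollectSketch {

    /** Number of simulated jobs launched by the most recent call (hypothetical counter). */
    static int jobsLaunched = 0;

    /** Assumed growth factor for how many partitions are scanned in the next round. */
    static final int SCALE_UP_FACTOR = 4;

    /** Scans partitions in rounds until num elements are found or no partitions remain. */
    static <T> List<T> take(List<List<T>> partitions, int num) {
        jobsLaunched = 0;
        List<T> buf = new ArrayList<>();
        int scanned = 0;
        int toTry = 1; // the first round looks at a single partition only
        while (buf.size() < num && scanned < partitions.size()) {
            int end = Math.min(scanned + toTry, partitions.size());
            jobsLaunched++; // one job per round; each job would log the lineage once
            for (int p = scanned; p < end; p++) {
                for (T item : partitions.get(p)) {
                    if (buf.size() < num) {
                        buf.add(item);
                    }
                }
            }
            scanned = end;
            toTry *= SCALE_UP_FACTOR;
        }
        return buf;
    }

    /** Reads every partition in a single job. */
    static <T> List<T> collect(List<List<T>> partitions) {
        jobsLaunched = 1;
        List<T> all = new ArrayList<>();
        for (List<T> partition : partitions) {
            all.addAll(partition);
        }
        return all;
    }

    public static void main(String[] args) {
        // Two empty partitions, as in a batch with no Kafka input after the shuffle.
        List<List<String>> noInput = List.of(List.of(), List.of());
        take(noInput, 10);
        System.out.println("take jobs: " + jobsLaunched);    // prints "take jobs: 2"
        collect(noInput);
        System.out.println("collect jobs: " + jobsLaunched); // prints "collect jobs: 1"
    }
}
```

In this model, take needs two rounds for two empty partitions, while a collect-style action needs one, which matches the single lineage print seen after the change. In the Spark 2.x Java API, the foreach(collect) change would presumably be written with `foreachRDD` and a call to `collect()` inside it; note that collect brings all results of the batch to the driver.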