
Spark Application Development Suggestions

Updated on 2024-08-30 GMT+08:00

Persist the RDD that will be frequently used

The default RDD storage level is StorageLevel.NONE, which means that the RDD is not stored on disks or in memory. If an RDD is frequently used, persist the RDD as follows:

Call cache(), persist(), or persist(newLevel: StorageLevel) of spark.RDD to persist the RDD. The cache() and persist() functions set the RDD storage level to StorageLevel.MEMORY_ONLY. The persist(newLevel: StorageLevel) function allows you to set another storage level for the RDD. However, before calling this function, ensure that the current RDD storage level is StorageLevel.NONE or the same as newLevel. That is, once the RDD storage level is set to a value other than StorageLevel.NONE, it cannot be changed.

To unpersist an RDD, call unpersist(blocking: Boolean = true). This function:

1. Removes the RDD from the persistence list, so that the corresponding RDD data becomes recyclable.

2. Sets the storage level of the RDD back to StorageLevel.NONE.
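
For example, a minimal Scala sketch of this workflow (the input path and the transformations are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("PersistExample").getOrCreate()
val sc = spark.sparkContext

// This RDD is reused by several actions, so persist it once up front.
val errors = sc.textFile("/tmp/input").filter(_.contains("ERROR"))
errors.persist(StorageLevel.MEMORY_AND_DISK)    // allowed: the current storage level is still NONE

val total = errors.count()                      // the first action materializes the cache
val hosts = errors.map(_.split(" ")(0)).distinct().count()

// Release the cached blocks when the RDD is no longer needed;
// this also resets its storage level to StorageLevel.NONE.
errors.unpersist(blocking = true)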

Carefully select the shuffle operator

This type of operator has wide dependencies. That is, one partition of the parent RDD contributes to multiple partitions of the child RDD. The elements in such an RDD are <key, value> pairs. During execution, the data of the RDD is redistributed across partitions. This operation is called shuffle.

Shuffle operators involve network transmission between nodes. Therefore, for an RDD with a large data volume, you are advised to extract as much information as possible beforehand to minimize the size of each record, and only then call the shuffle operators.

The following methods are often used (a combined usage sketch follows this list):

  • combineByKey() : RDD[(K, V)] => RDD[(K, C)]

    This method combines all the values that share the same key in RDD[(K, V)] into a single value of type C.

  • groupByKey() and reduceByKey() are two implementations of combineByKey. If groupByKey and reduceByKey cannot meet the requirements of complex data aggregation, you can pass customized aggregation functions to combineByKey as its parameters.
  • distinct(): RDD[T] => RDD[T]

    This method is used to remove repeated elements. The code is as follows:

    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

    This process is time-consuming, especially when the data volume is large. Therefore, it is not recommended for RDDs generated from large files.

  • join() : (RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (V, W))]

    This method is used to join two RDDs by key.

    If a key in RDD[(K, V)] has X values and the same key in RDD[(K, W)] has Y values, a total of (X * Y) data records will be generated in RDD[(K, (V, W))].
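
The following Scala sketch shows typical usage of these operators; the sample data, keys, and values are illustrative, and an existing SparkContext sc is assumed:

import org.apache.spark.rdd.RDD

val sales: RDD[(String, Double)] =
  sc.parallelize(Seq(("shop1", 10.0), ("shop1", 5.0), ("shop2", 7.0)))

// combineByKey: RDD[(K, V)] => RDD[(K, C)]; here C = (sum, count), used to compute an average.
val avgPerShop = sales.combineByKey(
  (v: Double) => (v, 1),                                              // createCombiner
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),        // mergeValue
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners
).mapValues { case (sum, cnt) => sum / cnt }

// reduceByKey: the simpler form when the combined type C is the same as V.
val totalPerShop = sales.reduceByKey(_ + _)

// join: combine two pair RDDs by key; every matching (V, W) pair is emitted.
val regions = sc.parallelize(Seq(("shop1", "north"), ("shop2", "south")))
val joined = totalPerShop.join(regions)   // RDD[(String, (Double, String))]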

Use high-performance operators if the service permits

  1. Using reduceByKey/aggregateByKey to replace groupByKey

    Map-side pre-aggregation means that each node performs aggregation locally on records with the same key, similar to the local combiner in MapReduce. After map-side pre-aggregation, each key appears at most once on a node. When another node pulls the data of the same key from the processing results of the previous nodes, the amount of data to be fetched is significantly reduced, which decreases disk I/O and network transmission costs. Therefore, replace groupByKey with reduceByKey or aggregateByKey where possible, because these operators pre-aggregate records with the same key on each node by using user-defined functions. The groupByKey operator does not support pre-aggregation, so all the data is distributed and transmitted across the nodes, and it delivers lower performance than reduceByKey or aggregateByKey.

  2. Using mapPartitions to replace ordinary map operators

    A mapPartitions operator processes all the data in a partition in one function invocation instead of one record at a time, and therefore delivers higher performance than the ordinary map operator. However, mapPartitions may occasionally cause out-of-memory (OOM) errors: if memory is insufficient, some objects cannot be reclaimed during garbage collection. Therefore, exercise caution when using mapPartitions.

  3. Performing the coalesce operation after filtering

    After filtering out a large portion of the data (for example, more than 30%) with the filter operator, you are advised to manually decrease the number of partitions by using coalesce to compress the RDD data into fewer partitions. After filtering, much of the data in each partition has been removed, leaving little to be processed; if the computation continues with the original number of partitions, resources are wasted, and the overall handling speed drops as the number of tasks grows. Decreasing the number of partitions with coalesce ensures that all the data is handled by fewer tasks, which can also improve performance in some scenarios.

  4. Using repartitionAndSortWithinPartitions to replace repartition and sort

    repartitionAndSortWithinPartitions is recommended on the official Spark website. If data needs to be sorted after repartitioning, use repartitionAndSortWithinPartitions instead of calling repartition followed by a separate sort. This operator repartitions and sorts within the partitions in a single shuffle, delivering higher performance.

  5. Using foreachPartition to replace foreach

    Similar to "Using mapPartitions to replace ordinary map operators", this mechanism handles all the data in a partition during a function invocation instead of one piece of data. In practice, foreachPartitions is proved to be helpful in improving performance. For example, the foreach function can be used to write all the data in RDD into MySQL. Ordinary foreach operators, write data piece by piece, and a database connection is established for each function invocation. Frequent connection establishments and destructions cause low performance. foreachPartitions, however, processes all the data in a partition at a time. Only one database connection is required for each partition. Batch insertion delivers higher performance.

RDD Shared Variables

In application development, when a function is passed to a Spark operation (such as map or reduce) and runs on a remote cluster, the operation actually works on independent copies of all the variables involved in the function. These variables are copied to each machine, and updates to them are not propagated back to the driver, so reading and writing shared variables across tasks in this way is inefficient. Spark provides two commonly used types of shared variables: broadcast variables and accumulators.
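
The following is a minimal Scala sketch of both shared variable types, assuming an existing SparkContext sc; the lookup map and sample codes are illustrative:

// Broadcast variable: read-only data shipped to each executor once,
// instead of being copied into every task closure.
val countryNames = Map("CN" -> "China", "DE" -> "Germany")
val bcNames = sc.broadcast(countryNames)

// Accumulator: tasks only add to it; the aggregated value is read on the driver.
val unknownCodes = sc.longAccumulator("unknownCodes")

val codes = sc.parallelize(Seq("CN", "DE", "XX"))
val resolved = codes.map { code =>
  bcNames.value.getOrElse(code, { unknownCodes.add(1); "unknown" })
}
resolved.count()                                   // run an action before reading the accumulator
println(s"Unknown country codes: ${unknownCodes.value}")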

Kryo can be used to optimize serialization performance in performance-demanding scenarios.

Spark offers two serializers:

  • org.apache.spark.serializer.KryoSerializer: high performance but low compatibility

  • org.apache.spark.serializer.JavaSerializer: average performance and high compatibility

Method: conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

NOTE:

The following are reasons why Spark does not use Kryo-based serialization by default:

Spark uses Java serialization by default, that is, it uses the ObjectOutputStream and ObjectInputStream APIs to perform serialization and deserialization. Spark can also use the Kryo serialization library, which delivers higher performance than the Java serialization library. According to official statistics, Kryo-based serialization is about 10 times more efficient than Java-based serialization. However, Kryo-based serialization requires the registration of all the user-defined types to be serialized, which is a burden for developers.
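
A minimal sketch of enabling Kryo and registering a user-defined type; the UserRecord class and the application name are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class UserRecord(id: Long, name: String)   // hypothetical user-defined class that is shuffled or cached

val conf = new SparkConf()
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Register user-defined types up front; unregistered classes still work with Kryo,
  // but their full class names are stored with every object, which wastes space.
  .registerKryoClasses(Array(classOf[UserRecord]))

val spark = SparkSession.builder().config(conf).getOrCreate()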

Suggestions on Optimizing Spark Streaming Performance

  1. Set an appropriate batch processing duration (batchDuration); a combined sketch of these suggestions follows the list.
  2. Set concurrent data receiving appropriately.
    • Set multiple receivers to receive data.
    • Set an appropriate receiver congestion duration.
  3. Set concurrent data processing appropriately.
  4. Use Kryo-based serialization.
  5. Optimize memory.
    • Set the persistence level to reduce GC costs.
    • Use concurrent Mark Sweep GC algorithms to shorten GC pauses.
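
These suggestions can be combined as in the following minimal Scala sketch; the batch interval, number of receivers, socket source, parallelism, and GC option are all illustrative assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("StreamingTuning")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // suggestion 4: Kryo
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")      // suggestion 5: CMS GC

val ssc = new StreamingContext(conf, Seconds(5))                          // suggestion 1: batch duration

// Suggestion 2: several receivers consume data in parallel and are unioned into one stream,
// each using a serialized persistence level to reduce GC cost (suggestion 5).
val streams = (1 to 3).map(_ =>
  ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER))
val lines = ssc.union(streams)

// Suggestion 3: repartition before the heavy work to raise processing parallelism.
val counts = lines.repartition(12).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTermination()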

Suggestions for Running PySpark

To run a PySpark application, you must install the Python environment yourself and upload the necessary Python dependency packages to HDFS. The Python environment provided by the cluster cannot be used.
