Resolving OutOfMemory (OOM) Errors in PySpark: Best Practices
Managing memory in PySpark is crucial for efficient data processing. Here are some best practices to avoid OOM errors:
Adjust Spark Configuration (Memory Management)
Note: executor and driver memory only take effect when set at session creation (or via spark-submit); spark.conf.set() cannot change them on a running session.
- Increase executor memory:
spark.conf.set("spark.executor.memory", "8g")
- Increase driver memory:
spark.conf.set("spark.driver.memory", "4g")
- Set executor cores:
spark.conf.set("spark.executor.cores", "2")
- Use disk persistence:
df.persist(StorageLevel.DISK_ONLY)
Enable Dynamic Allocation
Allow Spark to scale the number of executors up and down with the workload (on clusters without an external shuffle service, also enable spark.dynamicAllocation.shuffleTracking.enabled):
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "1")
Enable Adaptive Query Execution (AQE)
Enable AQE (on by default since Spark 3.2) so query plans are re-optimized at runtime, including automatic coalescing of shuffle partitions and skew-join handling:
spark.conf.set("spark.sql.adaptive.enabled", "true")
Enforce Schema for Unstructured Data
Declare the schema up front to avoid the extra pass over the files that schema inference costs:
df = spark.read.schema(schema).json("path/to/data")
Tune the Number of Partitions
Repartition the DataFrame so each task processes a manageable slice of the data:
df = df.repartition(200, "column_name")
Handle Data Skew Dynamically
Use salting for skewed joins: append a bounded random salt to the hot key (the other side of the join must be replicated across the same salt values):
df1 = df1.withColumn("join_key_salted", F.concat(F.col("join_key"), F.lit("_"), (F.rand() * 10).cast("int")))
Limit Cache Usage for Large DataFrames
Cache selectively, and let partitions spill to disk when memory runs short (StorageLevel is imported with from pyspark import StorageLevel):
df.persist(StorageLevel.MEMORY_AND_DISK)
Optimize Joins for Large DataFrames
Use broadcast joins for smaller tables so the large side never has to be shuffled:
df_join = large_df.join(broadcast(small_df), "join_key", "left")
Monitor Spark Jobs
Use the Spark UI to track memory usage, shuffle sizes, and job execution.
Consider Partitioning Strategy
Write partitioned data so downstream reads can prune irrelevant partitions:
df.write.partitionBy("partition_column").parquet("path_to_data")
Additional Best Practices
Optimize Shuffle Operations
- Reduce shuffle partitions:
spark.conf.set("spark.sql.shuffle.partitions", "200")
- Avoid wide transformations such as groupBy or join where possible; they force a full shuffle of the data.
Use Columnar File Formats
Use formats like Parquet or ORC; their columnar layout and compression reduce both I/O and memory pressure.
Avoid Collecting Large Data to Driver
Use take() or show() instead of collect(); collect() pulls the entire DataFrame into driver memory.
Optimize UDF Usage
Prefer Spark SQL functions or PySpark built-in functions over Python UDFs, which serialize every row between the JVM and Python workers.
Compress Data in Memory
Enable Kryo serialization, which gives a more compact serialized representation than the Java default (set it when the session is created):
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Filter Early
Reduce data size as early as possible in the pipeline so later stages shuffle and cache less:
df = df.filter(df["column_name"] != "unwanted_value")
Avoid Over/Under Partitioning
Ensure partition sizes are between 128MB and 1GB.
Optimize Aggregations
Use reduceByKey instead of groupByKey with RDDs; reduceByKey pre-aggregates on the map side before the shuffle.
Use Off-Heap Storage
Leverage Tungsten's off-heap memory (spark.memory.offHeap.enabled together with spark.memory.offHeap.size) to keep data outside the JVM heap and reduce garbage-collection pressure.
Manage Executor Resources
Balance executor memory and cores, and leave headroom for off-heap and Python overhead:
spark.conf.set("spark.executor.memoryOverhead", "2g")
Control Output File Sizes
Use coalesce() or repartition() to manage output file sizes:
df = df.coalesce(10)
Garbage Collection Tuning
Adjust the JVM GC settings, for example on spark-submit:
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35"
Preprocess Data Before Spark
Use lightweight tools like awk or sed for basic preprocessing before the data reaches Spark.
Test with Sample Data
Use small data samples to identify memory-heavy operations before scaling.