Resolving OutOfMemory (OOM) Errors in PySpark: Best Practices

Managing memory in PySpark is crucial for efficient data processing. Here are some best practices to avoid OOM errors:

  1. Adjust Spark Configuration (Memory Management)
    • Increase Executor Memory: spark.executor.memory (e.g. "8g")
    • Increase Driver Memory: spark.driver.memory (e.g. "4g")
    • Set Executor Cores: spark.executor.cores (e.g. "2")
    • Use Disk Persistence: df.persist(StorageLevel.DISK_ONLY)

    Note that executor and driver memory cannot be changed with spark.conf.set() on a running session; set them via spark-submit --conf or SparkSession.builder.config() before the session starts.

  2. Enable Dynamic Allocation

    Allow Spark to scale the number of executors with the workload (requires the external shuffle service or shuffle tracking to be enabled):

    
    spark.conf.set("spark.dynamicAllocation.enabled", "true")
    spark.conf.set("spark.dynamicAllocation.minExecutors", "1")
    spark.conf.set("spark.dynamicAllocation.maxExecutors", "10")
          
  3. Enable Adaptive Query Execution (AQE)

    Enable AQE to optimize query plans:

    
    spark.conf.set("spark.sql.adaptive.enabled", "true")
          
  4. Enforce Schema for Unstructured Data

    Supply an explicit schema to avoid the cost of schema inference:

    
    df = spark.read.schema(schema).json("path/to/data")
          
  5. Tune the Number of Partitions

    Repartition the DataFrame:

    
    df = df.repartition(200, "column_name")
          
  6. Handle Data Skew Dynamically

    Use salting for skewed joins. Append a bounded random suffix; concatenating a raw F.rand() float would make every key unique and break the join:

    
    num_salts = 10
    df1 = df1.withColumn(
        "join_key_salted",
        F.concat(F.col("join_key"), F.lit("_"),
                 F.floor(F.rand() * num_salts).cast("string")),
    )
          
  7. Limit Cache Usage for Large DataFrames

    Cache selectively, persist with disk spill-over, and call unpersist() once a cached DataFrame is no longer needed:

    
    from pyspark import StorageLevel
    df.persist(StorageLevel.MEMORY_AND_DISK)
          
  8. Optimize Joins for Large DataFrames

    Use broadcast joins for smaller tables, so the small side is shipped to every executor instead of shuffling the large side:

    
    from pyspark.sql.functions import broadcast
    df_join = large_df.join(broadcast(small_df), "join_key", "left")
          
  9. Monitor Spark Jobs

    Use Spark UI to track memory usage and job execution.

  10. Consider Partitioning Strategy

    Write partitioned data:

    
    df.write.partitionBy("partition_column").parquet("path_to_data")
          

Additional Best Practices

  1. Optimize Shuffle Operations
    • Tune Shuffle Partitions (the default is 200): spark.conf.set("spark.sql.shuffle.partitions", "200")
    • Minimize Wide Transformations such as groupBy or join where possible.

  2. Use Columnar File Formats

    Use formats like Parquet or ORC for better memory efficiency.

  3. Avoid Collecting Large Data to Driver

    Use take() or show() instead of collect().

  4. Optimize UDF Usage

    Prefer Spark SQL functions or PySpark built-in functions over Python UDFs.

  5. Compress Data in Memory

    Use the more compact Kryo serializer to shrink serialized data in memory and in shuffles:

    
    spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          
  6. Filter Early

    Reduce data size early in the pipeline:

    
    df = df.filter(df["column_name"] != "unwanted_value")
          
  7. Avoid Over/Under Partitioning

    Ensure partition sizes are between 128MB and 1GB.
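    The 128MB rule of thumb turns into a quick back-of-the-envelope calculation; target_partitions is a hypothetical helper, not a Spark API:

```python
def target_partitions(total_bytes: int, bytes_per_partition: int = 128 * 1024**2) -> int:
    """Hypothetical helper: how many partitions keep each at ~128 MB."""
    return max(1, -(-total_bytes // bytes_per_partition))  # ceiling division

# A 10 GB dataset at ~128 MB per partition:
print(target_partitions(10 * 1024**3))  # 80
```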

  8. Optimize Aggregations

    Use reduceByKey instead of groupByKey with RDDs.

  9. Use Off-Heap Storage

    Enable off-heap memory so Tungsten can manage data outside the JVM heap, reducing garbage-collection pressure.

  10. Manage Executor Resources

    Balance executor memory and cores, and reserve overhead memory for off-heap use and Python workers (set at launch time, e.g. via spark-submit):

    
    --conf "spark.executor.memoryOverhead=2g"
          
  11. Control Output File Sizes

    Use coalesce() or repartition() to manage output file sizes:

    
    df = df.coalesce(10)
          
  12. Garbage Collection Tuning

    Adjust JVM GC settings:

    
    --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35"
          
  13. Preprocess Data Before Spark

    Use lightweight tools like awk or sed for basic preprocessing.

  14. Test with Sample Data

    Use small data samples to identify memory-heavy operations before scaling.
