PySpark Performance Optimalisatie Tips

17 bewezen technieken om je Spark jobs tot 10x te versnellen

PySpark UI met performance metrics

Waarom PySpark Optimaliseren?

Ongeoptimaliseerde PySpark jobs kunnen 10-100x langzamer zijn dan hun optimale versie. Deze gids toont bewezen technieken uit productie-omgevingen:

Case Study: Bij een financiële klant reduceerden we een maandelijkse job van 14 uur naar 47 minuten door partitionering en join strategieën aan te passen.

Top 5 Meest Impactvolle Optimalisaties

1. Partitionering

De juiste partition size (128MB-1GB) voorkomt small files problemen en shuffle overhead.

2. Geheugenbeheer

Optimaliseer executor memory, overhead en storage fractions.

3. Join Strategieën

Kies bewust tussen broadcast, sort-merge en shuffle hash joins.

4. Lazy Evaluation

Voorkom onnodige acties met .cache() en .persist() op kritieke paden.

5. Predicate Pushdown

Filter vroeg met WHERE i.p.v. later met filter() voor Parquet/Delta.

1. Data Partitionering Optimalisatie

Voorbeeld van optimale partitionering:

# Slecht: Veel kleine partities
df = spark.read.parquet("s3://data-lake/")
df.repartition(2000)  # 2000 kleine partities = overhead

# Goed: Optimale grootte (aim for ~128MB-1GB per partitie)
optimal_partitions = max(10, int(df_size_in_gb * 8))  # ~8 partities per GB
(df.repartition(optimal_partitions, "join_key")
   .write.parquet("s3://optimized-data/"))

# Alternatief voor joins:
spark.conf.set("spark.sql.shuffle.partitions", optimal_partitions)

2. Geheugenconfiguratie

Standaard configuratie is vaak suboptimaal. Belangrijke parameters:

Parameter Aanbevolen Waarde Beschrijving
spark.executor.memory 8g - 16g Niet te groot ivm garbage collection
spark.memory.fraction 0.6 Voor Spark 3.0+ (execution + storage)
spark.sql.adaptive.enabled true Automatische optimalisatie in Spark 3

3. Join Optimalisaties

Join strategieën vergelijken:

# Forceer broadcast join voor kleine tabellen (<10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10mb") 

# Voor middelgrote tabellen: Sort-Merge Join
(df1.join(df2.hint("merge"), "key")
    .write.parquet("..."))

# Grote joins: Bucketing strategie
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
(df.bucketBy(50, "join_column")
   .write.parquet("bucketed_data/"))
Waarschuwing: Vermijd cartesian joins (cross joins) - deze schalen quadratisch O(n²)!

4. Caching Strategieën

Wanneer wel/niet cachen:

# Goed gebruik van cache:
df = spark.read.parquet("...")
filtered = df.filter("...").cache()  # Vaak gebruikt

# Transformatie pipeline
result = (filtered
  .groupBy("...")
  .agg(...)
  .join(...))

# Expliciet unpersist wanneer klaar
filtered.unpersist()

# Slecht: Te vroeg cachen
raw = spark.read.parquet("...").cache()  # Verspilt geheugen
cleaned = raw.filter(...)  # Beter hier cachen

5. Pandas UDFs vs Scala UDFs

Performance benchmark resultaten:

Benadering Relatieve Snelheid Gebruik
Native Spark functies 1x (baseline) Altijd eerste keuze
Pandas UDF (vectorized) 2-5x langzamer Complexe Python logica
Scala UDF (via Py4J) 10-100x langzamer Vermijden

Advanced: Delta Lake Optimalisaties

Voor Delta Lake gebruikers:

# ZOPT (Z-Order Optimized Partitioning)
from delta.tables import DeltaTable

(DeltaTable.forPath(spark, "/data/events")
  .optimize()
  .executeZOrderBy("date", "user_id"))

# Auto-compactie
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

PySpark Performance Checklist

  1. Partitionering geoptimaliseerd (niet te veel/weinig)
  2. Juiste join strategie geselecteerd
  3. Onnodige data scans geëlimineerd (SELECT alleen benodigde kolommen)
  4. Caching alleen gebruikt waar nodig
  5. Spark UI geanalyseerd voor bottlenecks

Veelgestelde Vragen

Hoe identificeer ik performance bottlenecks?

Gebruik de Spark UI (port 4040):

  1. Check 'Stages' tab voor langlopende tasks
  2. Analyze 'Storage' tab voor caching effectiviteit
  3. Inspect 'SQL' tab voor query-plannen

Waarom is mijn job traag bij kleine data?

Vaak door:

  • Te veel kleine partities (partitionering overhead)
  • Onnodige serialisatie (bijv. complexe UDFs)
  • Driver bottlenecks (collect() op grote datasets)