
Waarom PySpark Optimaliseren?
Ongeoptimaliseerde PySpark jobs kunnen 10-100x langzamer zijn dan hun optimale versie. Deze gids toont bewezen technieken uit productie-omgevingen:
Top 5 Meest Impactvolle Optimalisaties
1. Partitionering
De juiste partition size (128MB-1GB) voorkomt small files problemen en shuffle overhead.
2. Geheugenbeheer
Optimaliseer executor memory, overhead en storage fractions.
3. Join Strategieën
Kies bewust tussen broadcast, sort-merge en shuffle hash joins.
4. Lazy Evaluation
Voorkom onnodige acties met .cache() en .persist() op kritieke paden.
5. Predicate Pushdown
Filter vroeg met WHERE i.p.v. later met filter() voor Parquet/Delta.
1. Data Partitionering Optimalisatie
Voorbeeld van optimale partitionering:
# Slecht: Veel kleine partities
df = spark.read.parquet("s3://data-lake/")
df.repartition(2000) # 2000 kleine partities = overhead
# Goed: Optimale grootte (aim for ~128MB-1GB per partitie)
optimal_partitions = max(10, int(df_size_in_gb * 8)) # ~8 partities per GB
(df.repartition(optimal_partitions, "join_key")
.write.parquet("s3://optimized-data/"))
# Alternatief voor joins:
spark.conf.set("spark.sql.shuffle.partitions", optimal_partitions)
2. Geheugenconfiguratie
Standaard configuratie is vaak suboptimaal. Belangrijke parameters:
Parameter | Aanbevolen Waarde | Beschrijving |
---|---|---|
spark.executor.memory | 8g - 16g | Niet te groot ivm garbage collection |
spark.memory.fraction | 0.6 | Voor Spark 3.0+ (execution + storage) |
spark.sql.adaptive.enabled | true | Automatische optimalisatie in Spark 3 |
3. Join Optimalisaties
Join strategieën vergelijken:
# Forceer broadcast join voor kleine tabellen (<10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10mb")
# Voor middelgrote tabellen: Sort-Merge Join
(df1.join(df2.hint("merge"), "key")
.write.parquet("..."))
# Grote joins: Bucketing strategie
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
(df.bucketBy(50, "join_column")
.write.parquet("bucketed_data/"))
4. Caching Strategieën
Wanneer wel/niet cachen:
# Goed gebruik van cache:
df = spark.read.parquet("...")
filtered = df.filter("...").cache() # Vaak gebruikt
# Transformatie pipeline
result = (filtered
.groupBy("...")
.agg(...)
.join(...))
# Expliciet unpersist wanneer klaar
filtered.unpersist()
# Slecht: Te vroeg cachen
raw = spark.read.parquet("...").cache() # Verspilt geheugen
cleaned = raw.filter(...) # Beter hier cachen
5. Pandas UDFs vs Scala UDFs
Performance benchmark resultaten:
Benadering | Relatieve Snelheid | Gebruik |
---|---|---|
Native Spark functies | 1x (baseline) | Altijd eerste keuze |
Pandas UDF (vectorized) | 2-5x langzamer | Complexe Python logica |
Scala UDF (via Py4J) | 10-100x langzamer | Vermijden |
Advanced: Delta Lake Optimalisaties
Voor Delta Lake gebruikers:
# ZOPT (Z-Order Optimized Partitioning)
from delta.tables import DeltaTable
(DeltaTable.forPath(spark, "/data/events")
.optimize()
.executeZOrderBy("date", "user_id"))
# Auto-compactie
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
PySpark Performance Checklist
- Partitionering geoptimaliseerd (niet te veel/weinig)
- Juiste join strategie geselecteerd
- Onnodige data scans geëlimineerd (SELECT alleen benodigde kolommen)
- Caching alleen gebruikt waar nodig
- Spark UI geanalyseerd voor bottlenecks
Veelgestelde Vragen
Hoe identificeer ik performance bottlenecks?
Gebruik de Spark UI (port 4040):
- Check 'Stages' tab voor langlopende tasks
- Analyze 'Storage' tab voor caching effectiviteit
- Inspect 'SQL' tab voor query-plannen
Waarom is mijn job traag bij kleine data?
Vaak door:
- Te veel kleine partities (partitionering overhead)
- Onnodige serialisatie (bijv. complexe UDFs)
- Driver bottlenecks (collect() op grote datasets)