Spark — debugging a slow Application
Reasons that make an application slow
Spark has a lot of native optimization tricks (like Catalyst, CBO, AQE, Dynamic Allocation, and Speculation) up its sleeves to make the job run faster. Still many a time we will see our jobs getting slow & slow over time. In this blog, we will see the reasons why our jobs are getting slower.
Note: We will only see the issues in this blog, solutions to them is in this blog
In Spark, the work is done in stages. And these stages as a whole make up the DAG for the application. Each task in spark deals with reading some data from a source or from another task and then doing some transformation and then passing along the result. So based on these, the spark app's slowness can come from majorly 4 sources.
Input / Source
Input Layout (Small file, Directory scanning)
If the data is being read from multiple nested directories and each directory contains small files, Spark will slow down as the driver has to collect metadata before it can assign executors. Also, Spark does an os level listing which might slow down with a very large number of files.
Non Splittable file types
Reading data (map task) can become slow if we have a large file having a non-splittable file format. In this case, the executor reading this file will take a longer time and the job has to wait for this to finish.
Multiline
In Spark, one task is responsible for splitting & managing the files. In the case of multiline files, this optimization is not feasible.
df = spark\
.read \
.option("multiLine", True) \
.format("json") \
.load("path/to/json_file.json")
df.rdd.getNumPartitions() # creates single partitions hence slower
df = spark\
.read \
.option("multiLine", False) \
.format("json") \
.load("path/to/json_file.json")
df.rdd.getNumPartitions() # creates multiple partitions hence faster
Slow Stage
Uneven partition
If the partitions are uneven, then all other executors will be waiting on the slowest executor to finish its job. So the job losses capability of parallel computation opportunity for the wait time. A partition can become skewed:
Partition skew — The number of rows in the given partition is very high compared to other partitions. This can be seen in Spark Web UI. Under the Stages tab in Spark UI, If we look at the summary metrics, the max column usually will have a much larger value than the median and more records count for a skewed partition.
Record Skew — It also might happen that we are doing some kind of explode/array operation and some partitions have rows with a larger array size or a nested JSON object, which makes that partition run slow.
Even Partition
Although the partitions are even we might see slowness. This can be due to:
Too many partitions — If we have way too many partitions and way fewer data, we will be wasting resources. Also, it takes some time for executors to come up and those delays will be added to the job.
High Task concurrency — Spark tries to execute these task concurrently/in parallel based on the spark.executor.cores property. If we have too many cores compared to the memory there will increased memory overhead and lot of GC collection will happen, making tasks to run slower.
Too few partitions — In this case we are not utilizing the full parallelization capability. Data is processed in multiple sequential batches and hence the delay.
Slow transformation — There are transformations that are slow natively, like doing a regex match. Another example can be a withColumn
operation inside a loop.
from pyspark.sql import Row
from pyspark.sql.functions import lit, col
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# creating a dummy dataframe
df = spark.createDataFrame([Row(id=1, name='abc'), Row(id=2, name='xyz')])
# Creating a list of list of (300) columns
dummy_cols = [f"foo{i}" for i in range(1,301)]
# Get JVM reference
jvm = spark.sparkContext._jvm
# Access Scala package/class
catalyst_rule_executor = jvm.org.apache.spark.sql.catalyst.rules.RuleExecutor
# Using a for-loop to add these columns in dataframe
for col_name in dummy_col_list:
df = df.withColumn(cname, lit(None).cast('string'))
# df.explain("extended")
# print(catalyst_rule_executor.dumpTimeSpent())
The above command will show a multiple project in logical plan. Each time withColumn
is used to add a column in the dataframe, Spark’s Catalyst optimizer re-evaluates the whole plan repeatedly. This adds up fast and strains performance.
Solution #1:
Using .withColumns()
for Spark >= 3.3
Starting from Spark 3.3, withColumns()
transformation is available to use, which takes a dictionary of string
and Column
datatype.
from pyspark.sql import Row
from pyspark.sql.functions import lit, col
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# creating a dataframe
df = spark.createDataFrame([Row(id=1, name='abc'), Row(id=2, name='xyz')])
# Defining 300 columns
dummy_cols = [f"foo{i}" for i in range(1,301)]
# Get JVM reference
jvm = spark.sparkContext._jvm
# Access Scala package/class
catalyst_rule_executor = jvm.org.apache.spark.sql.catalyst.rules.RuleExecutor
# Creating a dummy_col_val_map for all 300 columns
dummy_col_val_map = {cname: lit("100.326").cast('decimal(10,2)') for cname in dummy_cols}
df1 = df.withColumns(dummy_col_val_map)
print(catalyst_rule_executor.dumpTimeSpent())
# Checking both Analytical and Physical Plan
df1.explain("extended")
Solution #2: Using .select()
with an alias
Predicate is not pushed — Spark tries to put the filter at the source. This allows Spark to read only the data segments it needs for the job. If the predicate in the where clause is dynamic or contains an expression then Spark is not able to push the filter at the source. Sometimes we give different datatypes in the predicate (e.g. passing int instead of string in date). In that case, Spark will not be able to perform partition pruning (which is an optimization for reading fewer data). A similar thing can happen to column pruning.
UDFs — Python UDFs are slow and they are not optimized by the Spark optimizers.
Shuffle — Shuffle is when data from partition(s) are moved within the cluster but across nodes or racks.
Spill — Spill is when data is moved to disk when it is not fitting in the memory. Since this involves disk interactions that are slow compared to memory, it causes tasks to run slowly.
Slow Executor
Sometimes the executors become slow due to GC. We can check that GC wall time under metrics in the spark UI to see if it is causing the executor to slow down.
Big DAG
Spark keeps the DAG for the purposes of resiliency. If we are doing dataframe manipulations in a for loop or some other iterative loop, the DAG will become bigger and bigger over time. This will result in the job getting slower.
Slow Cluster
Scheduling delay
This happens when the cluster manager is not able to launch executors as fas as requested by the Spark scheduler. This is generally seen with the K8s cluster due to etcd limitations.
Resource Contention
This happens when there is a resource crunch in the cluster. The driver is launched but the executors are not getting launched, in UI although the job will be in progress nothing is actually happening.
Resource deadlock between Jobs/apps
There are cases where multiple job request resources from the cluster and they get resources in between stages of each other causing both jobs to slow down.