Key factors to consider when optimizing Spark Jobs
Developing a spark application is fairly simple and straightforward, as spark provides featured pack APIs. Be that as it may, the tedious task is to deploy it on the cluster in an optimal way which yields ideal performance and following the best practices when developing spark jobs. Here in DataKareSolutions, we often need to get our hands dirty to tune the spark application. Throughout this article I will put out all the best practices we follow in DataKareSolutions to optimize spark application.
Data Serialization
Spark bolsters two types of serialization, Java serialization which is the default one and Kryo serialization. Kryo serialization is significantly faster and compact than Java serialization. Thus, in production it is always recommended to use Kryo over Java serialization. To use Kryo, the spark serializer needs to be changed to Kryo serializer in the spark configurations.
conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)
However, Kryo may fails to serialize few classes rarely to avoid this Kryo requires you to register the classes which are expected to be serialized.
Broadcasting
If the tasks across multiple stages requires the same data, it is better to broadcast the value rather than send it to the executors with each task. And when joining two tables if one of the join is small enough to fit into memory it is advisable to broadcast it, to avoid shuffles.
Avoid UDF and UDAF
It is better to avoid UDF in favor of spark SQL functions as it avoids deserialization of data so that it can process in scala and then serialize it again. Moreover Spark SQL functions are well tested and chances are there it will yield better performance compared to UDF.
Furthermore, with regards to UDAF which produces sortAggregate which is relatively in slower side than its counterpart hashAggregates.
Data locality
To achieve better performance the data should reside as close to the computation. There are four different sorts of locality in spark.
- PROCESS_LOCAL — Data is in the same JVM which is running the task. Obviously the best one.
- NODE_LOCAL — Data is on the same node as the computation. Comparatively slower than process local as the data needs to travel between processes.
- NO_PREF — No locality preference.
- RACK_LOCAL — Data will reside on the same rack.
- ANY — Data may reside anywhere on the network not on the same rack.
It is always preferable to choose the best locality levels to avoid data movements. But there may be times when there are no new data to process, sparks tries to fetch data from lower locality levels. Spark will wait for a certain period of time before switching to lower locality levels.
Use the accompanying parameter to configure the interval time to wait before switching locality levels.
spark.locality.wait //which is 3 seconds by default
To inspect whether a task ran locally, in the spark UI the locality_levels can be found out under stages tab.
Dynamic allocation
Spark is shipped with an excellent mechanism to use the cluster resources rather efficiently based on workloads and on shared clusters. According to the workloads it is possible to scale up and scale down the number of executors which is known as dynamic allocation.
The following configuration parameters are used to enable and tweak dynamic allocation.
spark.dynamicAllocation.enabled //Set the value to true to enable dynamic allocation. spark.dynamicAllocation.executorIdleTimeout //Timeout to wait before releasing the executor. Spark. dynamicAllocation. cachedExecutorIdleTimeout //Timeout to wait before removing the cached data blocks. spark.dynamicAllocation.initialExecutors //The initial number of executors to begin with. However, if the number of executors is explicitly mentioned via --num-executors` (or `spark.executor.instances`) then the larger value between them will be the initial executors value. spark.dynamicAllocation.maxExecutors //Upper bound of executors, by default infinity.spark.dynamicAllocation.minExecutors //Minimum number of executors by default is '0' which ensures our application won’t occupy cluster resources unnecessarily.
When enabling dynamic allocation it is also necessary to enable Spark’s external shuffle service which can be done by the parameter spark.shuffle.service.enabled.This ensures that we have a separate server on each machine in the cluster which will manage shuffle files when the appropriate executor is removed/lost.
Garbage collection
Garbage collection can be a bottleneck in spark applications. It is advisable to try the G1GC garbage collector, which can improve the performance if garbage collection is the bottleneck. The parameter -XX:+UseG1GC is used to specify G1GC as the default garbage collector. And increase the G1GC region heap size if the executer heap size is large this can done by -XX:G1HeapRegionSize.
It is always recommended to use data structures which generated less objects, like array instead of LinkedLists.
Refer this link for in-depth explanation.
Executor Tuning
When deploying a spark application one of the most prominent thing is to allocate the execution resources optimally like number of executors, executors core and executors memory.
Number of executor cores
As the name indicated executor cores will define the number of cores to use for each executors. Each individual cores will be assigned to one different task. Therefore, we will be fancying to allocate more cores to a single executor. But too many tasks per executor will results in garbage collection overload and as a matter of fact HDFS achieves better throughput when the number of executors is set to 5.
Hence, it is always recommended to assign the number of executors cores to ~5.
As a rule of thumb one core in each node of the cluster will be allocated to OS process and Hadoop daemons.
Number of executors
Executer is just another JVM instance, so it is possible to have more than one executors.
The number of executors depends upon the number of cores in the cluster.
If the application is deployed on top of yarn, one executor will be assigned to the yarn application master.
For example, if there are 250 cores in the cluster, then the number of executors should be 250/5 = 50 executors. Out of which one should be left for application master and the remaining 49 can be assigned to our spark application by using the parameter –num-executors=49 when submitting the application for execution.
Memory per executor
As a standard guideline, one GB in each nodes of the cluster will be allocated to OS process and Hadoop daemons.
Let’s assume we have a 250 core cluster and 64 GB memory on each node. If 1 GB is allocated to Hadoop demon and OS processes, we will be left with 63 GB memory in each node.
As discussed above, we will be left with 5 executors per node if we have 25 cores per machine (i.e. 49 executors in a 250 core cluster after allocating one to yarn AM). Thus we may consider to allocate (63 GB/5 executors) = 12.6 GB to each executor.
But there is an additional yarn overhead memory needs to be allocated to each executor. The additional overhead memory is 10% by default (7% for legacy spark versions)of the executor memory. I.e. 12.6-(0.10 * 12.6)= 11.34 GB per executor is the optimal memory per executor.
Parallelism
Degree of parallelism is the most proficient parameter which dictates the performance of a spark job. The partition size should be in a form that it should not be too big which will eventually lead to under-utilization of the cluster neither too small which will lead to too much of small tasks. The partition size should be in such a way it should utilize the clusters efficiently without any overheads of small tasks.
In spark higher level APIs like dataframe and datasets use the following parameters to determine the partition of a file.
spark.sql.files.maxPartitionBytes //Specify maximum partition size by default 128mb spark.sql.files.openCostInBytes //The approximate cost to use to open the file and scan the file which is by default 4MB.
Spark automatically figures the optimal partition size based on these two parameters.
At whatever point, if there is a shuffle operation spark will use spark.sql.shuffle.partitions parameter to determine the partition size which by default is 200 by default. It is recommended to shrink down the shuffle partitions size if the Size of the file is small.
If the dataset was derived from an RDD, it will inherit the partition size of the parent RDD.
Speculative execution
In real time use cases there are scenarios where a spark job may perform poorly compared to other worker nodes due to some external factors which may not in our control. Then spark will re-launch the same task in some other worker nodes.
Speculative execution can be enabled and can be tuned by using the accompanying parameters.
spark.speculation //Set to true to enable by default it is false. spark.speculation.interval //Time interval to check for speculative tasks. spark.speculation.multiplier //specify how much times the tasks can be slower than the median to consider it as a speculative job. spark.speculation.quantile (value in percentage) //The fraction of tasks which must be completed before speculation is enabled in a particular stage.
Spark driver will normally spend a lot of time in detecting speculation when managing a large number of tasks, enable it only if needed.
Parallel transformations
If there are multiple queries/transformations which can be executed independently it is recommended to take leverage of spark scheduler pool. By default spark executes the tasks in FIFO to execute tasks in parallel the first step is to change it into Fair, which will dictate spark to execute the job in round robin fashion. This can be done by changing the configuration parameter “spark. scheduler. mode” to FAIR in the Spark configuration object.
Then create an separate pools for different independent queries.
spark.setLocalProperty("spark.scheduler.pool", "pool1")
// Query 1
spark.setLocalProperty("spark.scheduler.pool", "pool2")
//Query 2
The most common use case for it is when writing the same streaming query
data to multiple destinations in structured streaming
Refer this link for more details.
Data Skewing
Data skewing is a condition when the data is unevenly distributed across the cluster, which causes serious performance degrades because skewed data will makes the cluster underutilized. So, if a spark job is taking more time to complete, it is always a wise decision to check for data skew. And if it proves to be true apply one of the following techniques to handle data skewing.
1. Repartition
2. Salting
3. Isolating salting
4. Isolating Map/Join
5. Iterative Broadcasting
Each of these techniques has its own advantages/limitations. Covering each of these is out of scope for this article.
Databricks delta provides some fairly simple and powerful techniques to handle Data skews.
Miscellaneous optimizations
- Throughout the spark application use only Dataframe/Dataset. Typically, there is no reason to use RDD unless you are writing your own partitioner.
- Cache Spark Sql table and dataframe. It will mimimize memory usage and Garbage collection overhead as Spark will scan only required columns and automatically tune compression.
- Avoid String for keys and consider to use numeric IDs wherever possible.
Conclusion
We have walked through the major key factors which will impact the performance of Spark jobs in detail. Go through this article which will give a good idea on how to optimize Spark joins.
Comments