Optimize Spark SQL Joins

Joins are one of the fundamental operations when developing a Spark job, so it is worth knowing about the available optimizations before working with them.
At Data Kare Solutions we often found ourselves joining two big tables (data frames) when dealing with Spark SQL. In this article we lay out the best practices and optimization techniques we follow when managing Spark joins.

Spark uses two types of cluster communication strategies:

  • node-to-node communication strategy
  • per-node communication strategy

With the node-to-node strategy Spark shuffles the data across the cluster, whereas with the per-node strategy Spark performs broadcast joins.

The performance of Spark joins depends on the strategy used to tackle each scenario, which in turn depends on the size of the tables. Sort-Merge join and Shuffle Hash join are the two major workhorses that drive Spark SQL joins. Broadcast join is the most preferable and efficient one because it is based on the per-node communication strategy, which avoids shuffles, but it is applicable only to smaller datasets. Thus, more often than not, Spark SQL will go with either Sort-Merge join or Shuffle Hash join.

Sort-Merge Join

Sort-Merge join is composed of two steps. The first step is to sort the datasets; the second is to merge the sorted data within each partition by iterating over the elements and joining the rows that have the same value for the join key.

From Spark 2.3, Sort-Merge join is the default join algorithm in Spark. However, it can be turned off using the internal parameter ‘spark.sql.join.preferSortMergeJoin’, which is true by default.
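A minimal sketch of both knobs, assuming an active SparkSession named spark (the tiny example DataFrames are illustrative):

import spark.implicits._

// Disable automatic broadcast so the planner's default choice is visible
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val df1 = Seq((1, "a"), (2, "b")).toDF("key", "v1")
val df2 = Seq((1, "x"), (3, "y")).toDF("key", "v2")

// The physical plan should now contain a SortMergeJoin node
df1.join(df2, "key").explain()

// Turning the preference off lets Spark consider Shuffle Hash join instead
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")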

To achieve ideal performance with Sort-Merge join:
• Make sure the partitions are co-located. Otherwise, shuffle operations will be needed to co-locate the data, since Sort-Merge join requires all rows with the same value for the join key to be stored in the same partition (see the bucketing sketch after this list).
• The DataFrame should be distributed uniformly on the joining columns.
• To leverage parallelism, the DataFrame should have an adequate number of unique keys.
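One common way to satisfy the co-location requirement is to bucket (and pre-sort) both tables on the join key ahead of time. A hedged sketch, where largeDF, otherDF, and the table names are illustrative:

// Write both sides bucketed and sorted by the join key; matching bucket
// counts let Spark skip the shuffle (and often the sort) at join time
largeDF.write.bucketBy(16, "key").sortBy("key").saveAsTable("large_bucketed")
otherDF.write.bucketBy(16, "key").sortBy("key").saveAsTable("other_bucketed")

val joined = spark.table("large_bucketed").join(spark.table("other_bucketed"), "key")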

Broadcast Joins

Broadcast joins are easily the ones that yield the maximum performance in Spark. However, they are relevant only for small datasets. In a broadcast join, the smaller table is broadcast to all worker nodes. Thus, when working with one large table and one smaller table, always make sure to broadcast the smaller table. We can hint Spark to broadcast a table:

import org.apache.spark.sql.functions.broadcast

// The broadcast() hint tells the optimizer to ship smalldataframe to every executor
val dataframe = largedataframe.join(broadcast(smalldataframe), "key")
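The same hint can also be given in SQL form; a sketch assuming the two DataFrames have been registered as temp views:

largedataframe.createOrReplaceTempView("large")
smalldataframe.createOrReplaceTempView("small")

// BROADCAST (aliases: BROADCASTJOIN, MAPJOIN) marks "small" as the broadcast side
val result = spark.sql(
  "SELECT /*+ BROADCAST(s) */ * FROM large l JOIN small s ON l.key = s.key")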

Recently, Spark increased the maximum size for a broadcast table from 2 GB to 8 GB. Thus, it is not possible to broadcast tables larger than 8 GB.

Spark also internally maintains a table-size threshold below which it automatically applies broadcast joins. The threshold can be configured using “spark.sql.autoBroadcastJoinThreshold”, which defaults to 10 MB.
So, it is wise to leverage broadcast joins whenever possible. Broadcast joins also solve uneven sharding and limited parallelism problems, provided the data frame is small enough to fit into memory.

Shuffle Hash Join

Shuffle Hash join works on the concept of MapReduce: map over the data frames, using the values of the join column as output keys; shuffle the data frames based on those keys; then join the data frames in the reduce phase, since rows from different data frames with the same key will end up on the same machine.
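To make the idea concrete, here is a conceptual sketch in plain Scala (not Spark's internal code): build a hash table from the smaller (build) side's rows, then probe it with each row of the larger (stream) side:

val buildSide  = Seq((1, "a"), (2, "b"))           // (key, value)
val streamSide = Seq((1, "x"), (2, "y"), (3, "z"))

val hashTable = buildSide.groupBy(_._1)            // key -> matching rows
val joined = streamSide.flatMap { case (k, v) =>
  hashTable.getOrElse(k, Nil).map { case (_, bv) => (k, v, bv) }
}
// joined == List((1, "x", "a"), (2, "y", "b")); key 3 has no match, so it is dropped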

Spark chooses Shuffle Hash join when Sort-Merge join is turned off, or when the join keys are not suitable for it, and also based on the two accompanying functions below.

def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}

As the name indicates, Spark uses this function to make sure a local per-partition hash map can actually be built, i.e. that Shuffle Hash join is better suited to the given dataset than a broadcast join. Still, there may be scenarios where a Broadcast join would be superior to a Shuffle Hash join.

So, at that point we may increase the automatic broadcast join threshold (‘spark.sql.autoBroadcastJoinThreshold’) to nudge the Catalyst optimizer into using a Broadcast join.
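For example (the 100 MB figure is an arbitrary illustrative value; tune it to your executors' memory):

// Current value, in bytes (defaults to 10 MB)
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// Raise it so the optimizer treats larger tables as broadcast candidates
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)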

def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes
}

Creating hash tables is costly, and it can be done only when the average size of a single partition is small enough to build a hash table.

Sort-Merge join is a very good candidate most of the time, as it can spill data to disk and doesn't need to hold the whole build side in memory like its counterpart, Shuffle Hash join. However, when the build side is much smaller than the stream side (at least 3×, per muchSmaller above), Shuffle Hash join will outperform Sort-Merge join.

The factors that decide the performance of Shuffle Hash join are the same as for Sort-Merge join, except that it doesn't require the partitions to be co-located:

  • The DataFrame should be distributed evenly on the joining columns.
  • To leverage parallelism the DataFrame should have an adequate number of unique keys.

Tip: if a join's input contains rows that can never match on the join key, filter them out before the join. Otherwise, more data will be shuffled over the network.
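A small sketch of the idea, reusing the earlier DataFrame names, with an illustrative null-key filter:

import org.apache.spark.sql.functions.col

// Rows with a null join key can never match, so drop them before shuffling
val cleaned = largedataframe.filter(col("key").isNotNull)
val joined  = cleaned.join(smalldataframe, "key")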

Conclusion

Thus we have walked through the different join algorithms used by Spark. Try to use broadcast joins wherever possible, and filter out rows irrelevant to the join key to avoid unnecessary data shuffling.

And for cases where you are confident that Shuffle Hash join will beat Sort-Merge join, disable Sort-Merge join for those scenarios.
