Apache Spark: Performance Tuning

Apache Spark is a compute engine, and it is important to use it efficiently. Before moving forward, let us discuss a few basic terms used in performance tuning.

Spark performance can be improved at two levels: the job level and the Spark SQL level.

Spark job optimizations

We can optimize Spark jobs in the following ways:

  1. Choose the data abstraction.
  2. Use optimal data format.
  3. Select default storage.
  4. Use the cache.
  5. Use memory efficiently.
  6. Optimize data serialization: Kryo serialization (serialization is a mechanism for converting the state of an object into a byte stream).
  7. Use bucketing: bucketing boosts performance by shuffling and sorting data ahead of time, when the table is written, so that downstream operations such as table joins can skip that work.
  8. Optimize joins and shuffles.
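
To make the bucketing item concrete, here is a minimal pure-Python sketch of the idea. It does not use Spark: the function names (`bucket_of`, `write_bucketed`, `bucketed_join`), the bucket count, and the sample rows are all hypothetical, and `zlib.crc32` is only a stand-in for whatever hash function Spark uses internally. The point is that when two tables are bucketed the same way, matching keys land in the same bucket number, so a join can proceed bucket by bucket without moving rows again.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # hypothetical bucket count


def bucket_of(key, num_buckets=NUM_BUCKETS):
    """Deterministically map a key to a bucket (stand-in hash, not Spark's)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


def write_bucketed(rows, num_buckets=NUM_BUCKETS):
    """Group (key, value) rows by bucket once, at 'write' time."""
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[bucket_of(key, num_buckets)].append((key, value))
    return buckets


def bucketed_join(left, right):
    """Join two bucketed tables by comparing only buckets with the same number."""
    joined = []
    for bucket_id, left_rows in left.items():
        lookup = defaultdict(list)
        for key, value in right.get(bucket_id, []):
            lookup[key].append(value)
        for key, value in left_rows:
            for other in lookup.get(key, []):
                joined.append((key, value, other))
    return sorted(joined)


# Hypothetical sample data.
orders = write_bucketed([(1, "book"), (2, "pen"), (1, "lamp")])
users = write_bucketed([(1, "ana"), (2, "raj"), (3, "li")])
result = bucketed_join(orders, users)
print(result)
```

In this toy model the cost of grouping is paid once in `write_bucketed`, and every later join reuses that layout.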

Before moving forward, let us recap some Spark terms.

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed.

Dataset is a distributed collection of data. Dataset is an interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).

DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. 

A shuffle can occur when the resulting RDD depends on elements from several partitions of the same RDD or of another RDD. Transformations are what cause shuffles to happen, so what are transformations?

When writing Spark code, we use two basic kinds of operations:

  • Transformations are lazy in nature, i.e., they are not executed immediately but only when we call an action. There are two types of transformations:
    • Narrow transformation: all the elements required to compute the records in a single partition live in a single partition of the parent RDD, so only a limited subset of partitions is used to calculate the result. Examples:
      • Map
      • FlatMap
      • MapPartitions
      • Filter
      • Sample
      • Union
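    • Wide transformation: the elements required to compute the records in a single partition may live in many partitions of the parent RDD, so data has to be shuffled. Examples:
      • Intersection
      • Distinct
      • ReduceByKey
      • GroupByKey
      • Join
      • Cartesian
      • Repartition
      • Coalesce (only when called with shuffle = true; by default it merges partitions without a shuffle)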
  • Actions are Spark RDD operations that return non-RDD values. The result of an action is returned to the driver or written to an external storage system. Actions are what set the lazy evaluation of RDDs in motion.
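
The difference between narrow and wide transformations can be sketched in plain Python by modelling an RDD as a list of partitions. This is not Spark code: `narrow_map`, `shuffle_by_key`, and `reduce_by_key` are hypothetical helpers, `zlib.crc32` stands in for Spark's partitioner, and real Spark also combines values on the map side before shuffling. The sketch only shows that a map works on each partition in isolation, while a reduce by key first has to route every record to the partition that owns its key.

```python
import zlib


def narrow_map(partitions, fn):
    """Narrow: each output partition depends on exactly one input partition."""
    return [[fn(x) for x in part] for part in partitions]


def shuffle_by_key(partitions, num_partitions):
    """Wide: records with the same key may sit in any input partition,
    so every record is routed to a target partition chosen by its key."""
    out = [[] for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            target = zlib.crc32(key.encode("utf-8")) % num_partitions
            out[target].append((key, value))
    return out


def reduce_by_key(partitions, fn, num_partitions=2):
    """Shuffle first, then reduce the values of each key inside its partition."""
    result = []
    for part in shuffle_by_key(partitions, num_partitions):
        acc = {}
        for key, value in part:
            acc[key] = fn(acc[key], value) if key in acc else value
        result.append(acc)
    return result


words = [["a", "b"], ["a", "c"], ["b", "a"]]        # three input partitions
pairs = narrow_map(words, lambda w: (w, 1))          # no data movement
counts = reduce_by_key(pairs, lambda x, y: x + y)    # requires a shuffle
totals = {k: v for part in counts for k, v in part.items()}
print(totals)
```

After the shuffle, each key lives in exactly one output partition, which is what makes the per-partition reduce correct.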

An action is one of the ways of sending data from the executors to the driver. Executors are agents responsible for executing tasks, while the driver is a JVM process that coordinates the workers and the execution of tasks. Some of the Spark actions are:

    • Count
    • Collect
    • Take
    • Top
    • CountByValue
    • Reduce
    • Fold
    • Aggregate
    • Foreach

Spark SQL optimization techniques:

Caching Data In Memory

Spark SQL can cache tables using an in-memory columnar format by calling spark.catalog.cacheTable("tableName") or dataFrame.cache(). Then Spark SQL will scan only required columns and will automatically tune compression to minimize memory usage and GC pressure.

Join Strategy Hints for SQL Queries

The join strategy hints, namely BROADCAST, MERGE, SHUFFLE_HASH and SHUFFLE_REPLICATE_NL, instruct Spark to use the hinted strategy on each specified relation when joining them with another relation. For example, when the BROADCAST hint is used on table ‘t1’, broadcast join (either broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key) with ‘t1’ as the build side will be prioritized by Spark even if the size of table ‘t1’ suggested by the statistics is above the configuration spark.sql.autoBroadcastJoinThreshold.

When different join strategy hints are specified on both sides of a join, Spark prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint over the SHUFFLE_REPLICATE_NL hint. When both sides are specified with the BROADCAST hint or the SHUFFLE_HASH hint, Spark will pick the build side based on the join type and the sizes of the relations.

Note that there is no guarantee that Spark will choose the join strategy specified in the hint since a specific strategy may not support all join types.

spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
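
The precedence rule for conflicting hints can be written down as a small lookup. The following is a pure-Python illustration of the ordering described above, not Spark's planner code; `HINT_PRIORITY` and `preferred_hint` are hypothetical names, and the sketch ignores the fact that Spark may still reject a hinted strategy that does not support the join type.

```python
# Priority order as described in the text: earlier entries win.
HINT_PRIORITY = ["BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"]


def preferred_hint(left_hint, right_hint):
    """Return the hint that takes precedence, or None if neither side has one."""
    candidates = [h for h in (left_hint, right_hint) if h is not None]
    if not candidates:
        return None
    return min(candidates, key=HINT_PRIORITY.index)


print(preferred_hint("MERGE", "BROADCAST"))
print(preferred_hint("SHUFFLE_REPLICATE_NL", "SHUFFLE_HASH"))
print(preferred_hint(None, None))
```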

Coalesce Hints for SQL Queries

Coalesce hints allow Spark SQL users to control the number of output files, just like coalesce, repartition and repartitionByRange in the Dataset API. They can be used for performance tuning and for reducing the number of output files.

Adaptive Query Execution

Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of runtime statistics to choose the most efficient query execution plan. AQE was disabled by default in Spark 3.0 and has been enabled by default since Spark 3.2.0. The umbrella configuration spark.sql.adaptive.enabled controls whether it is turned on or off. As of Spark 3.0, there are three major features in AQE: coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skew join optimization.
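
One way to keep the AQE settings together is to hold them in a dictionary and render them as `--conf` arguments for spark-submit. The sketch below is plain Python and does not start Spark. The configuration keys are the ones this article names for AQE; the value 400 for the initial partition number, the helper `to_submit_args`, and the script name `my_job.py` are assumptions made for illustration only.

```python
# AQE configuration keys named in this article; the values are illustrative.
aqe_conf = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum": "400",  # assumed value
    "spark.sql.adaptive.localShuffleReader.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def to_submit_args(conf):
    """Render a dict of settings as spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


submit_args = to_submit_args(aqe_conf)
# my_job.py is a hypothetical application script.
print(" ".join(["spark-submit"] + submit_args + ["my_job.py"]))
```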

1-Coalescing Post Shuffle Partitions

This feature coalesces the post shuffle partitions based on the map output statistics when both spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled configurations are true. This feature simplifies the tuning of shuffle partition number when running queries. You do not need to set a proper shuffle partition number to fit your dataset. Spark can pick the proper shuffle partition number at runtime once you set a large enough initial number of shuffle partitions via spark.sql.adaptive.coalescePartitions.initialPartitionNum configuration.
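
The effect of coalescing can be illustrated with a simplified model: start with many small shuffle partitions and merge neighbours until a target size would be exceeded. This is a greedy pure-Python sketch under assumed sizes, not Spark's actual algorithm; `coalesce_partitions`, the sizes in `sizes_mb`, and the 64 MB target are all hypothetical.

```python
def coalesce_partitions(sizes, target_size):
    """Greedily merge adjacent partitions so that no group exceeds target_size
    (a single partition that is already larger stays in a group of its own).
    Returns a list of groups, each a list of original partition indices."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes):
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical map-output sizes in MB for 8 initial shuffle partitions.
sizes_mb = [10, 5, 60, 3, 2, 40, 30, 1]
groups = coalesce_partitions(sizes_mb, target_size=64)
print(groups)
```

Starting from a generous initial partition count and merging at runtime is what removes the need to guess the right number up front.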

2-Converting sort-merge join to broadcast join

AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side is smaller than the broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it is better than continuing with the sort-merge join, as we can skip sorting both join sides and read shuffle files locally to save network traffic (if spark.sql.adaptive.localShuffleReader.enabled is true).

3-Optimizing Skew Join

Data skew can severely degrade the performance of join queries. This feature dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. It takes effect when both spark.sql.adaptive.enabled and spark.sql.adaptive.skewJoin.enabled configurations are enabled.
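
A rough pure-Python model of the splitting step: flag a partition as skewed when it is far larger than the median, then cut it into chunks of roughly equal size. The helper names (`find_skewed`, `split_evenly`), the thresholds, and the sizes are assumptions for illustration and are not Spark's implementation; in a real join, the matching partition on the other side would also be replicated once per chunk.

```python
import statistics


def find_skewed(sizes, factor=5, min_size=256):
    """Flag partitions much larger than the median.
    factor and min_size are illustrative thresholds."""
    median = statistics.median(sizes)
    return [i for i, s in enumerate(sizes) if s > factor * median and s > min_size]


def split_evenly(size, target):
    """Split one oversized partition into chunks of roughly equal size."""
    num_chunks = -(-size // target)  # ceiling division
    base, extra = divmod(size, num_chunks)
    return [base + 1 if i < extra else base for i in range(num_chunks)]


sizes_mb = [40, 50, 45, 1000, 55]  # hypothetical partition sizes in MB
skewed = find_skewed(sizes_mb)
chunks = split_evenly(sizes_mb[skewed[0]], target=64)
print(skewed, len(chunks), max(chunks), min(chunks))
```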

Note:

1-We can use toDebugString to check the lineage of an RDD.

2-Recovery after a failure is faster for narrow transformations than for wide transformations, because only the parent partitions of the lost partition have to be recomputed.

Ref:

https://training.databricks.com/visualapi.pdf

https://mas-dse.github.io/DSE230/docs/Spark%20Cheat-Sheets%20(DZone).pdf

https://intellipaat.com/mediaFiles/2019/03/spark-and-rdd-cheat-sheet-1.png

Spark-SQL-white Paper:

http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf

https://data-flair.training/blogs/spark-sql-optimization/
