# code for loading the format for the notebook
import os
# path : store the current path to convert back to it later
path = os.getcwd()
os.chdir(os.path.join('..', 'notebook_format'))
from formats import load_style
load_style(plot_style=False)
os.chdir(path)
# 1. magic to print version
# 2. magic so that the notebook will reload external python modules
%load_ext watermark
%load_ext autoreload
%autoreload 2
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, Row
%watermark -a 'Ethen' -d -t -v -p pyspark
spark = (SparkSession.
builder.
master('local[*]').
appName('implicit').
config(conf = SparkConf()).
getOrCreate())
spark
The main reason why we should care about partitions is performance. By having all relevant data in one place (node) we reduce the overhead of shuffling (the need for serialization and network traffic). Understanding how Spark deals with partitions allows us to control the application's parallelism (which leads to better cluster utilization and lower cost). But keep in mind that partitioning will not be helpful in all applications. For example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It's useful only when a dataset is reused multiple times and we perform operations that involve a shuffle, e.g. reduceByKey().
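As a quick sketch of that idea (re-using the spark session created above), pre-partitioning a pair RDD and persisting it lets later shuffle-based operations such as reduceByKey() reuse the existing layout instead of re-shuffling the data each time:
# a minimal sketch: partitionBy + persist pays off when the pair RDD
# is reused by shuffle operations such as reduceByKey()
pairs = spark.sparkContext.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4)])
pairs = pairs.partitionBy(2).persist()
print(pairs.reduceByKey(lambda x, y: x + y).collect())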
We will use the following list of numbers to investigate the behavior of Spark's partitioning.
num_partitions = 2
rdd = spark.sparkContext.parallelize(range(10), num_partitions)
print('Number of partitions: {}'.format(rdd.getNumPartitions()))
print('Partitioner: {}'.format(rdd.partitioner))
print('Partitions structure: {}'.format(rdd.glom().collect()))
Let's look at what is happening under the hood. Spark uses different partitioning schemes for various types of RDDs; in our case, the partitioner is None. When there is no partitioner, the partitioning is not based on any characteristic of the data, the records are simply distributed uniformly across nodes.
Looking at the partition structure, we can see that our RDD is in fact split into two partitions, and if we were to apply transformations on this RDD, then each partition's work will be executed in a separate thread.
If you're confused about the glom method, it returns an RDD created by coalescing all elements within each partition into a list/array. As an example usage of this method, say we wish to get the maximum value of an RDD, we could do:
# or in Scala: rdd.reduce(_ max _)
rdd.reduce(max)
As reduce introduces a lot of shuffling between partitions for the comparisons, we could instead first compute the maximum within each partition:
rdd.glom().map(lambda partition: max(partition)).reduce(max)
The next question is: What will happen when the number of partitions exceeds the number of data records?
num_partitions = 15
rdd = spark.sparkContext.parallelize(range(10), num_partitions)
print('Number of partitions: {}'.format(rdd.getNumPartitions()))
print('Partitioner: {}'.format(rdd.partitioner))
print('Partitions structure: {}'.format(rdd.glom().collect()))
From the output above, we can see that Spark created the requested number of partitions, but some of them are empty. This is wasteful because we would still spend time scheduling tasks for these idle partitions.
The partitionBy() transformation gives the end-user the flexibility to apply custom partitioning logic over the RDD. To use partitionBy(), our RDD must be comprised of tuple (key, value) pair objects. And again, it's highly advised to persist the partitioned RDD so that later operations can take advantage of the partitioning.
Let's get into a more realistic example. Imagine that our data consists of various dummy transactions made across different countries.
transactions = [
{'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'},
{'name': 'James', 'amount': 15, 'country': 'United Kingdom'},
{'name': 'Marek', 'amount': 51, 'country': 'Poland'},
{'name': 'Paul', 'amount': 75, 'country': 'Poland'}
]
Say we know that our downstream analysis requires analyzing records within the same country. To minimize network traffic, it seems like a good idea to put records from one country onto the same node/partition.
def country_partitioner(country):
return hash(country)
# note that we technically don't need to pass in the custom
# partitioner when using partitionBy, if we don't then spark
# will use its own hash partitioner to carry out the partitioning
rdd = (spark.sparkContext.
parallelize(transactions).
map(lambda record: (record['country'], record)).
partitionBy(2, country_partitioner))
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))
It worked as expected: all records from the same country are within the same partition, and we can perform downstream work on them without worrying about large network shuffles. One caveat about this approach is that we should pay attention to potential data skew. If some keys are overrepresented in the dataset, this can result in suboptimal resource usage and potential failure (e.g. in our case, say United Kingdom had a lot more data than Poland).
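A quick way to check for such skew (a small sketch on the pair RDD we just partitioned) is to count the number of records that landed in each partition:
# count records per partition; a large imbalance here signals data skew
print(rdd.glom().map(len).collect())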
After partitioning the data, the next common transformation is mapPartitions(), which operates on each partition of the RDD.
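As a small sketch (the sum_amounts helper below is just for illustration), we can sum the transaction amounts within each partition of our pair RDD, where each element is a (country, record) tuple:
def sum_amounts(iterator):
    # the iterator yields the (country, record) pairs of a single partition
    yield sum(pair[1]['amount'] for pair in iterator)

print(rdd.mapPartitions(sum_amounts).collect())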
Nowadays we are all advised to use structured DataFrames from the Spark SQL module as opposed to RDDs as much as possible. When we call a DataFrame transformation, it actually becomes a set of RDD transformations under the hood. The main advantage is that when using the DataFrame API, Spark understands the inner structure of our records much better and is capable of performing internal optimizations to increase processing speed.
spark = (SparkSession.
builder.
master('local[*]').
config('spark.sql.shuffle.partitions', 2).
getOrCreate())
# create a spark DataFrame from the list of transaction dictionaries
rdd = (spark.sparkContext.
parallelize(transactions, 2).
map(lambda x: Row(**x)))
# here, we are essentially creating a custom partitioner,
# by specifying we are going to repartition using the 'country' column.
# We can think of this operation as performing an indexing on the 'country'
# column from a relational database standpoint.
# When not specifying number of partitions, spark will use the value from the
# config parameter 'spark.sql.shuffle.partitions', in this example, we
# explicitly set it to 2, if we didn't specify this value, the default would
# be 200.
df = spark.createDataFrame(rdd).repartition(2, 'country')
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
print('Partitioner: {}'.format(df.rdd.partitioner))
print('Partitions structure: {}'.format(df.rdd.glom().collect()))
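If we prefer not to drop down to the RDD API for this kind of inspection, Spark SQL also provides a spark_partition_id() function that tags each row with the partition it currently lives in; a small sketch:
from pyspark.sql.functions import spark_partition_id

# tag every row with the id of the partition it resides in
df.withColumn('partition_id', spark_partition_id()).show()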
The coalesce() and repartition() transformations are both used for changing the number of partitions in the RDD. The main difference is that:

- repartition() will perform a full shuffle.
- coalesce() minimizes shuffles by combining existing partitions.

# the coalesce algorithm merges the data from one partition into another,
# e.g. Partition B into Partition A, thus it can't be used to increase
# the number of partitions
df_coalesce = df.coalesce(1)
print('Number of partitions: {}'.format(df_coalesce.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(df_coalesce.rdd.glom().collect()))
df_repartition = df.repartition(4)
print('Number of partitions: {}'.format(df_repartition.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(df_repartition.rdd.glom().collect()))
In real-world scenarios, a time when we might want to use coalesce is after doing some aggregation or filtering on giant raw data. e.g. Suppose we have a dataset that contains 2 billion rows of data (1 TB) split into 13,000 partitions, and after the aggregation we are down to only 2 million rows. If we were to save it as is, a lot of the output partitions would be empty. And as we can imagine, it's not efficient to read or write thousands of empty files; to improve this, we should call coalesce.
# the .sample method mimics our data being much smaller after the aggregation/filtering
# ('huge_data' stands in for some large DataFrame and is not defined here)
smaller_data = huge_data.sample(withReplacement=False, fraction=0.001)
smaller_data = smaller_data.coalesce(4)
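To actually reap the benefit, we would then write out the coalesced result, producing 4 output files instead of thousands of mostly empty ones (the output path here is just a hypothetical placeholder):
# writing the coalesced data yields 4 files rather than 13,000 mostly empty ones
smaller_data.write.parquet('coalesced_output')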
The next million dollar question is: What is the optimal partition number?
By now, we can probably guess that:

- If we have too few partitions, we would potentially be faced with: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE. When this happens, increasing the number of partitions (therefore, reducing the average partition size) usually resolves the issue. Related link: Stackoverflow: Why does Spark RDD partition has 2GB limit?
- On the other hand, the disadvantage of too many partitions is that our time might be all spent on task scheduling as opposed to performing the actual computation.

Hence, it is recommended to partition judiciously depending upon our cluster configuration and requirements. The following rule of thumb can serve as a guideline:
According to the Spark documentation: in general, we recommend 2-3 tasks per CPU core in your cluster. In Spark, a task is the computation applied to a unit of data (a partition). Thus if a stage consists of 200 tasks, that means in this stage we are applying the computation across 200 partitions. If we were to follow the recommendation, that gives us the formula number_of_partitions = number_of_cpus * [2 or 3].
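As a rough sketch of applying this rule of thumb on a local machine (df_tuned is just an illustrative name; on a real cluster we would plug in the total number of executor cores instead of os.cpu_count()):
import os

# rule of thumb: number_of_partitions = number_of_cpus * [2 or 3]
num_partitions = os.cpu_count() * 2
df_tuned = df.repartition(num_partitions)
print(df_tuned.rdd.getNumPartitions())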
If interested, the following link also contains recommendations for tuning spark applications, specifically --num-executors, --executor-memory and --executor-cores. Blog: Distribution of Executors, Cores and Memory for a Spark Application running in Yarn
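Though those flags are typically passed to spark-submit, the equivalent configuration can also be set programmatically; a hedged sketch (the values here are arbitrary placeholders and should be tuned to the actual cluster):
# programmatic equivalents of the spark-submit flags mentioned above
conf = (SparkConf().
    set('spark.executor.instances', '4').  # --num-executors
    set('spark.executor.memory', '4g').    # --executor-memory
    set('spark.executor.cores', '2'))      # --executor-cores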