PySpark: Dataframe Partitions Part 2
This tutorial is a continuation of Part 1, which explains how to partition a dataframe randomly or based on specified column(s), along with some partition-related operations.
- By default, Spark derives the initial number of partitions from the files in the read path. For a handful of files this often works out to one partition per file, but file sizes and settings such as "spark.sql.files.maxPartitionBytes" also influence the count.
- The function getNumPartitions can be used to get the number of partitions in a dataframe.
- Partitions are useful for processing data in parallel: the executors can work on as many tasks at the same time as there are partitions in the dataframe.
- Joins generally perform better when the dataframes are already partitioned on the join columns.
- Partitions can be created in a dataframe while reading data or after reading data from a data source.
- The number of partitions in a dataframe can be increased or decreased. However, if the data volume is high, this can be a costly operation in terms of cluster resources.
- Dataframe functions such as sort, dropDuplicates and distinct trigger a shuffle. This means that the data in the current partitions is redistributed into new partitions, and the number of partitions in the target dataframe is governed by the "spark.sql.shuffle.partitions" property, whose default value is 200. With adaptive query execution enabled, Spark may merge these into fewer partitions.
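The shuffle behaviour described in the list above can be checked with a small helper. This is a minimal sketch: compare_shuffle_partitions is a hypothetical name, and spark and df are assumed to be an active SparkSession and the dataframe under test. The count after the shuffle depends on the Spark version and on settings such as adaptive query execution, so no fixed output is shown.

```python
def compare_shuffle_partitions(spark, df):
    """Return (configured shuffle partitions, partitions before, partitions after distinct())."""
    # spark.conf.get returns the setting as a string, e.g. "200"
    configured = int(spark.conf.get("spark.sql.shuffle.partitions"))
    before = df.rdd.getNumPartitions()
    # distinct() forces a shuffle, so its result is partitioned afresh
    after = df.distinct().rdd.getNumPartitions()
    return configured, before, after
```

Once the sample dataframe has been loaded, compare_shuffle_partitions(spark, df) returns the three numbers as a tuple.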
Sample Data: The dataset used in the examples below can be downloaded from here.
df = spark.read.csv("file:///path_to_files/csv_with_duplicates_and_nulls.csv",header=True)
df.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 22| Mysql| null|
| 50|Snowflake| RDBMS|
| 51| null|CloudDB|
+-----+---------+-------+
➠
getNumPartitions: The RDD function getNumPartitions returns the number of partitions in a dataframe. It is called on the dataframe's underlying RDD, i.e. df.rdd.getNumPartitions().
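As a quick illustration, the hypothetical helper below reports the partition count together with the number of rows in each partition. It assumes df is an existing dataframe. glom() gathers each partition's rows into a list, so this is only advisable on small datasets such as the sample data.

```python
def describe_partitions(df):
    """Return the partition count and the number of rows in each partition."""
    num_partitions = df.rdd.getNumPartitions()
    # glom() turns every partition into one list of rows; len gives its size
    rows_per_partition = df.rdd.glom().map(len).collect()
    return {"num_partitions": num_partitions, "rows_per_partition": rows_per_partition}
```

The length of rows_per_partition equals num_partitions, and a zero in the list indicates an empty partition.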
➠
RepartitionByRange Function: repartitionByRange() can be used to increase or decrease the number of partitions. The resulting dataframe is range partitioned, i.e. each partition holds a contiguous range of the partitioning values. Spark estimates the ranges by sampling, so the assignment may differ between runs on larger datasets.
- Syntax: This function takes up to 2 parameters; the 1st parameter is optional while the 2nd parameter is mandatory.
If the 1st parameter (i.e. numPartitions) is not passed, Spark uses the value of "spark.sql.shuffle.partitions" (200 by default) as the target. On a small dataset the range partitioner can end up with fewer partitions, because the range boundaries are taken from the distinct values of the underlying column(s).
repartitionByRange(numPartitions, *cols)
- → The 1st parameter is an integer specifying the number of partitions to create in the dataframe.
- → The 2nd parameter accepts 4 types of values: a single column name as a string, several column names as comma-separated strings, a single column expression, or several comma-separated column expressions.
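The accepted argument forms can be sketched as follows. range_partition_variants is a hypothetical helper, df is assumed to be the sample dataframe, and columns are referenced as df["..."] so that no extra import is needed; col("...") from pyspark.sql.functions can be used in the same positions.

```python
def range_partition_variants(df):
    """Build one range-partitioned dataframe per supported argument form."""
    return {
        # single column name as a string
        "one_name": df.repartitionByRange("db_name"),
        # several column names as separate string arguments
        "many_names": df.repartitionByRange("db_name", "db_id"),
        # single column expression
        "one_expr": df.repartitionByRange(df["db_name"]),
        # explicit partition count followed by several column expressions
        "count_and_exprs": df.repartitionByRange(3, df["db_name"], df["db_id"]),
    }
```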
- Example 1: Increasing the number of partitions (creating partitions) in a dataframe.
Only the 2nd parameter is passed to the repartitionByRange function.
In the example below, getNumPartitions reports 6 partitions for the 5 distinct values of the "db_name" column (null counts as one value): each distinct value gets its own partition (ids 0 to 4 in the output), which leaves one partition without rows.
The values "Snowflake" and "Teradata" show that rows with the same value belong to the same partition.
from pyspark.sql.functions import spark_partition_id
df.rdd.getNumPartitions()
Output: 1
df_update = df.repartitionByRange("db_name")
df_update.rdd.getNumPartitions()
Output: 6
df_update.select("db_name",spark_partition_id()).show()
+---------+--------------------+
| db_name|SPARK_PARTITION_ID()|
+---------+--------------------+
| null| 0|
| Mysql| 1|
|Snowflake| 2|
|Snowflake| 2|
| Teradata| 3|
| Teradata| 3|
| Vertica| 4|
+---------+--------------------+
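To see why identical values share a partition, and why the partition count can be one higher than the number of distinct values, the plain-Python sketch below mimics range partitioning on the "db_name" values. It is a simplified model, not Spark's implementation: the function names are hypothetical, every row is used instead of a sample, and nulls are assumed to sort first.

```python
from bisect import bisect_left

def sort_key(value):
    # Assumption: nulls (None) sort before every string
    return (value is not None, value or "")

def range_bounds(values, max_partitions):
    """Pick at most max_partitions - 1 increasing upper bounds from the sorted keys."""
    keys = sorted(sort_key(v) for v in values)
    step = len(keys) / max_partitions  # rows each partition should ideally hold
    bounds, target = [], step
    for seen, key in enumerate(keys, start=1):
        if len(bounds) == max_partitions - 1:
            break
        # close a range once enough rows are covered, never repeating a bound
        if seen >= target and (not bounds or key > bounds[-1]):
            bounds.append(key)
            target += step
    return bounds

def partition_of(value, bounds):
    # a row goes to the first range whose upper bound is >= its key
    return bisect_left(bounds, sort_key(value))

db_names = ["Teradata", "Snowflake", "Vertica", "Teradata", "Mysql", "Snowflake", None]
bounds = range_bounds(db_names, 200)
num_partitions = len(bounds) + 1  # keys above the last bound would land in the final range
assignment = {name: partition_of(name, bounds) for name in db_names}
print(num_partitions, assignment)
```

With a limit of 200 the model yields 5 bounds and therefore 6 partitions, with ids 0 to 4 assigned to the rows, in line with the output above. Calling range_bounds(db_names, 2) yields a single bound at "Snowflake", i.e. 2 partitions.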
- Example 2: Creating partitions based on multiple columns; rows with the same combination of column values are placed in the same partition.
This can be observed below for "db_name" value "Teradata" and "db_id" value "12", where both rows with this combination belong to the same partition.
As the numPartitions parameter is not passed, the number of partitions in the target dataframe follows the number of distinct combinations of "db_name" and "db_id": the 6 combinations occupy partition ids 0 to 5, and getNumPartitions reports 7.
from pyspark.sql.functions import spark_partition_id
df_update = df.repartitionByRange("db_name", "db_id")
df_update.select("db_name","db_id",spark_partition_id()).show()
+---------+-----+--------------------+
| db_name|db_id|SPARK_PARTITION_ID()|
+---------+-----+--------------------+
| null| 51| 0|
| Mysql| 22| 1|
|Snowflake| 14| 2|
|Snowflake| 50| 3|
| Teradata| 12| 4|
| Teradata| 12| 4|
| Vertica| 15| 5|
+---------+-----+--------------------+
df_update.rdd.getNumPartitions()
Output: 7
- Example 3: Creating partitions based on a column name, with the numPartitions parameter restricting the number of partitions in the target dataframe. In this example, rows with the same "db_name" value are part of the same partition.
from pyspark.sql.functions import spark_partition_id
df_update = df.repartitionByRange(2, "db_name")
df_update.select("db_name",spark_partition_id()).show()
+---------+--------------------+
| db_name|SPARK_PARTITION_ID()|
+---------+--------------------+
|Snowflake| 0|
| Mysql| 0|
|Snowflake| 0|
| null| 0|
| Teradata| 1|
| Vertica| 1|
| Teradata| 1|
+---------+--------------------+
df_update.rdd.getNumPartitions()
Output: 2
- Example 4: Creating partitions based on a single column expression; rows for which the expression returns the same value are placed in the same partition.
In this example, substr is applied to a column so that the partitions are based on the first letter of the column value.
As the numPartitions parameter is not passed, the number of partitions is capped by "spark.sql.shuffle.partitions" (200 by default).
from pyspark.sql.functions import col, spark_partition_id
df_update = df.repartitionByRange(col("db_name").substr(1,1))
df_update.select("db_name",spark_partition_id()).show()
+---------+--------------------+
| db_name|SPARK_PARTITION_ID()|
+---------+--------------------+
| null| 0|
| Mysql| 1|
|Snowflake| 2|
|Snowflake| 2|
| Teradata| 3|
| Teradata| 3|
| Vertica| 4|
+---------+--------------------+
df_update.rdd.getNumPartitions()
Output: 200
- Example 5: Creating partitions based on a column expression, with the numPartitions parameter restricting the number of partitions in the target dataframe.
In this example, rows with the same result for the expression "db_id" mod 5 (the remainder) belong to the same partition: "db_id" 12 and 22 share a partition (remainder 2), and 15 and 50 share a partition (remainder 0). Because each partition covers a range of remainders, "db_id" 51 (remainder 1) lands in the same partition as the rows with remainder 0.
from pyspark.sql.functions import col, spark_partition_id
df_update = df.repartitionByRange(3, col("db_id")%5)
df_update.select("db_id","db_name",spark_partition_id()).show()
+-----+---------+--------------------+
|db_id| db_name|SPARK_PARTITION_ID()|
+-----+---------+--------------------+
| 15| Vertica| 0|
| 50|Snowflake| 0|
| 51| null| 0|
| 12| Teradata| 1|
| 12| Teradata| 1|
| 22| Mysql| 1|
| 14|Snowflake| 2|
+-----+---------+--------------------+
df_update.rdd.getNumPartitions()
Output: 3
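The grouping in the output above can be checked with a few lines of plain Python. The (db_id, partition id) pairs are copied from the table; the variable names are illustrative only, and the ids are treated as integers here.

```python
from collections import defaultdict

# (db_id, partition id) pairs copied from the output table above
rows = [(15, 0), (50, 0), (51, 0), (12, 1), (12, 1), (22, 1), (14, 2)]

remainders = defaultdict(set)
for db_id, partition_id in rows:
    remainders[partition_id].add(db_id % 5)

# Range partitioning: every remainder in a partition is smaller than
# every remainder in the next partition
ordered = [sorted(remainders[p]) for p in sorted(remainders)]
ranges_do_not_overlap = all(a[-1] < b[0] for a, b in zip(ordered, ordered[1:]))
print(dict(remainders), ranges_do_not_overlap)
```

Partition 0 holds remainders 0 and 1, partition 1 holds remainder 2 and partition 2 holds remainder 4, so the ranges do not overlap.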
➠
Coalesce Function: coalesce() can be used to reduce the number of partitions in a dataframe. If a larger number of partitions is requested, the dataframe stays at its current number of partitions.
- Syntax: This function takes only 1 mandatory parameter, an integer giving the number of partitions to reduce the dataframe to.
coalesce(numPartitions)
- → If partitions are reduced from 100 to 10, there is no shuffle; instead, each of the 10 new partitions claims 10 of the current partitions.
- → A drastic coalesce (for example, reducing the number of partitions to 1) may result in the computation taking place on fewer nodes (a single node when numPartitions is 1).
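The way new partitions "claim" existing ones can be pictured with a plain-Python model. This is only an illustration under simplifying assumptions: coalesce_groups is a hypothetical function, and Spark's real grouping also takes data locality into account.

```python
def coalesce_groups(current, requested):
    """Group existing partition indices into min(current, requested) new partitions."""
    # coalesce never increases the count, so cap the target at the current value
    target = min(current, requested)
    groups = [[] for _ in range(target)]
    for idx in range(current):
        # consecutive old partitions are merged into the same new partition
        groups[idx * target // current].append(idx)
    return groups

groups = coalesce_groups(100, 10)
print(len(groups), groups[0])
```

Each of the 10 groups contains 10 of the original partition indices and no row has to be redistributed individually, which is why no shuffle is needed. Requesting more partitions than exist, e.g. coalesce_groups(4, 6), leaves every partition in its own group.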
- Example 1: The coalesce function is used to reduce the number of partitions from 4 to 2.
df_update = df.repartition(4)
df_update.rdd.getNumPartitions()
Output: 4
df_update.coalesce(2).rdd.getNumPartitions()
Output: 2
- Example 2: Even though a larger number of partitions was requested, the dataframe stays at its current number of partitions.
df_update = df.repartition(4)
df_update.rdd.getNumPartitions()
Output: 4
df_update.coalesce(6).rdd.getNumPartitions()
Output: 4
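Because coalesce() never increases the partition count, a small wrapper can choose between coalesce() and repartition(). resize_partitions is a hypothetical helper, sketched under the assumption that a full shuffle is acceptable when the number of partitions has to grow.

```python
def resize_partitions(df, target):
    """Return df with `target` partitions, avoiding a shuffle when shrinking."""
    current = df.rdd.getNumPartitions()
    if target < current:
        # merging existing partitions does not require a shuffle
        return df.coalesce(target)
    if target > current:
        # growing the count needs a shuffle, which repartition performs
        return df.repartition(target)
    return df
```

With the dataframe from the examples above, resize_partitions(df_update, 6) would go through repartition(6), while resize_partitions(df_update, 2) would go through coalesce(2).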