This tutorial explains, with examples, how to sort data within partitions based on specified column(s) in a DataFrame.
Syntax: sortWithinPartitions(*cols, ascending=True) — "ascending" may also be a list of 1/0 flags, one flag per sort column.
# Read the sample CSV and range-partition the rows into 3 partitions on db_id,
# then tag each row with its partition number so the per-partition ordering
# produced by the sort examples below is visible in the output.
from pyspark.sql.functions import spark_partition_id
df = spark.read.csv("file:///path_to_files/csv_with_duplicates_and_nulls.csv",header=True).repartitionByRange(3,"db_id")
df = df.withColumn("partition#", spark_partition_id())
df.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
# Ascending sort on db_id within each partition (ascending is the default).
df_update = df.sortWithinPartitions("db_id")
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
# Same ascending sort, expressed with the asc() column function.
from pyspark.sql.functions import asc
df_update = df.sortWithinPartitions(asc("db_id"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
# Same ascending sort, expressed with the Column.asc() method form.
from pyspark.sql.functions import col
df_update = df.sortWithinPartitions(col("db_id").asc())
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
# Descending sort on db_id within each partition, via the ascending keyword.
df_update = df.sortWithinPartitions("db_id", ascending=False)
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 22| Mysql| null| 1|
| 15| Vertica| RDBMS| 1|
| 51| null|CloudDB| 2|
| 50|Snowflake| RDBMS| 2|
+-----+---------+-------+----------+
# Same descending sort, expressed with the desc() column function.
from pyspark.sql.functions import desc
df_update = df.sortWithinPartitions(desc("db_id"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 22| Mysql| null| 1|
| 15| Vertica| RDBMS| 1|
| 51| null|CloudDB| 2|
| 50|Snowflake| RDBMS| 2|
+-----+---------+-------+----------+
# Same descending sort, expressed with the Column.desc() method form.
from pyspark.sql.functions import col
df_update = df.sortWithinPartitions(col("db_id").desc())
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 22| Mysql| null| 1|
| 15| Vertica| RDBMS| 1|
| 51| null|CloudDB| 2|
| 50|Snowflake| RDBMS| 2|
+-----+---------+-------+----------+
# Ascending sort on db_name with NULLs placed before non-null values
# (see partition 2 in the output: the null db_name row comes first).
from pyspark.sql.functions import col,asc_nulls_first
df_update = df.sortWithinPartitions(asc_nulls_first("db_name"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 22| Mysql| null| 1|
| 15| Vertica| RDBMS| 1|
| 51| null|CloudDB| 2|
| 50|Snowflake| RDBMS| 2|
+-----+---------+-------+----------+
#same result as above — Column.asc_nulls_first() is the equivalent method form
df_update = df.sortWithinPartitions(col("db_name").asc_nulls_first())
# Ascending sort on db_name with NULLs moved to the end of each partition.
from pyspark.sql.functions import col,asc_nulls_last
df_update = df.sortWithinPartitions(asc_nulls_last("db_name"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 22| Mysql| null| 1|
| 15| Vertica| RDBMS| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
#same result as above — Column.asc_nulls_last() is the equivalent method form
df_update = df.sortWithinPartitions(col("db_name").asc_nulls_last())
# Descending sort on db_name with NULLs placed before non-null values.
from pyspark.sql.functions import col,desc_nulls_first
df_update = df.sortWithinPartitions(desc_nulls_first("db_name"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 51| null|CloudDB| 2|
| 50|Snowflake| RDBMS| 2|
+-----+---------+-------+----------+
#same result as above — Column.desc_nulls_first() is the equivalent method form
df_update = df.sortWithinPartitions(col("db_name").desc_nulls_first())
# Descending sort on db_name with NULLs moved to the end of each partition.
from pyspark.sql.functions import col,desc_nulls_last
df_update = df.sortWithinPartitions(desc_nulls_last("db_name"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
#same result as above — Column.desc_nulls_last() is the equivalent method form
df_update = df.sortWithinPartitions(col("db_name").desc_nulls_last())
# Multi-column ascending sort: db_name first, db_id as the tie-breaker.
df_update = df.sortWithinPartitions("db_name","db_id")
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 22| Mysql| null| 1|
| 15| Vertica| RDBMS| 1|
| 51| null|CloudDB| 2|
| 50|Snowflake| RDBMS| 2|
+-----+---------+-------+----------+
# The sort columns can also be supplied as a single Python list.
sort_list= ["db_name","db_id"]
df_update = df.sortWithinPartitions(sort_list)
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 22| Mysql| null| 1|
| 15| Vertica| RDBMS| 1|
| 51| null|CloudDB| 2|
| 50|Snowflake| RDBMS| 2|
+-----+---------+-------+----------+
# ascending accepts a list of 1/0 flags, one per sort column; [1,1] = both ascending.
df_update = df.sortWithinPartitions("db_name","db_id",ascending=[1,1])
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 22| Mysql| null| 1|
| 15| Vertica| RDBMS| 1|
| 51| null|CloudDB| 2|
| 50|Snowflake| RDBMS| 2|
+-----+---------+-------+----------+
# Both columns descending, using desc() on each column individually.
from pyspark.sql.functions import asc,desc
df_update = df.sortWithinPartitions(desc("db_name"),desc("db_id"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
# Same double-descending sort via the flag list: [0,0] = both descending.
df_update = df.sortWithinPartitions("db_name","db_id",ascending=[0,0])
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
# Mixed directions: db_name descending, db_id ascending as the tie-breaker.
from pyspark.sql.functions import asc,desc
df_update = df.sortWithinPartitions(desc("db_name"),asc("db_id"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
# Same mixed-direction sort via the flag list: [0,1] = first desc, second asc.
from pyspark.sql.functions import asc,desc
df_update = df.sortWithinPartitions("db_name","db_id",ascending=[0,1])
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
# Per-column null placement on two columns:
# db_type ascending with NULLs last, db_name descending with NULLs last.
from pyspark.sql.functions import col,desc_nulls_last,asc_nulls_last
df_update = df.sortWithinPartitions(asc_nulls_last("db_type"), desc_nulls_last("db_name"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 51| null|CloudDB| 2|
| 50|Snowflake| RDBMS| 2|
+-----+---------+-------+----------+
#same result as above — the equivalent Column method forms
df_update = df.sortWithinPartitions(col("db_type").asc_nulls_last(), col("db_name").desc_nulls_last())
# db_type ascending with NULLs first, db_name descending with NULLs last.
from pyspark.sql.functions import col,desc_nulls_last,asc_nulls_first
df_update = df.sortWithinPartitions(asc_nulls_first("db_type"), desc_nulls_last("db_name"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 14|Snowflake|CloudDB| 0|
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 22| Mysql| null| 1|
| 15| Vertica| RDBMS| 1|
| 51| null|CloudDB| 2|
| 50|Snowflake| RDBMS| 2|
+-----+---------+-------+----------+
# db_type descending with NULLs last, db_name ascending with NULLs first.
from pyspark.sql.functions import col,desc_nulls_last,asc_nulls_first
df_update = df.sortWithinPartitions(desc_nulls_last("db_type"), asc_nulls_first("db_name"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 15| Vertica| RDBMS| 1|
| 22| Mysql| null| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+
# db_type descending with NULLs first, db_name ascending with NULLs last.
# Fix: the original import listed desc_nulls_last/asc_nulls_first, but the
# statement below calls desc_nulls_first/asc_nulls_last — running this
# snippet standalone would raise NameError. Import the functions actually used.
from pyspark.sql.functions import col,desc_nulls_first,asc_nulls_last
df_update = df.sortWithinPartitions(desc_nulls_first("db_type"), asc_nulls_last("db_name"))
df_update.show()
+-----+---------+-------+----------+
|db_id| db_name|db_type|partition#|
+-----+---------+-------+----------+
| 12| Teradata| RDBMS| 0|
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 0|
| 22| Mysql| null| 1|
| 15| Vertica| RDBMS| 1|
| 50|Snowflake| RDBMS| 2|
| 51| null|CloudDB| 2|
+-----+---------+-------+----------+