This tutorial explains, with examples, how to sort data based on specified column(s) in a DataFrame.
sort(*cols, ascending=True) — ascending accepts a single boolean, or a list of booleans (1/0), one per sort column
# Read the sample CSV (contains duplicate rows and null values);
# header=True takes column names from the first row.
df = spark.read.csv("file:///path_to_files/csv_with_duplicates_and_nulls.csv",header=True)
df.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 22| Mysql| null|
| 50|Snowflake| RDBMS|
| 51| null|CloudDB|
+-----+---------+-------+
# Sort by a single column; the default order is ascending.
df_update = df.sort("db_id")
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 22| Mysql| null|
| 50|Snowflake| RDBMS|
| 51| null|CloudDB|
+-----+---------+-------+
# Same ascending sort, written with the asc() column function.
from pyspark.sql.functions import asc
df_update = df.sort(asc("db_id"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 22| Mysql| null|
| 50|Snowflake| RDBMS|
| 51| null|CloudDB|
+-----+---------+-------+
# Same ascending sort, written with the Column method col("...").asc().
from pyspark.sql.functions import col
df_update = df.sort(col("db_id").asc())
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 22| Mysql| null|
| 50|Snowflake| RDBMS|
| 51| null|CloudDB|
+-----+---------+-------+
# Descending sort via the ascending keyword argument.
df_update = df.sort("db_id", ascending=False)
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 51| null|CloudDB|
| 50|Snowflake| RDBMS|
| 22| Mysql| null|
| 15| Vertica| RDBMS|
| 14|Snowflake|CloudDB|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
+-----+---------+-------+
# Descending sort using the desc() column function.
from pyspark.sql.functions import desc
df_update = df.sort(desc("db_id"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 51| null|CloudDB|
| 50|Snowflake| RDBMS|
| 22| Mysql| null|
| 15| Vertica| RDBMS|
| 14|Snowflake|CloudDB|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
+-----+---------+-------+
# Descending sort using the Column method col("...").desc().
from pyspark.sql.functions import col
df_update = df.sort(col("db_id").desc())
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 51| null|CloudDB|
| 50|Snowflake| RDBMS|
| 22| Mysql| null|
| 15| Vertica| RDBMS|
| 14|Snowflake|CloudDB|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
+-----+---------+-------+
# Ascending sort that explicitly places null values first.
from pyspark.sql.functions import col,asc_nulls_first
df_update = df.sort(asc_nulls_first("db_name"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 51| null|CloudDB|
| 22| Mysql| null|
| 14|Snowflake|CloudDB|
| 50|Snowflake| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 15| Vertica| RDBMS|
+-----+---------+-------+
# Same result as above, using the Column method form.
df_update = df.sort(col("db_name").asc_nulls_first())
# Ascending sort that places null values last.
from pyspark.sql.functions import col,asc_nulls_last
df_update = df.sort(asc_nulls_last("db_name"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 22| Mysql| null|
| 14|Snowflake|CloudDB|
| 50|Snowflake| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 15| Vertica| RDBMS|
| 51| null|CloudDB|
+-----+---------+-------+
# Same result as above, using the Column method form.
df_update = df.sort(col("db_name").asc_nulls_last())
# Descending sort that places null values first.
from pyspark.sql.functions import col,desc_nulls_first
df_update = df.sort(desc_nulls_first("db_name"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 51| null|CloudDB|
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 50|Snowflake| RDBMS|
| 22| Mysql| null|
+-----+---------+-------+
# Same result as above, using the Column method form.
df_update = df.sort(col("db_name").desc_nulls_first())
# Descending sort that places null values last.
from pyspark.sql.functions import col,desc_nulls_last
df_update = df.sort(desc_nulls_last("db_name"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 50|Snowflake| RDBMS|
| 14|Snowflake|CloudDB|
| 22| Mysql| null|
| 51| null|CloudDB|
+-----+---------+-------+
# Same result as above, using the Column method form.
df_update = df.sort(col("db_name").desc_nulls_last())
# Sort by multiple columns; both default to ascending order.
df_update = df.sort("db_name","db_id")
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 51| null|CloudDB|
| 22| Mysql| null|
| 14|Snowflake|CloudDB|
| 50|Snowflake| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 15| Vertica| RDBMS|
+-----+---------+-------+
# The sort columns can also be passed as a single list.
sort_list= ["db_name","db_id"]
df_update = df.sort(sort_list)
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 51| null|CloudDB|
| 22| Mysql| null|
| 14|Snowflake|CloudDB|
| 50|Snowflake| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 15| Vertica| RDBMS|
+-----+---------+-------+
# ascending takes a list of 1/0 flags, one per sort column (1 = ascending).
df_update = df.sort("db_name","db_id",ascending=[1,1])
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 51| null|CloudDB|
| 22| Mysql| null|
| 14|Snowflake|CloudDB|
| 50|Snowflake| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 15| Vertica| RDBMS|
+-----+---------+-------+
# Both columns descending, using desc() per column.
from pyspark.sql.functions import asc,desc
df_update = df.sort(desc("db_name"),desc("db_id"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 50|Snowflake| RDBMS|
| 14|Snowflake|CloudDB|
| 22| Mysql| null|
| 51| null|CloudDB|
+-----+---------+-------+
# Same result using ascending=[0,0] (0 = descending).
df_update = df.sort("db_name","db_id",ascending=[0,0])
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 50|Snowflake| RDBMS|
| 14|Snowflake|CloudDB|
| 22| Mysql| null|
| 51| null|CloudDB|
+-----+---------+-------+
# Mixed order: db_name descending, db_id ascending.
from pyspark.sql.functions import asc,desc
df_update = df.sort(desc("db_name"),asc("db_id"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 50|Snowflake| RDBMS|
| 22| Mysql| null|
| 51| null|CloudDB|
+-----+---------+-------+
# Same mixed order using ascending=[0,1].
# NOTE(review): the asc/desc import is not needed for this keyword-argument form.
from pyspark.sql.functions import asc,desc
df_update = df.sort("db_name","db_id",ascending=[0,1])
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 50|Snowflake| RDBMS|
| 22| Mysql| null|
| 51| null|CloudDB|
+-----+---------+-------+
# Per-column null placement: db_type ascending (nulls last),
# db_name descending (nulls last).
from pyspark.sql.functions import col,desc_nulls_last,asc_nulls_last
df_update = df.sort(asc_nulls_last("db_type"), desc_nulls_last("db_name"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 14|Snowflake|CloudDB|
| 51| null|CloudDB|
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 50|Snowflake| RDBMS|
| 22| Mysql| null|
+-----+---------+-------+
# Same result as above, using the Column method form.
df_update = df.sort(col("db_type").asc_nulls_last(), col("db_name").desc_nulls_last())
# db_type ascending with nulls first, db_name descending with nulls last.
from pyspark.sql.functions import col,desc_nulls_last,asc_nulls_first
df_update = df.sort(asc_nulls_first("db_type"), desc_nulls_last("db_name"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 22| Mysql| null|
| 14|Snowflake|CloudDB|
| 51| null|CloudDB|
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 50|Snowflake| RDBMS|
+-----+---------+-------+
# db_type descending with nulls last, db_name ascending with nulls first.
from pyspark.sql.functions import col,desc_nulls_last,asc_nulls_first
df_update = df.sort(desc_nulls_last("db_type"), asc_nulls_first("db_name"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 50|Snowflake| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 15| Vertica| RDBMS|
| 51| null|CloudDB|
| 14|Snowflake|CloudDB|
| 22| Mysql| null|
+-----+---------+-------+
# db_type descending with nulls first, db_name ascending with nulls last.
# Fixed: the import previously listed desc_nulls_last/asc_nulls_first, but
# the sort() call uses desc_nulls_first/asc_nulls_last, so running the
# snippet as written raised a NameError.
from pyspark.sql.functions import col,desc_nulls_first,asc_nulls_last
df_update = df.sort(desc_nulls_first("db_type"), asc_nulls_last("db_name"))
df_update.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 22| Mysql| null|
| 50|Snowflake| RDBMS|
| 12| Teradata| RDBMS|
| 12| Teradata| RDBMS|
| 15| Vertica| RDBMS|
| 14|Snowflake|CloudDB|
| 51| null|CloudDB|
+-----+---------+-------+