PySpark: Dataframe Analytical Functions Part 2
- This tutorial explains the ranking window functions that can be used to fulfil various analytical requirements.
- The ranking window functions covered here are row_number, rank, dense_rank, percent_rank and ntile.
Click here to check other analytical functions available in PySpark.
Window functions/attributes:
These attributes are the most important part of ordered analytical functions and should be understood properly in order to use them effectively.
- Window.partitionBy: This is optional and can be used to further categorize data and perform analysis within sub-categories. Multiple comma-separated columns can be added to the window partition.
Example: In order to calculate the sum of salary for each department alongside the individual detail rows, partitioning by department number is required (see the combined sketch after this list).
Dept No  Emp No  Salary  Total_Department_Salary
-------  ------  ------  -----------------------
10       11      15000   20000
10       12      5000    20000
20       22      10023   30023
20       23      10000   30023
20       24      10000   30023
30       31      13230   23230
30       32      10000   23230
Note: Using a window function without a partition moves all data to a single node, which can cause serious performance degradation.
- Window.orderBy: This is optional and can be used to specify the order in which rows should be processed within the window.
- Window.rowsBetween: This is also optional and should be used when the current row's value needs to be computed by peeking into other rows' data.
rowsBetween(start, end), where start can be Window.unboundedPreceding, Window.currentRow or Window.currentRow +/- n, and end can be Window.currentRow, Window.currentRow +/- m or Window.unboundedFollowing.
- The rowsBetween attribute requires the user to specify the start and the end of the window frame.
- Window.unboundedPreceding, Window.unboundedFollowing and Window.currentRow are available to refer to all preceding rows, all following rows and the current row respectively.
- The user can add or subtract a number (e.g. 1, 2, 3) from Window.currentRow to look at that many preceding/following rows, or use the unbounded attributes to include all preceding/following rows.
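- Combined sketch: a minimal illustration (added here, not part of the original tutorial) of how partitionBy, orderBy and rowsBetween fit together, assuming the empdf DataFrame loaded in the Sample Data section below. total_department_salary mirrors the table above; running_department_salary is a running total from the top of each department partition down to the current row.
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as sum_
# partition only: total salary per department, repeated on every row of that department
dept_window = Window.partitionBy("dept_no")
# partition + order + frame: running total within each department, from the first row of the partition up to the current row
running_window = Window.partitionBy("dept_no").orderBy(col("salary").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)
empdf_windows = empdf.select("emp_name", "dept_no", "salary", sum_("salary").over(dept_window).alias("total_department_salary"), sum_("salary").over(running_window).alias("running_department_salary"))
empdf_windows.show()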
Sample Data: The dataset used in the examples below can be downloaded from here.
empdf = spark.read.parquet("file:///path_to_files/employee_with_comm.parquet")
empdf.show()
+-------+--------+-------+----------+-------+-----+
| emp_no|emp_name| salary|manager_id|dept_no| comm|
+-------+--------+-------+----------+-------+-----+
|1000245| PRADEEP|5000.00| null| 100| 0.00|
|1000258| BLAKE|2850.00| 1000245| 300|50.00|
|1000262| CLARK|2450.00| 1000245| 100|50.00|
|1000276| JONES|2975.00| 1000245| 200|75.00|
|1000288| SCOTT|3000.00| 1000276| 200| 0.00|
|1000292| FORD|3000.00| 1000276| 200| 0.00|
|1000294| SMITH| 800.00| 1000292| 200| 0.00|
|1000299| ALLEN|1600.00| 1000258| 300| 0.00|
|1000310| WARD|1250.00| 1000258| 300|50.00|
|1000312| MARTIN|1250.00| 1000258| 300|50.00|
|1000315| TURNER|1500.00| 1000258| 300| 0.00|
|1000326| ADAMS|1100.00| 1000288| 200| 0.00|
|1000336| JAMES| 950.00| 1000258| 300|50.00|
|1000346| MILLER|1300.00| 1000262| 100| 0.00|
|1000347| DAVID|1400.00| 1000245| 500| 0.00|
+-------+--------+-------+----------+-------+-----+
➠ row_number() function: This function assigns a continuous, unique number to each row within a partition or sub-partition, based on the window ordering.
- Example 1: Unique row number for each employee, ordered by salary.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", row_number().over(Window.orderBy("salary")).alias("row_number_salary"))
empdf_update.show()
+--------+-------+-------+-----------------+
|emp_name|dept_no| salary|row_number_salary|
+--------+-------+-------+-----------------+
| SMITH| 200| 800.00| 1|
| JAMES| 300| 950.00| 2|
| ADAMS| 200|1100.00| 3|
| WARD| 300|1250.00| 4|
| MARTIN| 300|1250.00| 5|
| MILLER| 100|1300.00| 6|
| DAVID| 500|1400.00| 7|
| TURNER| 300|1500.00| 8|
| ALLEN| 300|1600.00| 9|
| CLARK| 100|2450.00| 10|
| BLAKE| 300|2850.00| 11|
| JONES| 200|2975.00| 12|
| SCOTT| 200|3000.00| 13|
| FORD| 200|3000.00| 14|
| PRADEEP| 100|5000.00| 15|
+--------+-------+-------+-----------------+
- Example 2: Row number for employees ordered by salary within each department.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", row_number().over(Window.partitionBy("dept_no").orderBy("salary")).alias("row_number_Dept_salary"))
empdf_update.show()
+--------+-------+-------+----------------------+
|emp_name|dept_no| salary|row_number_Dept_salary|
+--------+-------+-------+----------------------+
| JAMES| 300| 950.00| 1|
| WARD| 300|1250.00| 2|
| MARTIN| 300|1250.00| 3|
| TURNER| 300|1500.00| 4|
| ALLEN| 300|1600.00| 5|
| BLAKE| 300|2850.00| 6|
| DAVID| 500|1400.00| 1|
| MILLER| 100|1300.00| 1|
| CLARK| 100|2450.00| 2|
| PRADEEP| 100|5000.00| 3|
| SMITH| 200| 800.00| 1|
| ADAMS| 200|1100.00| 2|
| JONES| 200|2975.00| 3|
| SCOTT| 200|3000.00| 4|
| FORD| 200|3000.00| 5|
+--------+-------+-------+----------------------+
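- Follow-up sketch (an addition, not one of the original examples): row_number is commonly used to pick the top row per group, for example the highest-paid employee in each department. The helper name top_window below is illustrative only.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
top_window = Window.partitionBy("dept_no").orderBy(col("salary").desc())
# keep only the first (highest salary) row per department; ties such as SCOTT/FORD at 3000.00 are broken arbitrarily by row_number
top_earners = empdf.withColumn("rn", row_number().over(top_window)).filter(col("rn") == 1).drop("rn")
top_earners.show()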
➠ rank() function: This function is used to find the rank of rows within a partition or sub-partition. When two or more rows have the same value in the ordering column, they share the same rank and the following rank(s) are skipped.
- Example 1: Rank of employees by salary within department.
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", rank().over(Window.partitionBy("dept_no").orderBy( col("salary").desc())).alias("Rank_by_salary"))
empdf_update.show()
+--------+-------+-------+--------------+
|emp_name|dept_no| salary|Rank_by_salary|
+--------+-------+-------+--------------+
| BLAKE| 300|2850.00| 1|
| ALLEN| 300|1600.00| 2|
| TURNER| 300|1500.00| 3|
| WARD| 300|1250.00| 4|
| MARTIN| 300|1250.00| 4|<--
| JAMES| 300| 950.00| 6|<-- Rank 5 is missing
| DAVID| 500|1400.00| 1|
| PRADEEP| 100|5000.00| 1|
| CLARK| 100|2450.00| 2|
| MILLER| 100|1300.00| 3|
| SCOTT| 200|3000.00| 1|
| FORD| 200|3000.00| 1|<--
| JONES| 200|2975.00| 3|<-- Rank 2 is missing
| ADAMS| 200|1100.00| 4|
| SMITH| 200| 800.00| 5|
+--------+-------+-------+--------------+
➠ dense_rank() function: This function is also used to find the rank of rows within a partition or sub-partition, but it does not skip ranks when two or more rows have the same value in the ordering column. This is sometimes also called class rank.
- Example 1: Dense Rank of employees by salary within department.
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", dense_rank().over( Window.partitionBy("dept_no").orderBy( col("salary").desc())).alias("Dense_Rank_by_salary"))
empdf_update.show()
+--------+-------+-------+--------------------+
|emp_name|dept_no| salary|Dense_Rank_by_salary|
+--------+-------+-------+--------------------+
| BLAKE| 300|2850.00| 1|
| ALLEN| 300|1600.00| 2|
| TURNER| 300|1500.00| 3|
| WARD| 300|1250.00| 4|
| MARTIN| 300|1250.00| 4|
| JAMES| 300| 950.00| 5|
| DAVID| 500|1400.00| 1|
| PRADEEP| 100|5000.00| 1|
| CLARK| 100|2450.00| 2|
| MILLER| 100|1300.00| 3|
| SCOTT| 200|3000.00| 1|
| FORD| 200|3000.00| 1|
| JONES| 200|2975.00| 2|
| ADAMS| 200|1100.00| 3|
| SMITH| 200| 800.00| 4|
+--------+-------+-------+--------------------+
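- Comparison sketch (added here, not part of the original tutorial): computing row_number, rank and dense_rank over the same window makes the difference between them concrete.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col
w = Window.partitionBy("dept_no").orderBy(col("salary").desc())
# row_number always increments, rank skips after ties, dense_rank never leaves gaps
empdf.select("emp_name", "dept_no", "salary", row_number().over(w).alias("row_number"), rank().over(w).alias("rank"), dense_rank().over(w).alias("dense_rank")).show()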
➠ percent_rank() function: This function is used to find the relative (percent) rank of rows within a partition or sub-partition, computed as (rank - 1) / (number of rows in the partition - 1). Like rank, tied rows share the same percent rank and the next distinct value jumps accordingly.
- Example 1: Percent Rank of employees by salary within department.
from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", percent_rank().over( Window.partitionBy("dept_no").orderBy( col("salary"))).alias("percent_rank_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|percent_rank_salary|
+--------+-------+-------+-------------------+
| JAMES| 300| 950.00| 0.0|
| WARD| 300|1250.00| 0.2|
| MARTIN| 300|1250.00| 0.2|<--
| TURNER| 300|1500.00| 0.6|<--
| ALLEN| 300|1600.00| 0.8|
| BLAKE| 300|2850.00| 1.0|
| DAVID| 500|1400.00| 0.0|
| MILLER| 100|1300.00| 0.0|
| CLARK| 100|2450.00| 0.5|
| PRADEEP| 100|5000.00| 1.0|
| SMITH| 200| 800.00| 0.0|
| ADAMS| 200|1100.00| 0.25|
| JONES| 200|2975.00| 0.5|
| SCOTT| 200|3000.00| 0.75|
| FORD| 200|3000.00| 0.75|
+--------+-------+-------+-------------------+
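- Verification sketch (added here, not part of the original tutorial): recomputing the formula (rank - 1) / (rows in partition - 1) manually should reproduce the percent_rank column above.
from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank, rank, count, col
w_ordered = Window.partitionBy("dept_no").orderBy(col("salary"))
w_unordered = Window.partitionBy("dept_no")
# for a single-row partition (dept 500) the denominator is 0, so the manual column is null while percent_rank itself returns 0.0
empdf.select("emp_name", "dept_no", "salary", percent_rank().over(w_ordered).alias("percent_rank_salary"), ((rank().over(w_ordered) - 1) / (count("*").over(w_unordered) - 1)).alias("manual_percent_rank")).show()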
➠ ntile() function: ntile can be used to divide the rows within a partition or sub-partition into 'n' buckets. For example, if 'n' is 2, the first half of the rows gets bucket 1 and the second half gets bucket 2. When the rows do not divide evenly, the earlier buckets receive one extra row each.
- Example 1: ntile with 'n' = 2.
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", ntile(2).over(Window.partitionBy("dept_no").orderBy( col("salary"))).alias("ntile_example"))
empdf_update.show()
+--------+-------+-------+-------------+
|emp_name|dept_no| salary|ntile_example|
+--------+-------+-------+-------------+
| JAMES| 300| 950.00| 1|
| WARD| 300|1250.00| 1|
| MARTIN| 300|1250.00| 1|
| TURNER| 300|1500.00| 2|
| ALLEN| 300|1600.00| 2|
| BLAKE| 300|2850.00| 2|
| DAVID| 500|1400.00| 1|
| MILLER| 100|1300.00| 1|
| CLARK| 100|2450.00| 1|
| PRADEEP| 100|5000.00| 2|
| SMITH| 200| 800.00| 1|
| ADAMS| 200|1100.00| 1|
| JONES| 200|2975.00| 1|
| SCOTT| 200|3000.00| 2|
| FORD| 200|3000.00| 2|
+--------+-------+-------+-------------+
- Example 2: ntile with 'n' = 3.
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", ntile(3).over(Window.partitionBy("dept_no").orderBy( col("salary"))).alias("ntile_example"))
empdf_update.show()
+--------+-------+-------+-------------+
|emp_name|dept_no| salary|ntile_example|
+--------+-------+-------+-------------+
| JAMES| 300| 950.00| 1|
| WARD| 300|1250.00| 1|
| MARTIN| 300|1250.00| 2|
| TURNER| 300|1500.00| 2|
| ALLEN| 300|1600.00| 3|
| BLAKE| 300|2850.00| 3|
| DAVID| 500|1400.00| 1|
| MILLER| 100|1300.00| 1|
| CLARK| 100|2450.00| 2|
| PRADEEP| 100|5000.00| 3|
| SMITH| 200| 800.00| 1|
| ADAMS| 200|1100.00| 1|
| JONES| 200|2975.00| 2|
| SCOTT| 200|3000.00| 2|
| FORD| 200|3000.00| 3|
+--------+-------+-------+-------------+
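- Usage sketch (added here, not part of the original tutorial): ntile over an unpartitioned window can band employees into salary quartiles across the whole dataset; note that the single-node performance caveat mentioned earlier applies when no partition is used.
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile, col
# ntile(4) over an unpartitioned window: bucket 1 = lowest salaries, bucket 4 = highest
quartile_window = Window.orderBy(col("salary"))
salary_quartiles = empdf.select("emp_name", "salary", ntile(4).over(quartile_window).alias("salary_quartile"))
salary_quartiles.filter(col("salary_quartile") == 4).show()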