PySpark: Dataframe Analytical Functions Part 3
- This tutorial explains additional window analytical functions that can be used to fulfil various analytical requirements.
- The functions covered here are lag(), lead(), first(), last() and cume_dist().
Click here to check the common analytical functions available in PySpark.
Window functions/attributes:
These are the most important part of ordered analytical functions and should be understood properly in order to use them effectively.
- Window.partitionBy: This is optional and can be used to further categorize data and perform analysis within sub-categories. Multiple comma-separated columns can be added to the window partition.
Example: To calculate the total salary for each department alongside each employee's details, the window must be partitioned by department number, as shown below.
Dept No Emp No salary Total_Department_Salary
------ ------ ------ ----------------------
10 11 15000 20000
10 12 5000 20000
20 22 10023 30023
20 23 10000 30023
20 24 10000 30023
30 31 13230 23230
30 32 10000 23230
Note: Using a window function without a partition moves all the data to a single node, which can cause serious performance degradation.
- Window.orderBy: This is optional and can be used to specify the order in which rows should be processed within each partition.
- Window.rowsBetween: This is also optional and should be used when the user wants to look at other rows' data to compute the current row's value.
rowsBetween(start, end), where start can be Window.unboundedPreceding, Window.currentRow or Window.currentRow +/- n, and end can be Window.currentRow, Window.currentRow +/- m or Window.unboundedFollowing.
- The rowsBetween attribute requires the user to specify both the start and the end of the row window.
- Window.unboundedPreceding, Window.unboundedFollowing and Window.currentRow are available to reference all previous rows, all following rows and the current row respectively.
- The user can add or subtract a number (e.g. 1, 2, 3) from Window.currentRow to look at that many previous/following rows, or specify the UNBOUNDED attributes to cover all previous/following rows. A combined window specification using these attributes is sketched below.
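- A minimal sketch of how these attributes combine (it assumes the empdf DataFrame loaded in the Sample Data section below; the window and alias names here are illustrative): the department total from the partitionBy example above, plus a running total using rowsBetween.
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as sum_
# Window partitioned by department only: the aggregate covers the whole department
dept_window = Window.partitionBy("dept_no")
# Ordered window framed from the first row of the partition up to the current row: gives a running total
running_window = (Window.partitionBy("dept_no")
                        .orderBy("salary")
                        .rowsBetween(Window.unboundedPreceding, Window.currentRow))
empdf_window_demo = empdf.select("emp_no", "dept_no", "salary",
                                 sum_("salary").over(dept_window).alias("total_department_salary"),
                                 sum_("salary").over(running_window).alias("running_department_salary"))
empdf_window_demo.show()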
Sample Data: The dataset used in the examples below can be downloaded from here.
empdf = spark.read.parquet("file:///path_to_files/employee_with_comm.parquet")
empdf.show()
+-------+--------+-------+----------+-------+-----+
| emp_no|emp_name| salary|manager_id|dept_no| comm|
+-------+--------+-------+----------+-------+-----+
|1000245| PRADEEP|5000.00| null| 100| 0.00|
|1000258| BLAKE|2850.00| 1000245| 300|50.00|
|1000262| CLARK|2450.00| 1000245| 100|50.00|
|1000276| JONES|2975.00| 1000245| 200|75.00|
|1000288| SCOTT|3000.00| 1000276| 200| 0.00|
|1000292| FORD|3000.00| 1000276| 200| 0.00|
|1000294| SMITH| 800.00| 1000292| 200| 0.00|
|1000299| ALLEN|1600.00| 1000258| 300| 0.00|
|1000310| WARD|1250.00| 1000258| 300|50.00|
|1000312| MARTIN|1250.00| 1000258| 300|50.00|
|1000315| TURNER|1500.00| 1000258| 300| 0.00|
|1000326| ADAMS|1100.00| 1000288| 200| 0.00|
|1000336| JAMES| 950.00| 1000258| 300|50.00|
|1000346| MILLER|1300.00| 1000262| 100| 0.00|
|1000347| DAVID|1400.00| 1000245| 500| 0.00|
+-------+--------+-------+----------+-------+-----+
from pyspark.sql.window import Window
➠
lag() function: This function is used to fetch previous row values within partitions or sub-partitions.
- Example 1: Previous row's salary within a partition (group).
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", lag("salary").over(Window.partitionBy("dept_no").orderBy("salary")).alias("previous_row_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|previous_row_salary|
+--------+-------+-------+-------------------+
| JAMES| 300| 950.00| null|
| WARD| 300|1250.00| 950.00|
| MARTIN| 300|1250.00| 1250.00|
| TURNER| 300|1500.00| 1250.00|
| ALLEN| 300|1600.00| 1500.00|
| BLAKE| 300|2850.00| 1600.00|
| DAVID| 500|1400.00| null|
| MILLER| 100|1300.00| null|
| CLARK| 100|2450.00| 1300.00|
| PRADEEP| 100|5000.00| 2450.00|
| SMITH| 200| 800.00| null|
| ADAMS| 200|1100.00| 800.00|
| JONES| 200|2975.00| 1100.00|
| SCOTT| 200|3000.00| 2975.00|
| FORD| 200|3000.00| 3000.00|
+--------+-------+-------+-------------------+
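- lag() also accepts an optional offset and default value: lag(column, offset, default). A small hedged sketch (the alias names are illustrative) that looks two rows back with a default of 0 and also computes the increase over the previous row's salary:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col
salary_window = Window.partitionBy("dept_no").orderBy("salary")
empdf_lag_demo = empdf.select("emp_name", "dept_no", "salary",
                              # Look 2 rows back; return 0 instead of null when no such row exists
                              lag("salary", 2, 0).over(salary_window).alias("salary_two_rows_back"),
                              # Difference between the current salary and the previous row's salary
                              (col("salary") - lag("salary").over(salary_window)).alias("increase_over_previous"))
empdf_lag_demo.show()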
➠
lead() function: This function is used to fetch next row values within partitions or sub-partitions.
- Example 1: Next row's salary within a partition (group).
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", lead("salary").over(Window.partitionBy("dept_no").orderBy( col("salary").desc())).alias("next_row_salary"))
empdf_update.show()
+--------+-------+-------+---------------+
|emp_name|dept_no| salary|next_row_salary|
+--------+-------+-------+---------------+
| BLAKE| 300|2850.00| 1600.00|
| ALLEN| 300|1600.00| 1500.00|
| TURNER| 300|1500.00| 1250.00|
| WARD| 300|1250.00| 1250.00|
| MARTIN| 300|1250.00| 950.00|
| JAMES| 300| 950.00| null|
| DAVID| 500|1400.00| null|
| PRADEEP| 100|5000.00| 2450.00|
| CLARK| 100|2450.00| 1300.00|
| MILLER| 100|1300.00| null|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| 2975.00|
| JONES| 200|2975.00| 1100.00|
| ADAMS| 200|1100.00| 800.00|
| SMITH| 200| 800.00| null|
+--------+-------+-------+---------------+
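- Like lag(), lead() accepts an optional offset and default value: lead(column, offset, default). A small hedged sketch (alias names are illustrative) that computes the gap to the next lower salary in the same department:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col
desc_salary_window = Window.partitionBy("dept_no").orderBy(col("salary").desc())
empdf_lead_demo = empdf.select("emp_name", "dept_no", "salary",
                               # Gap between the current salary and the next (lower) salary; null for the last row
                               (col("salary") - lead("salary").over(desc_salary_window)).alias("gap_to_next_salary"),
                               # Look 2 rows ahead; return 0 instead of null when no such row exists
                               lead("salary", 2, 0).over(desc_salary_window).alias("salary_two_rows_ahead"))
empdf_lead_demo.show()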
➠
first() function: This function is used to get the value of the expression column for the first row within each partition. It is equivalent to first_value() in SQL.
- Example 1: First value of salary (here the lowest salary, since rows are ordered by salary) within each partition.
from pyspark.sql.window import Window
from pyspark.sql.functions import first, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", first("salary").over(Window.partitionBy("dept_no").orderBy("salary")).alias("first_value_salary"))
empdf_update.show()
+--------+-------+-------+------------------+
|emp_name|dept_no| salary|first_value_salary|
+--------+-------+-------+------------------+
| JAMES| 300| 950.00| 950.00|
| WARD| 300|1250.00| 950.00|
| MARTIN| 300|1250.00| 950.00|
| TURNER| 300|1500.00| 950.00|
| ALLEN| 300|1600.00| 950.00|
| BLAKE| 300|2850.00| 950.00|
| DAVID| 500|1400.00| 1400.00|
| MILLER| 100|1300.00| 1300.00|
| CLARK| 100|2450.00| 1300.00|
| PRADEEP| 100|5000.00| 1300.00|
| SMITH| 200| 800.00| 800.00|
| ADAMS| 200|1100.00| 800.00|
| JONES| 200|2975.00| 800.00|
| SCOTT| 200|3000.00| 800.00|
| FORD| 200|3000.00| 800.00|
+--------+-------+-------+------------------+
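- Because the window above is ordered by salary, first("salary") is the lowest salary in the department, so it can be used to express each salary relative to that minimum. A small hedged sketch (the alias name is illustrative):
from pyspark.sql.window import Window
from pyspark.sql.functions import first, col
salary_window = Window.partitionBy("dept_no").orderBy("salary")
empdf_first_demo = empdf.select("emp_name", "dept_no", "salary",
                                # Difference between the current salary and the lowest salary in the department
                                (col("salary") - first("salary").over(salary_window)).alias("diff_from_dept_minimum"))
empdf_first_demo.show()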
➠
last() function: This function is used to get the value of the expression column for the last row within each partition. By default last() returns the current row's value of the expression column, because the default window frame for an ordered window is "UNBOUNDED PRECEDING AND CURRENT ROW"; to get the last value within the partition, this frame needs to be overridden. It is equivalent to last_value() in SQL.
- Example 1: With the default window frame, this will return the value of the current row for the given column within each partition.
from pyspark.sql.window import Window
from pyspark.sql.functions import last, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", last("salary").over(Window.partitionBy("dept_no").orderBy( col("salary"))).alias("last_value_example"))
empdf_update.show()
+--------+-------+-------+------------------+
|emp_name|dept_no| salary|last_value_example|
+--------+-------+-------+------------------+
| JAMES| 300| 950.00| 950.00|
| WARD| 300|1250.00| 1250.00|
| MARTIN| 300|1250.00| 1250.00|
| TURNER| 300|1500.00| 1500.00|
| ALLEN| 300|1600.00| 1600.00|
| BLAKE| 300|2850.00| 2850.00|
| DAVID| 500|1400.00| 1400.00|
| MILLER| 100|1300.00| 1300.00|
| CLARK| 100|2450.00| 2450.00|
| PRADEEP| 100|5000.00| 5000.00|
| SMITH| 200| 800.00| 800.00|
| ADAMS| 200|1100.00| 1100.00|
| JONES| 200|2975.00| 2975.00|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| 3000.00|
+--------+-------+-------+------------------+
- Example 2: With the default window frame overridden to span the entire partition, this will return the value of the last row for the given column within each partition.
from pyspark.sql.window import Window
from pyspark.sql.functions import last, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", last("salary").over(Window.partitionBy("dept_no").orderBy( col("salary")).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)).alias("last_value_example"))
empdf_update.show()
+--------+-------+-------+------------------+
|emp_name|dept_no| salary|last_value_example|
+--------+-------+-------+------------------+
| JAMES| 300| 950.00| 2850.00|
| WARD| 300|1250.00| 2850.00|
| MARTIN| 300|1250.00| 2850.00|
| TURNER| 300|1500.00| 2850.00|
| ALLEN| 300|1600.00| 2850.00|
| BLAKE| 300|2850.00| 2850.00|
| DAVID| 500|1400.00| 1400.00|
| MILLER| 100|1300.00| 5000.00|
| CLARK| 100|2450.00| 5000.00|
| PRADEEP| 100|5000.00| 5000.00|
| SMITH| 200| 800.00| 3000.00|
| ADAMS| 200|1100.00| 3000.00|
| JONES| 200|2975.00| 3000.00|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| 3000.00|
+--------+-------+-------+------------------+
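- Note: In this particular case (the ordering column is the same column whose last value is wanted), overriding the frame is equivalent to taking the maximum over the partition. A hedged sketch of that alternative (the alias name is illustrative):
from pyspark.sql.window import Window
from pyspark.sql.functions import max as max_
# No orderBy needed: an aggregate over an un-ordered window covers the whole partition
empdf_max_demo = empdf.select("emp_name", "dept_no", "salary",
                              max_("salary").over(Window.partitionBy("dept_no")).alias("last_value_example"))
empdf_max_demo.show()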
➠
cume_dist() function: This function returns the cumulative distribution of a row within its partition, i.e. the fraction of rows whose ordering value is less than or equal to the current row's value.
- Example 1:
from pyspark.sql.window import Window
from pyspark.sql.functions import cume_dist, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", cume_dist().over(Window.partitionBy("dept_no").orderBy( col("salary"))).alias("cume_dist_example"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary| cume_dist_example|
+--------+-------+-------+-------------------+
| JAMES| 300| 950.00|0.16666666666666666|
| WARD| 300|1250.00| 0.5|
| MARTIN| 300|1250.00| 0.5|
| TURNER| 300|1500.00| 0.6666666666666666|
| ALLEN| 300|1600.00| 0.8333333333333334|
| BLAKE| 300|2850.00| 1.0|
| DAVID| 500|1400.00| 1.0|
| MILLER| 100|1300.00| 0.3333333333333333|
| CLARK| 100|2450.00| 0.6666666666666666|
| PRADEEP| 100|5000.00| 1.0|
| SMITH| 200| 800.00| 0.2|
| ADAMS| 200|1100.00| 0.4|
| JONES| 200|2975.00| 0.6|
| SCOTT| 200|3000.00| 1.0|
| FORD| 200|3000.00| 1.0|
+--------+-------+-------+-------------------+
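- The same figures can be derived manually to see what cume_dist() computes: count the rows whose salary is less than or equal to the current row's (the default frame of an ordered window includes peer rows) and divide by the partition size. A hedged sketch (the alias name is illustrative):
from pyspark.sql.window import Window
from pyspark.sql.functions import count, lit
ordered_window = Window.partitionBy("dept_no").orderBy("salary")  # default frame: all rows up to and including peers of the current row
partition_window = Window.partitionBy("dept_no")                  # whole partition
empdf_cume_demo = empdf.select("emp_name", "dept_no", "salary",
                               (count(lit(1)).over(ordered_window) / count(lit(1)).over(partition_window)).alias("manual_cume_dist"))
empdf_cume_demo.show()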