PySpark: Dataframe Analytical Functions Part 1
- This tutorial explains ordered analytical window aggregate functions, which can be used to fulfil various analytical requirements.
- Some of the commonly used analytical functions are covered below; more analytical functions available in PySpark are documented separately.
Window functions/attributes:
These are the most important part of ordered analytical functions and should be understood properly in order to use them effectively.
- Window.partitionBy: This is optional and can be used to further categorize data and perform analysis within sub-categories. Multiple comma-separated columns can be passed to partitionBy.
Example: In order to calculate the sum of salary for each department along with the individual employee detail, partitioning by department number is required.
Dept No  Emp No  Salary  Total_Department_Salary
-------  ------  ------  -----------------------
     10      11   15000                    20000
     10      12    5000                    20000
     20      22   10023                    30023
     20      23   10000                    30023
     20      24   10000                    30023
     30      31   13230                    23230
     30      32   10000                    23230
Note: Using a window function without partitionBy moves all the data to a single partition (i.e. a single node), which can cause serious performance degradation.
- Window.orderBy: This is optional and can be used to specify the order in which rows should be processed within each partition.
- Window.rowsBetween: This is also optional and should be used when the user wants to peek into other rows' data to compute the current row's value.
rowsBetween(start, end), where start can be Window.unboundedPreceding, Window.currentRow or Window.currentRow +/- n, and end can be Window.currentRow +/- m, Window.currentRow or Window.unboundedFollowing.
- The rowsBetween attribute requires the user to specify the start and end boundaries of the window frame.
- Window.unboundedPreceding, Window.unboundedFollowing and Window.currentRow are available to reference all preceding rows, all following rows and the current row respectively.
- The user can add or subtract a number (e.g. 1, 2, 3) from Window.currentRow to look at that many preceding/following rows, or use the unbounded attributes to include all preceding/following rows, as shown in the sketch below.
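Putting the three attributes together, the following is a minimal sketch of a combined window specification (the variable name dept_salary_window and the chosen columns are illustrative, not part of the original example):
from pyspark.sql.window import Window
from pyspark.sql.functions import sum
# Partition rows by department, order them by salary within each department,
# and frame the aggregate over the current row plus the two rows preceding it.
dept_salary_window = (Window.partitionBy("dept_no")
                            .orderBy("salary")
                            .rowsBetween(Window.currentRow - 2, Window.currentRow))
# The specification is applied to an aggregate with .over(), for example:
# empdf.select("emp_name", "salary", sum("salary").over(dept_salary_window).alias("rolling_sum"))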
Sample Data: The dataset used in the examples below can be downloaded from here.
empdf = spark.read.parquet("file:///path_to_files/employee_with_comm.parquet")
empdf.show()
+-------+--------+-------+----------+-------+-----+
| emp_no|emp_name| salary|manager_id|dept_no| comm|
+-------+--------+-------+----------+-------+-----+
|1000245| PRADEEP|5000.00| null| 100| 0.00|
|1000258| BLAKE|2850.00| 1000245| 300|50.00|
|1000262| CLARK|2450.00| 1000245| 100|50.00|
|1000276| JONES|2975.00| 1000245| 200|75.00|
|1000288| SCOTT|3000.00| 1000276| 200| 0.00|
|1000292| FORD|3000.00| 1000276| 200| 0.00|
|1000294| SMITH| 800.00| 1000292| 200| 0.00|
|1000299| ALLEN|1600.00| 1000258| 300| 0.00|
|1000310| WARD|1250.00| 1000258| 300|50.00|
|1000312| MARTIN|1250.00| 1000258| 300|50.00|
|1000315| TURNER|1500.00| 1000258| 300| 0.00|
|1000326| ADAMS|1100.00| 1000288| 200| 0.00|
|1000336| JAMES| 950.00| 1000258| 300|50.00|
|1000346| MILLER|1300.00| 1000262| 100| 0.00|
|1000347| DAVID|1400.00| 1000245| 500| 0.00|
+-------+--------+-------+----------+-------+-----+
from pyspark.sql.window import Window
➠
count() function: The count function can be used to count the number of records in each group.
- Example 1: Count of records in each department (dept_no).
from pyspark.sql.window import Window
from pyspark.sql.functions import count, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", count("*").over(Window.partitionBy("dept_no")).alias("Total_Emps"))
empdf_update.show()
+--------+-------+-------+----------+
|emp_name|dept_no| salary|Total_Emps|
+--------+-------+-------+----------+
| PRADEEP| 100|5000.00| 3|
| CLARK| 100|2450.00| 3|
| MILLER| 100|1300.00| 3|
| BLAKE| 300|2850.00| 6|
| ALLEN| 300|1600.00| 6|
| WARD| 300|1250.00| 6|
| MARTIN| 300|1250.00| 6|
| TURNER| 300|1500.00| 6|
| JAMES| 300| 950.00| 6|
| DAVID| 500|1400.00| 1|
| JONES| 200|2975.00| 5|
| SCOTT| 200|3000.00| 5|
| FORD| 200|3000.00| 5|
| SMITH| 200| 800.00| 5|
| ADAMS| 200|1100.00| 5|
+--------+-------+-------+----------+
- Example 2: Count of rows after the current row within each department (remaining rows in the partition). Since no orderBy is specified here, the row order within each partition is not guaranteed.
from pyspark.sql.window import Window
from pyspark.sql.functions import count, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", count("*").over(Window.partitionBy("dept_no").rowsBetween(Window.currentRow+1, Window.unboundedFollowing)).alias("Remaining_cnt"))
empdf_update.show()
+--------+-------+-------+-------------+
|emp_name|dept_no| salary|Remaining_cnt|
+--------+-------+-------+-------------+
| PRADEEP| 100|5000.00| 2|
| CLARK| 100|2450.00| 1|
| MILLER| 100|1300.00| 0|
| BLAKE| 300|2850.00| 5|
| ALLEN| 300|1600.00| 4|
| WARD| 300|1250.00| 3|
| MARTIN| 300|1250.00| 2|
| TURNER| 300|1500.00| 1|
| JAMES| 300| 950.00| 0|
| DAVID| 500|1400.00| 0|
| JONES| 200|2975.00| 4|
| SCOTT| 200|3000.00| 3|
| FORD| 200|3000.00| 2|
| SMITH| 200| 800.00| 1|
| ADAMS| 200|1100.00| 0|
+--------+-------+-------+-------------+
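The same idea can be flipped to give a running count of rows seen so far within each department; a minimal sketch (the orderBy column and the alias Running_cnt are illustrative):
from pyspark.sql.window import Window
from pyspark.sql.functions import count
# Running count from the start of the partition up to the current row,
# ordered by salary within each department.
running_cnt_window = (Window.partitionBy("dept_no")
                            .orderBy("salary")
                            .rowsBetween(Window.unboundedPreceding, Window.currentRow))
empdf.select("emp_name", "dept_no", "salary",
             count("*").over(running_cnt_window).alias("Running_cnt")).show()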
➠
sum() function: The sum function can be used to calculate the sum of each column passed to it for each group. This function can be applied only to numeric columns.
- Example 1: Cumulative sum of salaries within each department (dept_no). Note that there is a problem when the orderBy values are tied within a partition: with orderBy and no explicit frame, the default frame is a range frame, so rows with equal orderBy values are treated as peers and receive the same cumulative value.
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", sum("salary").over(Window.partitionBy("dept_no").orderBy( col("salary").desc())).alias("cummulative_sum"))
empdf_update.show()
+--------+-------+-------+---------------+
|emp_name|dept_no| salary|cummulative_sum|
+--------+-------+-------+---------------+
| PRADEEP| 100|5000.00| 5000.00|
| CLARK| 100|2450.00| 7450.00|
| MILLER| 100|1300.00| 8750.00|
| BLAKE| 300|2850.00| 2850.00|
| ALLEN| 300|1600.00| 4450.00|
| TURNER| 300|1500.00| 5950.00|
| WARD| 300|1250.00| 8450.00|<--Problem when order by values are same
| MARTIN| 300|1250.00| 8450.00|<--Problem when order by values are same
| JAMES| 300| 950.00| 9400.00|
| DAVID| 500|1400.00| 1400.00|
| SCOTT| 200|3000.00| 6000.00|<--Problem when order by values are same
| FORD| 200|3000.00| 6000.00|<--Problem when order by values are same
| JONES| 200|2975.00| 8975.00|
| ADAMS| 200|1100.00| 10075.00|
| SMITH| 200| 800.00| 10875.00|
+--------+-------+-------+---------------+
- Example 2: True cumulative sum of salaries within each department. Use multiple orderBy columns to make the ordering unique so that Spark can distinguish tied rows.
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", sum("salary").over(Window.partitionBy("dept_no").orderBy( col("salary").desc(), col("emp_name").desc())).alias("true_cummulative_sum"))
empdf_update.show()
+--------+-------+-------+--------------------+
|emp_name|dept_no| salary|true_cummulative_sum|
+--------+-------+-------+--------------------+
| PRADEEP| 100|5000.00| 5000.00|
| CLARK| 100|2450.00| 7450.00|
| MILLER| 100|1300.00| 8750.00|
| BLAKE| 300|2850.00| 2850.00|
| ALLEN| 300|1600.00| 4450.00|
| TURNER| 300|1500.00| 5950.00|
| WARD| 300|1250.00| 7200.00|
| MARTIN| 300|1250.00| 8450.00|
| JAMES| 300| 950.00| 9400.00|
| DAVID| 500|1400.00| 1400.00|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| 6000.00|
| JONES| 200|2975.00| 8975.00|
| ADAMS| 200|1100.00| 10075.00|
| SMITH| 200| 800.00| 10875.00|
+--------+-------+-------+--------------------+
- Example 3: Cumulative sum of salaries without any partition (all rows are processed in a single partition, see the note above).
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", sum("salary").over(Window.orderBy( col("salary").desc(), col("emp_name").desc())).alias("true_cummulative_sum"))
empdf_update.show()
+--------+-------+-------+--------------------+
|emp_name|dept_no| salary|true_cummulative_sum|
+--------+-------+-------+--------------------+
| PRADEEP| 100|5000.00| 5000.00|
| SCOTT| 200|3000.00| 8000.00|
| FORD| 200|3000.00| 11000.00|
| JONES| 200|2975.00| 13975.00|
| BLAKE| 300|2850.00| 16825.00|
| CLARK| 100|2450.00| 19275.00|
| ALLEN| 300|1600.00| 20875.00|
| TURNER| 300|1500.00| 22375.00|
| DAVID| 500|1400.00| 23775.00|
| MILLER| 100|1300.00| 25075.00|
| WARD| 300|1250.00| 26325.00|
| MARTIN| 300|1250.00| 27575.00|
| ADAMS| 200|1100.00| 28675.00|
| JAMES| 300| 950.00| 29625.00|
| SMITH| 200| 800.00| 30425.00|
+--------+-------+-------+--------------------+
- Example 4: Total sum of salaries within each department (the frame spans the whole partition, so every row in a department gets the same total).
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", sum("salary").over(Window.partitionBy("dept_no").orderBy( col("salary").desc(), col("emp_name").desc()).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)).alias("true_cummulative_sum"))
empdf_update.show()
+--------+-------+-------+--------------------+
|emp_name|dept_no| salary|true_cummulative_sum|
+--------+-------+-------+--------------------+
| BLAKE| 300|2850.00| 9400.00|
| ALLEN| 300|1600.00| 9400.00|
| TURNER| 300|1500.00| 9400.00|
| WARD| 300|1250.00| 9400.00|
| MARTIN| 300|1250.00| 9400.00|
| JAMES| 300| 950.00| 9400.00|
| DAVID| 500|1400.00| 1400.00|
| PRADEEP| 100|5000.00| 8750.00|
| CLARK| 100|2450.00| 8750.00|
| MILLER| 100|1300.00| 8750.00|
| SCOTT| 200|3000.00| 10875.00|
| FORD| 200|3000.00| 10875.00|
| JONES| 200|2975.00| 10875.00|
| ADAMS| 200|1100.00| 10875.00|
| SMITH| 200| 800.00| 10875.00|
+--------+-------+-------+--------------------+
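The same windowed sum can also be written in Spark SQL syntax; a minimal sketch, assuming the DataFrame is first registered as a temporary view (the view name emp and the alias dept_total_salary are illustrative):
# Register the DataFrame so it can be queried with Spark SQL.
empdf.createOrReplaceTempView("emp")
# Equivalent of Example 4: the whole department partition is the frame.
spark.sql("""
    SELECT emp_name, dept_no, salary,
           SUM(salary) OVER (PARTITION BY dept_no
                             ORDER BY salary DESC, emp_name DESC
                             ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS dept_total_salary
    FROM emp
""").show()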
➠
avg() function: The avg function can be used to calculate the average of each column passed to it for each group. This function can be applied only to numeric columns.
- Example 1: Average salary within each department
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", avg("salary").over(Window.partitionBy("dept_no")).alias("Average_dept_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|Average_dept_salary|
+--------+-------+-------+-------------------+
| PRADEEP| 100|5000.00| 2916.666667|
| CLARK| 100|2450.00| 2916.666667|
| MILLER| 100|1300.00| 2916.666667|
| BLAKE| 300|2850.00| 1566.666667|
| ALLEN| 300|1600.00| 1566.666667|
| WARD| 300|1250.00| 1566.666667|
| MARTIN| 300|1250.00| 1566.666667|
| TURNER| 300|1500.00| 1566.666667|
| JAMES| 300| 950.00| 1566.666667|
| DAVID| 500|1400.00| 1400.000000|
| JONES| 200|2975.00| 2175.000000|
| SCOTT| 200|3000.00| 2175.000000|
| FORD| 200|3000.00| 2175.000000|
| SMITH| 200| 800.00| 2175.000000|
| ADAMS| 200|1100.00| 2175.000000|
+--------+-------+-------+-------------------+
- Example 2: Cumulative Average salary within each department
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", avg("salary").over(Window.partitionBy("dept_no").orderBy("emp_name").rowsBetween(Window.unboundedPreceding, Window.currentRow)).alias("Cum_avg_Dept_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|Cum_avg_Dept_salary|
+--------+-------+-------+-------------------+
| CLARK| 100|2450.00| 2450.000000|
| MILLER| 100|1300.00| 1875.000000|
| PRADEEP| 100|5000.00| 2916.666667|
| ALLEN| 300|1600.00| 1600.000000|
| BLAKE| 300|2850.00| 2225.000000|
| JAMES| 300| 950.00| 1800.000000|
| MARTIN| 300|1250.00| 1662.500000|
| TURNER| 300|1500.00| 1630.000000|
| WARD| 300|1250.00| 1566.666667|
| DAVID| 500|1400.00| 1400.000000|
| ADAMS| 200|1100.00| 1100.000000|
| FORD| 200|3000.00| 2050.000000|
| JONES| 200|2975.00| 2358.333333|
| SCOTT| 200|3000.00| 2518.750000|
| SMITH| 200| 800.00| 2175.000000|
+--------+-------+-------+-------------------+
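The frame offsets described earlier can also be combined with avg to compute a moving average; a minimal sketch of a three-row moving average per department (the ordering column and the alias moving_avg_salary are illustrative):
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col
# Three-row moving average: the current row and the two rows before it,
# ordered by salary within each department.
moving_avg_window = (Window.partitionBy("dept_no")
                           .orderBy(col("salary"))
                           .rowsBetween(Window.currentRow - 2, Window.currentRow))
empdf.select("emp_name", "dept_no", "salary",
             avg("salary").over(moving_avg_window).alias("moving_avg_salary")).show()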
➠
min() function: The min function can be used to calculate the minimum value of each column passed to it within each group.
- Example 1: Minimum salary within each department
from pyspark.sql.window import Window
from pyspark.sql.functions import min, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", min("salary").over(Window.partitionBy("dept_no")).alias("Minimum_dept_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|Minimum_dept_salary|
+--------+-------+-------+-------------------+
| PRADEEP| 100|5000.00| 1300.00|
| CLARK| 100|2450.00| 1300.00|
| MILLER| 100|1300.00| 1300.00|
| BLAKE| 300|2850.00| 950.00|
| ALLEN| 300|1600.00| 950.00|
| WARD| 300|1250.00| 950.00|
| MARTIN| 300|1250.00| 950.00|
| TURNER| 300|1500.00| 950.00|
| JAMES| 300| 950.00| 950.00|
| DAVID| 500|1400.00| 1400.00|
| JONES| 200|2975.00| 800.00|
| SCOTT| 200|3000.00| 800.00|
| FORD| 200|3000.00| 800.00|
| SMITH| 200| 800.00| 800.00|
| ADAMS| 200|1100.00| 800.00|
+--------+-------+-------+-------------------+
- Example 2: Emulating the LAG analytical function using the min function with a single-row frame (the previous row only). A sketch using the dedicated lag function follows the output below.
from pyspark.sql.window import Window
from pyspark.sql.functions import min, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", min("salary").over(Window.partitionBy("dept_no").orderBy( col("salary")).rowsBetween(Window.currentRow-1,Window.currentRow-1)).alias("previous_row_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|previous_row_salary|
+--------+-------+-------+-------------------+
| MILLER| 100|1300.00| null|
| CLARK| 100|2450.00| 1300.00|
| PRADEEP| 100|5000.00| 2450.00|
| JAMES| 300| 950.00| null|
| WARD| 300|1250.00| 950.00|
| MARTIN| 300|1250.00| 1250.00|
| TURNER| 300|1500.00| 1250.00|
| ALLEN| 300|1600.00| 1500.00|
| BLAKE| 300|2850.00| 1600.00|
| DAVID| 500|1400.00| null|
| SMITH| 200| 800.00| null|
| ADAMS| 200|1100.00| 800.00|
| JONES| 200|2975.00| 1100.00|
| SCOTT| 200|3000.00| 2975.00|
| FORD| 200|3000.00| 3000.00|
+--------+-------+-------+-------------------+
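PySpark also provides a dedicated lag function that yields the same previous-row value as the min-based emulation above; a minimal sketch (the alias matches the example only for comparison):
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col
# lag("salary", 1) returns the previous row's salary within the ordered partition,
# or null for the first row of each partition.
prev_window = Window.partitionBy("dept_no").orderBy(col("salary"))
empdf.select("emp_name", "dept_no", "salary",
             lag("salary", 1).over(prev_window).alias("previous_row_salary")).show()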
- Example 3: Emulating the LEAD analytical function using the min function with a single-row frame (the next row only). A sketch using the dedicated lead function follows the output below.
from pyspark.sql.window import Window
from pyspark.sql.functions import min, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", min("salary").over(Window.partitionBy("dept_no").orderBy( col("salary")).rowsBetween(Window.currentRow+1,Window.currentRow+1)).alias("next_row_salary"))
empdf_update.show()
+--------+-------+-------+---------------+
|emp_name|dept_no| salary|next_row_salary|
+--------+-------+-------+---------------+
| JAMES| 300| 950.00| 1250.00|
| WARD| 300|1250.00| 1250.00|
| MARTIN| 300|1250.00| 1500.00|
| TURNER| 300|1500.00| 1600.00|
| ALLEN| 300|1600.00| 2850.00|
| BLAKE| 300|2850.00| null|
| DAVID| 500|1400.00| null|
| MILLER| 100|1300.00| 2450.00|
| CLARK| 100|2450.00| 5000.00|
| PRADEEP| 100|5000.00| null|
| SMITH| 200| 800.00| 1100.00|
| ADAMS| 200|1100.00| 2975.00|
| JONES| 200|2975.00| 3000.00|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| null|
+--------+-------+-------+---------------+
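Similarly, the dedicated lead function yields the same next-row value as the min-based emulation above; a minimal sketch:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col
# lead("salary", 1) returns the next row's salary within the ordered partition,
# or null for the last row of each partition.
next_window = Window.partitionBy("dept_no").orderBy(col("salary"))
empdf.select("emp_name", "dept_no", "salary",
             lead("salary", 1).over(next_window).alias("next_row_salary")).show()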
➠
max() function: The max function can be used to calculate the maximum value of each column passed to it within each group.
- Example 1: Maximum salary within each department
from pyspark.sql.window import Window
from pyspark.sql.functions import max, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", max("salary").over(Window.partitionBy("dept_no")).alias("Maximum_dept_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|Maximum_dept_salary|
+--------+-------+-------+-------------------+
| PRADEEP| 100|5000.00| 5000.00|
| CLARK| 100|2450.00| 5000.00|
| MILLER| 100|1300.00| 5000.00|
| BLAKE| 300|2850.00| 2850.00|
| ALLEN| 300|1600.00| 2850.00|
| WARD| 300|1250.00| 2850.00|
| MARTIN| 300|1250.00| 2850.00|
| TURNER| 300|1500.00| 2850.00|
| JAMES| 300| 950.00| 2850.00|
| DAVID| 500|1400.00| 1400.00|
| JONES| 200|2975.00| 3000.00|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| 3000.00|
| SMITH| 200| 800.00| 3000.00|
| ADAMS| 200|1100.00| 3000.00|
+--------+-------+-------+-------------------+
- Example 2: Running maximum salary within each department over a window spanning the preceding rows and the current row.
from pyspark.sql.window import Window
from pyspark.sql.functions import max, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", max("salary").over(Window.partitionBy("dept_no").orderBy( col("emp_name")).rowsBetween(Window.unboundedPreceding,Window.currentRow)).alias("previous_row_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|previous_row_salary|
+--------+-------+-------+-------------------+
| CLARK| 100|2450.00| 2450.00|
| MILLER| 100|1300.00| 2450.00|
| PRADEEP| 100|5000.00| 5000.00|
| ALLEN| 300|1600.00| 1600.00|
| BLAKE| 300|2850.00| 2850.00|
| JAMES| 300| 950.00| 2850.00|
| MARTIN| 300|1250.00| 2850.00|
| TURNER| 300|1500.00| 2850.00|
| WARD| 300|1250.00| 2850.00|
| DAVID| 500|1400.00| 1400.00|
| ADAMS| 200|1100.00| 1100.00|
| FORD| 200|3000.00| 3000.00|
| JONES| 200|2975.00| 3000.00|
| SCOTT| 200|3000.00| 3000.00|
| SMITH| 200| 800.00| 3000.00|
+--------+-------+-------+-------------------+
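A common practical use of max over a partition is to keep only the highest-paid employee(s) in each department; a minimal sketch (the column max_dept_salary and the variable top_earners are illustrative):
from pyspark.sql.window import Window
from pyspark.sql.functions import max, col
# Attach the department maximum to every row, then keep only the rows that match it.
dept_window = Window.partitionBy("dept_no")
top_earners = (empdf
               .withColumn("max_dept_salary", max("salary").over(dept_window))
               .filter(col("salary") == col("max_dept_salary"))
               .drop("max_dept_salary"))
top_earners.show()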