Dept No Emp No salary Total_Department_Salary
------ ------ ------ ----------------------
10 11 15000 20000
10 12 5000 20000
20 22 10023 30023
20 23 10000 30023
20 24 10000 30023
30 31 13230 23230
30 32 10000 23230
Note: Moving all data to a single node(i.e without using partition) for using window functions, can cause serious performance degradation.
rowsBetween( Window.unboundedPreceding|Window.currentRow|Window.currentRow+-n (start window) , Window.currentRow+-m |Window.currentRow| Window.unboundedFollowing (end window))
empdf = spark.read.parquet("file:///path_to_files/employee_with_comm.parquet")
empdf.show()
+-------+--------+-------+----------+-------+-----+
| emp_no|emp_name| salary|manager_id|dept_no| comm|
+-------+--------+-------+----------+-------+-----+
|1000245| PRADEEP|5000.00| null| 100| 0.00|
|1000258| BLAKE|2850.00| 1000245| 300|50.00|
|1000262| CLARK|2450.00| 1000245| 100|50.00|
|1000276| JONES|2975.00| 1000245| 200|75.00|
|1000288| SCOTT|3000.00| 1000276| 200| 0.00|
|1000292| FORD|3000.00| 1000276| 200| 0.00|
|1000294| SMITH| 800.00| 1000292| 200| 0.00|
|1000299| ALLEN|1600.00| 1000258| 300| 0.00|
|1000310| WARD|1250.00| 1000258| 300|50.00|
|1000312| MARTIN|1250.00| 1000258| 300|50.00|
|1000315| TURNER|1500.00| 1000258| 300| 0.00|
|1000326| ADAMS|1100.00| 1000288| 200| 0.00|
|1000336| JAMES| 950.00| 1000258| 300|50.00|
|1000346| MILLER|1300.00| 1000262| 100| 0.00|
|1000347| DAVID|1400.00| 1000245| 500| 0.00|
+-------+--------+-------+----------+-------+-----+
from pyspark.sql.window import Window
from pyspark.sql.window import Window
from pyspark.sql.functions import count, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", count("*").over(Window.partitionBy("dept_no")).alias("Total_Emps"))
empdf_update.show()
+--------+-------+-------+----------+
|emp_name|dept_no| salary|Total_Emps|
+--------+-------+-------+----------+
| PRADEEP| 100|5000.00| 3|
| CLARK| 100|2450.00| 3|
| MILLER| 100|1300.00| 3|
| BLAKE| 300|2850.00| 6|
| ALLEN| 300|1600.00| 6|
| WARD| 300|1250.00| 6|
| MARTIN| 300|1250.00| 6|
| TURNER| 300|1500.00| 6|
| JAMES| 300| 950.00| 6|
| DAVID| 500|1400.00| 1|
| JONES| 200|2975.00| 5|
| SCOTT| 200|3000.00| 5|
| FORD| 200|3000.00| 5|
| SMITH| 200| 800.00| 5|
| ADAMS| 200|1100.00| 5|
+--------+-------+-------+----------+
from pyspark.sql.window import Window
from pyspark.sql.functions import count, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", count("*").over(Window.partitionBy("dept_no").rowsBetween(Window.currentRow+1, Window.unboundedFollowing)).alias("Remaining_cnt"))
empdf_update.show()
+--------+-------+-------+-------------+
|emp_name|dept_no| salary|Remaining_cnt|
+--------+-------+-------+-------------+
| PRADEEP| 100|5000.00| 2|
| CLARK| 100|2450.00| 1|
| MILLER| 100|1300.00| 0|
| BLAKE| 300|2850.00| 5|
| ALLEN| 300|1600.00| 4|
| WARD| 300|1250.00| 3|
| MARTIN| 300|1250.00| 2|
| TURNER| 300|1500.00| 1|
| JAMES| 300| 950.00| 0|
| DAVID| 500|1400.00| 0|
| JONES| 200|2975.00| 4|
| SCOTT| 200|3000.00| 3|
| FORD| 200|3000.00| 2|
| SMITH| 200| 800.00| 1|
| ADAMS| 200|1100.00| 0|
+--------+-------+-------+-------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", sum("salary").over(Window.partitionBy("dept_no").orderBy( col("salary").desc())).alias("cummulative_sum"))
empdf_update.show()
+--------+-------+-------+---------------+
|emp_name|dept_no| salary|cummulative_sum|
+--------+-------+-------+---------------+
| PRADEEP| 100|5000.00| 5000.00|
| CLARK| 100|2450.00| 7450.00|
| MILLER| 100|1300.00| 8750.00|
| BLAKE| 300|2850.00| 2850.00|
| ALLEN| 300|1600.00| 4450.00|
| TURNER| 300|1500.00| 5950.00|
| WARD| 300|1250.00| 8450.00|<--Problem when order by values are same
| MARTIN| 300|1250.00| 8450.00|<--Problem when order by values are same
| JAMES| 300| 950.00| 9400.00|
| DAVID| 500|1400.00| 1400.00|
| SCOTT| 200|3000.00| 6000.00|<--Problem when order by values are same
| FORD| 200|3000.00| 6000.00|<--Problem when order by values are same
| JONES| 200|2975.00| 8975.00|
| ADAMS| 200|1100.00| 10075.00|
| SMITH| 200| 800.00| 10875.00|
+--------+-------+-------+---------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", sum("salary").over(Window.partitionBy("dept_no").orderBy( col("salary").desc(), col("emp_name").desc())).alias("true_cummulative_sum"))
empdf_update.show()
+--------+-------+-------+--------------------+
|emp_name|dept_no| salary|true_cummulative_sum|
+--------+-------+-------+--------------------+
| PRADEEP| 100|5000.00| 5000.00|
| CLARK| 100|2450.00| 7450.00|
| MILLER| 100|1300.00| 8750.00|
| BLAKE| 300|2850.00| 2850.00|
| ALLEN| 300|1600.00| 4450.00|
| TURNER| 300|1500.00| 5950.00|
| WARD| 300|1250.00| 7200.00|
| MARTIN| 300|1250.00| 8450.00|
| JAMES| 300| 950.00| 9400.00|
| DAVID| 500|1400.00| 1400.00|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| 6000.00|
| JONES| 200|2975.00| 8975.00|
| ADAMS| 200|1100.00| 10075.00|
| SMITH| 200| 800.00| 10875.00|
+--------+-------+-------+--------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", sum("salary").over(Window.orderBy( col("salary").desc(), col("emp_name").desc())).alias("true_cummulative_sum"))
empdf_update.show()
+--------+-------+-------+--------------------+
|emp_name|dept_no| salary|true_cummulative_sum|
+--------+-------+-------+--------------------+
| PRADEEP| 100|5000.00| 5000.00|
| SCOTT| 200|3000.00| 8000.00|
| FORD| 200|3000.00| 11000.00|
| JONES| 200|2975.00| 13975.00|
| BLAKE| 300|2850.00| 16825.00|
| CLARK| 100|2450.00| 19275.00|
| ALLEN| 300|1600.00| 20875.00|
| TURNER| 300|1500.00| 22375.00|
| DAVID| 500|1400.00| 23775.00|
| MILLER| 100|1300.00| 25075.00|
| WARD| 300|1250.00| 26325.00|
| MARTIN| 300|1250.00| 27575.00|
| ADAMS| 200|1100.00| 28675.00|
| JAMES| 300| 950.00| 29625.00|
| SMITH| 200| 800.00| 30425.00|
+--------+-------+-------+--------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", sum("salary").over(Window.partitionBy("dept_no").orderBy( col("salary").desc(), col("emp_name").desc()).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)).alias("true_cummulative_sum"))
empdf_update.show()
+--------+-------+-------+--------------------+
|emp_name|dept_no| salary|true_cummulative_sum|
+--------+-------+-------+--------------------+
| BLAKE| 300|2850.00| 9400.00|
| ALLEN| 300|1600.00| 9400.00|
| TURNER| 300|1500.00| 9400.00|
| WARD| 300|1250.00| 9400.00|
| MARTIN| 300|1250.00| 9400.00|
| JAMES| 300| 950.00| 9400.00|
| DAVID| 500|1400.00| 1400.00|
| PRADEEP| 100|5000.00| 8750.00|
| CLARK| 100|2450.00| 8750.00|
| MILLER| 100|1300.00| 8750.00|
| SCOTT| 200|3000.00| 10875.00|
| FORD| 200|3000.00| 10875.00|
| JONES| 200|2975.00| 10875.00|
| ADAMS| 200|1100.00| 10875.00|
| SMITH| 200| 800.00| 10875.00|
+--------+-------+-------+--------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", avg("salary").over(Window.partitionBy("dept_no")).alias("Average_dept_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|Average_dept_salary|
+--------+-------+-------+-------------------+
| PRADEEP| 100|5000.00| 2916.666667|
| CLARK| 100|2450.00| 2916.666667|
| MILLER| 100|1300.00| 2916.666667|
| BLAKE| 300|2850.00| 1566.666667|
| ALLEN| 300|1600.00| 1566.666667|
| WARD| 300|1250.00| 1566.666667|
| MARTIN| 300|1250.00| 1566.666667|
| TURNER| 300|1500.00| 1566.666667|
| JAMES| 300| 950.00| 1566.666667|
| DAVID| 500|1400.00| 1400.000000|
| JONES| 200|2975.00| 2175.000000|
| SCOTT| 200|3000.00| 2175.000000|
| FORD| 200|3000.00| 2175.000000|
| SMITH| 200| 800.00| 2175.000000|
| ADAMS| 200|1100.00| 2175.000000|
+--------+-------+-------+-------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", avg("salary").over(Window.partitionBy("dept_no").orderBy("emp_name").rowsBetween(Window.unboundedPreceding, Window.currentRow)).alias("Cum_avg_Dept_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|Cum_avg_Dept_salary|
+--------+-------+-------+-------------------+
| CLARK| 100|2450.00| 2450.000000|
| MILLER| 100|1300.00| 1875.000000|
| PRADEEP| 100|5000.00| 2916.666667|
| ALLEN| 300|1600.00| 1600.000000|
| BLAKE| 300|2850.00| 2225.000000|
| JAMES| 300| 950.00| 1800.000000|
| MARTIN| 300|1250.00| 1662.500000|
| TURNER| 300|1500.00| 1630.000000|
| WARD| 300|1250.00| 1566.666667|
| DAVID| 500|1400.00| 1400.000000|
| ADAMS| 200|1100.00| 1100.000000|
| FORD| 200|3000.00| 2050.000000|
| JONES| 200|2975.00| 2358.333333|
| SCOTT| 200|3000.00| 2518.750000|
| SMITH| 200| 800.00| 2175.000000|
+--------+-------+-------+-------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import min, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", min("salary").over(Window.partitionBy("dept_no")).alias("Minimum_dept_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|Minimum_dept_salary|
+--------+-------+-------+-------------------+
| PRADEEP| 100|5000.00| 1300.00|
| CLARK| 100|2450.00| 1300.00|
| MILLER| 100|1300.00| 1300.00|
| BLAKE| 300|2850.00| 950.00|
| ALLEN| 300|1600.00| 950.00|
| WARD| 300|1250.00| 950.00|
| MARTIN| 300|1250.00| 950.00|
| TURNER| 300|1500.00| 950.00|
| JAMES| 300| 950.00| 950.00|
| DAVID| 500|1400.00| 1400.00|
| JONES| 200|2975.00| 800.00|
| SCOTT| 200|3000.00| 800.00|
| FORD| 200|3000.00| 800.00|
| SMITH| 200| 800.00| 800.00|
| ADAMS| 200|1100.00| 800.00|
+--------+-------+-------+-------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import min, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", min("salary").over(Window.partitionBy("dept_no").orderBy( col("salary")).rowsBetween(Window.currentRow-1,Window.currentRow-1)).alias("previous_row_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|previous_row_salary|
+--------+-------+-------+-------------------+
| MILLER| 100|1300.00| null|
| CLARK| 100|2450.00| 1300.00|
| PRADEEP| 100|5000.00| 2450.00|
| JAMES| 300| 950.00| null|
| WARD| 300|1250.00| 950.00|
| MARTIN| 300|1250.00| 1250.00|
| TURNER| 300|1500.00| 1250.00|
| ALLEN| 300|1600.00| 1500.00|
| BLAKE| 300|2850.00| 1600.00|
| DAVID| 500|1400.00| null|
| SMITH| 200| 800.00| null|
| ADAMS| 200|1100.00| 800.00|
| JONES| 200|2975.00| 1100.00|
| SCOTT| 200|3000.00| 2975.00|
| FORD| 200|3000.00| 3000.00|
+--------+-------+-------+-------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import min, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", min("salary").over(Window.partitionBy("dept_no").orderBy( col("salary")).rowsBetween(Window.currentRow+1,Window.currentRow+1)).alias("next_row_salary"))
empdf_update.show()
+--------+-------+-------+---------------+
|emp_name|dept_no| salary|next_row_salary|
+--------+-------+-------+---------------+
| JAMES| 300| 950.00| 1250.00|
| WARD| 300|1250.00| 1250.00|
| MARTIN| 300|1250.00| 1500.00|
| TURNER| 300|1500.00| 1600.00|
| ALLEN| 300|1600.00| 2850.00|
| BLAKE| 300|2850.00| null|
| DAVID| 500|1400.00| null|
| MILLER| 100|1300.00| 2450.00|
| CLARK| 100|2450.00| 5000.00|
| PRADEEP| 100|5000.00| null|
| SMITH| 200| 800.00| 1100.00|
| ADAMS| 200|1100.00| 2975.00|
| JONES| 200|2975.00| 3000.00|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| null|
+--------+-------+-------+---------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import max, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", max("salary").over(Window.partitionBy("dept_no")).alias("Maximum_dept_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|Maximum_dept_salary|
+--------+-------+-------+-------------------+
| PRADEEP| 100|5000.00| 5000.00|
| CLARK| 100|2450.00| 5000.00|
| MILLER| 100|1300.00| 5000.00|
| BLAKE| 300|2850.00| 2850.00|
| ALLEN| 300|1600.00| 2850.00|
| WARD| 300|1250.00| 2850.00|
| MARTIN| 300|1250.00| 2850.00|
| TURNER| 300|1500.00| 2850.00|
| JAMES| 300| 950.00| 2850.00|
| DAVID| 500|1400.00| 1400.00|
| JONES| 200|2975.00| 3000.00|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| 3000.00|
| SMITH| 200| 800.00| 3000.00|
| ADAMS| 200|1100.00| 3000.00|
+--------+-------+-------+-------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import max, col
empdf_update = empdf.select("emp_name", "dept_no", "salary", max("salary").over(Window.partitionBy("dept_no").orderBy( col("emp_name")).rowsBetween(Window.unboundedPreceding,Window.currentRow)).alias("previous_row_salary"))
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|previous_row_salary|
+--------+-------+-------+-------------------+
| CLARK| 100|2450.00| 2450.00|
| MILLER| 100|1300.00| 2450.00|
| PRADEEP| 100|5000.00| 5000.00|
| ALLEN| 300|1600.00| 1600.00|
| BLAKE| 300|2850.00| 2850.00|
| JAMES| 300| 950.00| 2850.00|
| MARTIN| 300|1250.00| 2850.00|
| TURNER| 300|1500.00| 2850.00|
| WARD| 300|1250.00| 2850.00|
| DAVID| 500|1400.00| 1400.00|
| ADAMS| 200|1100.00| 1100.00|
| FORD| 200|3000.00| 3000.00|
| JONES| 200|2975.00| 3000.00|
| SCOTT| 200|3000.00| 3000.00|
| SMITH| 200| 800.00| 3000.00|
+--------+-------+-------+-------------------+