Dept No Emp No salary Total_Department_Salary
------ ------ ------ ----------------------
10 11 15000 20000
10 12 5000 20000
20 22 10023 30023
20 23 10000 30023
20 24 10000 30023
30 31 13230 23230
30 32 10000 23230
Note: Using a window function without a partition specification moves all of the data to a single node, which can cause serious performance degradation.
rowsBetween(start, end) — start of the window is one of Window.unboundedPreceding, Window.currentRow, or Window.currentRow +/- n; end of the window is one of Window.currentRow +/- m, Window.currentRow, or Window.unboundedFollowing.
# Load the employee dataset (includes a commission column) from parquet
# and display it; `empdf` is reused by every window-function example below.
empdf = spark.read.format("parquet").load("file:///path_to_files/employee_with_comm.parquet")
empdf.show()
+-------+--------+-------+----------+-------+-----+
| emp_no|emp_name| salary|manager_id|dept_no| comm|
+-------+--------+-------+----------+-------+-----+
|1000245| PRADEEP|5000.00| null| 100| 0.00|
|1000258| BLAKE|2850.00| 1000245| 300|50.00|
|1000262| CLARK|2450.00| 1000245| 100|50.00|
|1000276| JONES|2975.00| 1000245| 200|75.00|
|1000288| SCOTT|3000.00| 1000276| 200| 0.00|
|1000292| FORD|3000.00| 1000276| 200| 0.00|
|1000294| SMITH| 800.00| 1000292| 200| 0.00|
|1000299| ALLEN|1600.00| 1000258| 300| 0.00|
|1000310| WARD|1250.00| 1000258| 300|50.00|
|1000312| MARTIN|1250.00| 1000258| 300|50.00|
|1000315| TURNER|1500.00| 1000258| 300| 0.00|
|1000326| ADAMS|1100.00| 1000288| 200| 0.00|
|1000336| JAMES| 950.00| 1000258| 300|50.00|
|1000346| MILLER|1300.00| 1000262| 100| 0.00|
|1000347| DAVID|1400.00| 1000245| 500| 0.00|
+-------+--------+-------+----------+-------+-----+
# lag(): for each row, fetch the salary from the PREVIOUS row within the
# same department, ordered by salary ascending. The first row of each
# partition has no preceding row, so its previous_row_salary is null.
# Fix: the Window import was duplicated (imported twice in a row).
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col

lag_window = Window.partitionBy("dept_no").orderBy("salary")
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    lag("salary").over(lag_window).alias("previous_row_salary"),
)
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|previous_row_salary|
+--------+-------+-------+-------------------+
| JAMES| 300| 950.00| null|
| WARD| 300|1250.00| 950.00|
| MARTIN| 300|1250.00| 1250.00|
| TURNER| 300|1500.00| 1250.00|
| ALLEN| 300|1600.00| 1500.00|
| BLAKE| 300|2850.00| 1600.00|
| DAVID| 500|1400.00| null|
| MILLER| 100|1300.00| null|
| CLARK| 100|2450.00| 1300.00|
| PRADEEP| 100|5000.00| 2450.00|
| SMITH| 200| 800.00| null|
| ADAMS| 200|1100.00| 800.00|
| JONES| 200|2975.00| 1100.00|
| SCOTT| 200|3000.00| 2975.00|
| FORD| 200|3000.00| 3000.00|
+--------+-------+-------+-------------------+
# lead(): for each row, fetch the salary from the NEXT row within the
# same department, with salaries ordered descending. The last row of
# each partition has no following row, so its next_row_salary is null.
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col

lead_window = Window.partitionBy("dept_no").orderBy(col("salary").desc())
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    lead("salary").over(lead_window).alias("next_row_salary"),
)
empdf_update.show()
+--------+-------+-------+---------------+
|emp_name|dept_no| salary|next_row_salary|
+--------+-------+-------+---------------+
| BLAKE| 300|2850.00| 1600.00|
| ALLEN| 300|1600.00| 1500.00|
| TURNER| 300|1500.00| 1250.00|
| WARD| 300|1250.00| 1250.00|
| MARTIN| 300|1250.00| 950.00|
| JAMES| 300| 950.00| null|
| DAVID| 500|1400.00| null|
| PRADEEP| 100|5000.00| 2450.00|
| CLARK| 100|2450.00| 1300.00|
| MILLER| 100|1300.00| null|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| 2975.00|
| JONES| 200|2975.00| 1100.00|
| ADAMS| 200|1100.00| 800.00|
| SMITH| 200| 800.00| null|
+--------+-------+-------+---------------+
# first(): the first salary in each department's window, ordered by
# salary ascending — i.e. every row in a partition shows that
# department's lowest salary.
from pyspark.sql.window import Window
from pyspark.sql.functions import first, col

first_window = Window.partitionBy("dept_no").orderBy("salary")
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    first("salary").over(first_window).alias("first_value_salary"),
)
empdf_update.show()
+--------+-------+-------+------------------+
|emp_name|dept_no| salary|first_value_salary|
+--------+-------+-------+------------------+
| JAMES| 300| 950.00| 950.00|
| WARD| 300|1250.00| 950.00|
| MARTIN| 300|1250.00| 950.00|
| TURNER| 300|1500.00| 950.00|
| ALLEN| 300|1600.00| 950.00|
| BLAKE| 300|2850.00| 950.00|
| DAVID| 500|1400.00| 1400.00|
| MILLER| 100|1300.00| 1300.00|
| CLARK| 100|2450.00| 1300.00|
| PRADEEP| 100|5000.00| 1300.00|
| SMITH| 200| 800.00| 800.00|
| ADAMS| 200|1100.00| 800.00|
| JONES| 200|2975.00| 800.00|
| SCOTT| 200|3000.00| 800.00|
| FORD| 200|3000.00| 800.00|
+--------+-------+-------+------------------+
# last() with the DEFAULT frame: when orderBy is given, the frame only
# extends up to the current row (and its ordering peers), so last()
# effectively echoes the current row's salary — see the output below.
from pyspark.sql.window import Window
from pyspark.sql.functions import last, col

last_window = Window.partitionBy("dept_no").orderBy(col("salary"))
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    last("salary").over(last_window).alias("last_value_example"),
)
empdf_update.show()
+--------+-------+-------+------------------+
|emp_name|dept_no| salary|last_value_example|
+--------+-------+-------+------------------+
| JAMES| 300| 950.00| 950.00|
| WARD| 300|1250.00| 1250.00|
| MARTIN| 300|1250.00| 1250.00|
| TURNER| 300|1500.00| 1500.00|
| ALLEN| 300|1600.00| 1600.00|
| BLAKE| 300|2850.00| 2850.00|
| DAVID| 500|1400.00| 1400.00|
| MILLER| 100|1300.00| 1300.00|
| CLARK| 100|2450.00| 2450.00|
| PRADEEP| 100|5000.00| 5000.00|
| SMITH| 200| 800.00| 800.00|
| ADAMS| 200|1100.00| 1100.00|
| JONES| 200|2975.00| 2975.00|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| 3000.00|
+--------+-------+-------+------------------+
# last() with an EXPLICIT full frame: rowsBetween(unboundedPreceding,
# unboundedFollowing) makes the frame span the whole partition, so every
# row shows its department's highest salary (the last row in the
# ascending order).
from pyspark.sql.window import Window
from pyspark.sql.functions import last, col

full_frame_window = (
    Window.partitionBy("dept_no")
    .orderBy(col("salary"))
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    last("salary").over(full_frame_window).alias("last_value_example"),
)
empdf_update.show()
+--------+-------+-------+------------------+
|emp_name|dept_no| salary|last_value_example|
+--------+-------+-------+------------------+
| JAMES| 300| 950.00| 2850.00|
| WARD| 300|1250.00| 2850.00|
| MARTIN| 300|1250.00| 2850.00|
| TURNER| 300|1500.00| 2850.00|
| ALLEN| 300|1600.00| 2850.00|
| BLAKE| 300|2850.00| 2850.00|
| DAVID| 500|1400.00| 1400.00|
| MILLER| 100|1300.00| 5000.00|
| CLARK| 100|2450.00| 5000.00|
| PRADEEP| 100|5000.00| 5000.00|
| SMITH| 200| 800.00| 3000.00|
| ADAMS| 200|1100.00| 3000.00|
| JONES| 200|2975.00| 3000.00|
| SCOTT| 200|3000.00| 3000.00|
| FORD| 200|3000.00| 3000.00|
+--------+-------+-------+------------------+
# cume_dist(): cumulative distribution of each salary within its
# department — the fraction of partition rows with a salary less than or
# equal to the current row's (ties share the same value).
from pyspark.sql.window import Window
from pyspark.sql.functions import cume_dist, col

dist_window = Window.partitionBy("dept_no").orderBy(col("salary"))
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    cume_dist().over(dist_window).alias("cume_dist_example"),
)
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary| cume_dist_example|
+--------+-------+-------+-------------------+
| JAMES| 300| 950.00|0.16666666666666666|
| WARD| 300|1250.00| 0.5|
| MARTIN| 300|1250.00| 0.5|
| TURNER| 300|1500.00| 0.6666666666666666|
| ALLEN| 300|1600.00| 0.8333333333333334|
| BLAKE| 300|2850.00| 1.0|
| DAVID| 500|1400.00| 1.0|
| MILLER| 100|1300.00| 0.3333333333333333|
| CLARK| 100|2450.00| 0.6666666666666666|
| PRADEEP| 100|5000.00| 1.0|
| SMITH| 200| 800.00| 0.2|
| ADAMS| 200|1100.00| 0.4|
| JONES| 200|2975.00| 0.6|
| SCOTT| 200|3000.00| 1.0|
| FORD| 200|3000.00| 1.0|
+--------+-------+-------+-------------------+