Dept No Emp No salary Total_Department_Salary
------ ------ ------ ----------------------
10 11 15000 20000
10 12 5000 20000
20 22 10023 30023
20 23 10000 30023
20 24 10000 30023
30 31 13230 23230
30 32 10000 23230
Note: Using a window function without a partitionBy clause moves all of the data to a single node, which can cause serious performance degradation.
rowsBetween(start, end) — start may be Window.unboundedPreceding, Window.currentRow, or Window.currentRow +/- n; end may be Window.currentRow, Window.currentRow +/- m, or Window.unboundedFollowing.
# Load the employee dataset from a local parquet file and preview its rows.
empdf = spark.read.format("parquet").load("file:///path_to_files/employee_with_comm.parquet")
empdf.show()
+-------+--------+-------+----------+-------+-----+
| emp_no|emp_name| salary|manager_id|dept_no| comm|
+-------+--------+-------+----------+-------+-----+
|1000245| PRADEEP|5000.00| null| 100| 0.00|
|1000258| BLAKE|2850.00| 1000245| 300|50.00|
|1000262| CLARK|2450.00| 1000245| 100|50.00|
|1000276| JONES|2975.00| 1000245| 200|75.00|
|1000288| SCOTT|3000.00| 1000276| 200| 0.00|
|1000292| FORD|3000.00| 1000276| 200| 0.00|
|1000294| SMITH| 800.00| 1000292| 200| 0.00|
|1000299| ALLEN|1600.00| 1000258| 300| 0.00|
|1000310| WARD|1250.00| 1000258| 300|50.00|
|1000312| MARTIN|1250.00| 1000258| 300|50.00|
|1000315| TURNER|1500.00| 1000258| 300| 0.00|
|1000326| ADAMS|1100.00| 1000288| 200| 0.00|
|1000336| JAMES| 950.00| 1000258| 300|50.00|
|1000346| MILLER|1300.00| 1000262| 100| 0.00|
|1000347| DAVID|1400.00| 1000245| 500| 0.00|
+-------+--------+-------+----------+-------+-----+
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Un-partitioned window ordered by salary: row_number() assigns 1..N across
# the entire DataFrame (all rows end up on one partition — see note above
# about the performance cost of windows without partitionBy).
global_salary_window = Window.orderBy(col("salary"))
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    row_number().over(global_salary_window).alias("row_number_salary"),
)
empdf_update.show()
+--------+-------+-------+-----------------+
|emp_name|dept_no| salary|row_number_salary|
+--------+-------+-------+-----------------+
| SMITH| 200| 800.00| 1|
| JAMES| 300| 950.00| 2|
| ADAMS| 200|1100.00| 3|
| WARD| 300|1250.00| 4|
| MARTIN| 300|1250.00| 5|
| MILLER| 100|1300.00| 6|
| DAVID| 500|1400.00| 7|
| TURNER| 300|1500.00| 8|
| ALLEN| 300|1600.00| 9|
| CLARK| 100|2450.00| 10|
| BLAKE| 300|2850.00| 11|
| JONES| 200|2975.00| 12|
| SCOTT| 200|3000.00| 13|
| FORD| 200|3000.00| 14|
| PRADEEP| 100|5000.00| 15|
+--------+-------+-------+-----------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Partitioned window: row_number() restarts at 1 inside each dept_no,
# numbering employees by ascending salary within their own department.
dept_salary_window = Window.partitionBy("dept_no").orderBy(col("salary"))
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    row_number().over(dept_salary_window).alias("row_number_Dept_salary"),
)
empdf_update.show()
+--------+-------+-------+----------------------+
|emp_name|dept_no| salary|row_number_Dept_salary|
+--------+-------+-------+----------------------+
| JAMES| 300| 950.00| 1|
| WARD| 300|1250.00| 2|
| MARTIN| 300|1250.00| 3|
| TURNER| 300|1500.00| 4|
| ALLEN| 300|1600.00| 5|
| BLAKE| 300|2850.00| 6|
| DAVID| 500|1400.00| 1|
| MILLER| 100|1300.00| 1|
| CLARK| 100|2450.00| 2|
| PRADEEP| 100|5000.00| 3|
| SMITH| 200| 800.00| 1|
| ADAMS| 200|1100.00| 2|
| JONES| 200|2975.00| 3|
| SCOTT| 200|3000.00| 4|
| FORD| 200|3000.00| 5|
+--------+-------+-------+----------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

# rank() over each department, highest salary first. Ties receive the same
# rank and the following rank is skipped (gaps — see the annotated output).
dept_desc_window = Window.partitionBy("dept_no").orderBy(col("salary").desc())
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    rank().over(dept_desc_window).alias("Rank_by_salary"),
)
empdf_update.show()
+--------+-------+-------+--------------+
|emp_name|dept_no| salary|Rank_by_salary|
+--------+-------+-------+--------------+
| BLAKE| 300|2850.00| 1|
| ALLEN| 300|1600.00| 2|
| TURNER| 300|1500.00| 3|
| WARD| 300|1250.00| 4|
| MARTIN| 300|1250.00| 4|<--
| JAMES| 300| 950.00| 6|<-- Rank 5 is missing
| DAVID| 500|1400.00| 1|
| PRADEEP| 100|5000.00| 1|
| CLARK| 100|2450.00| 2|
| MILLER| 100|1300.00| 3|
| SCOTT| 200|3000.00| 1|
| FORD| 200|3000.00| 1|<--
| JONES| 200|2975.00| 3|<-- Rank 2 is missing
| ADAMS| 200|1100.00| 4|
| SMITH| 200| 800.00| 5|
+--------+-------+-------+--------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col

# dense_rank() over each department, highest salary first. Like rank(),
# ties share a value, but no rank numbers are skipped afterwards.
dept_desc_window = Window.partitionBy("dept_no").orderBy(col("salary").desc())
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    dense_rank().over(dept_desc_window).alias("Dense_Rank_by_salary"),
)
empdf_update.show()
+--------+-------+-------+--------------------+
|emp_name|dept_no| salary|Dense_Rank_by_salary|
+--------+-------+-------+--------------------+
| BLAKE| 300|2850.00| 1|
| ALLEN| 300|1600.00| 2|
| TURNER| 300|1500.00| 3|
| WARD| 300|1250.00| 4|
| MARTIN| 300|1250.00| 4|
| JAMES| 300| 950.00| 5|
| DAVID| 500|1400.00| 1|
| PRADEEP| 100|5000.00| 1|
| CLARK| 100|2450.00| 2|
| MILLER| 100|1300.00| 3|
| SCOTT| 200|3000.00| 1|
| FORD| 200|3000.00| 1|
| JONES| 200|2975.00| 2|
| ADAMS| 200|1100.00| 3|
| SMITH| 200| 800.00| 4|
+--------+-------+-------+--------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank, col

# percent_rank() = (rank - 1) / (rows in partition - 1), computed per
# department ordered by ascending salary; tied salaries share a value.
dept_asc_window = Window.partitionBy("dept_no").orderBy(col("salary"))
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    percent_rank().over(dept_asc_window).alias("percent_rank_salary"),
)
empdf_update.show()
+--------+-------+-------+-------------------+
|emp_name|dept_no| salary|percent_rank_salary|
+--------+-------+-------+-------------------+
| JAMES| 300| 950.00| 0.0|
| WARD| 300|1250.00| 0.2|
| MARTIN| 300|1250.00| 0.2|<-- tied salaries share the same percent_rank
| TURNER| 300|1500.00| 0.6|<-- jumps to 0.6 since percent_rank = (rank - 1) / (rows - 1)
| ALLEN| 300|1600.00| 0.8|
| BLAKE| 300|2850.00| 1.0|
| DAVID| 500|1400.00| 0.0|
| MILLER| 100|1300.00| 0.0|
| CLARK| 100|2450.00| 0.5|
| PRADEEP| 100|5000.00| 1.0|
| SMITH| 200| 800.00| 0.0|
| ADAMS| 200|1100.00| 0.25|
| JONES| 200|2975.00| 0.5|
| SCOTT| 200|3000.00| 0.75|
| FORD| 200|3000.00| 0.75|
+--------+-------+-------+-------------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile, col

# ntile(2) splits each department (ordered by ascending salary) into two
# buckets of as-equal-as-possible size; earlier buckets get the extra row.
dept_asc_window = Window.partitionBy("dept_no").orderBy(col("salary"))
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    ntile(2).over(dept_asc_window).alias("ntile_example"),
)
empdf_update.show()
+--------+-------+-------+-------------+
|emp_name|dept_no| salary|ntile_example|
+--------+-------+-------+-------------+
| JAMES| 300| 950.00| 1|
| WARD| 300|1250.00| 1|
| MARTIN| 300|1250.00| 1|
| TURNER| 300|1500.00| 2|
| ALLEN| 300|1600.00| 2|
| BLAKE| 300|2850.00| 2|
| DAVID| 500|1400.00| 1|
| MILLER| 100|1300.00| 1|
| CLARK| 100|2450.00| 1|
| PRADEEP| 100|5000.00| 2|
| SMITH| 200| 800.00| 1|
| ADAMS| 200|1100.00| 1|
| JONES| 200|2975.00| 1|
| SCOTT| 200|3000.00| 2|
| FORD| 200|3000.00| 2|
+--------+-------+-------+-------------+
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile, col

# Same as the previous example but with three buckets per department:
# ntile(3) over each dept_no partition ordered by ascending salary.
dept_asc_window = Window.partitionBy("dept_no").orderBy(col("salary"))
empdf_update = empdf.select(
    "emp_name",
    "dept_no",
    "salary",
    ntile(3).over(dept_asc_window).alias("ntile_example"),
)
empdf_update.show()
+--------+-------+-------+-------------+
|emp_name|dept_no| salary|ntile_example|
+--------+-------+-------+-------------+
| JAMES| 300| 950.00| 1|
| WARD| 300|1250.00| 1|
| MARTIN| 300|1250.00| 2|
| TURNER| 300|1500.00| 2|
| ALLEN| 300|1600.00| 3|
| BLAKE| 300|2850.00| 3|
| DAVID| 500|1400.00| 1|
| MILLER| 100|1300.00| 1|
| CLARK| 100|2450.00| 2|
| PRADEEP| 100|5000.00| 3|
| SMITH| 200| 800.00| 1|
| ADAMS| 200|1100.00| 1|
| JONES| 200|2975.00| 2|
| SCOTT| 200|3000.00| 2|
| FORD| 200|3000.00| 3|
+--------+-------+-------+-------------+