This tutorial will explain the various types of joins supported in PySpark, including inner, left, right, full, cross, semi, and anti joins. It will also cover the challenges that arise when joining two tables that have the same column names, along with ways to resolve the resulting column-name ambiguity.
leftDataframe.join(otherDataframe, on=None, how=None)
Data 1:
empdf=spark.read.parquet("file:///path_to_file/employee.parquet")
deptdf=spark.read.parquet("file:///path_to_file/department.parquet")
Data 2:
df_1 = spark.read.option("header",True).csv("file:///path_to_file/join_example_file_1.csv")
df_2 = spark.read.option("header",True).csv("file:///path_to_file/join_example_file_2.csv")
df_1.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
+-----+---------+-------+
df_2.show()
+-----+-----------+-------+
|db_id| db_name|db_type|
+-----+-----------+-------+
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
| 14| Snowflake| RDBMS|
+-----+-----------+-------+
df_1.join(df_2,"db_id").show()
+-----+---------+-------+---------+-------+
|db_id| db_name|db_type| db_name|db_type|
+-----+---------+-------+---------+-------+
| 14|Snowflake|CloudDB|Snowflake| RDBMS|
| 17| Oracle| RDBMS| Oracle| RDBMS|
| 19| MongoDB| NOSQL| MongoDB| NOSQL|
+-----+---------+-------+---------+-------+
df_1.join(df_2,df_1.db_id==df_2.db_id).show()
+-----+---------+-------+-----+---------+-------+
|db_id| db_name|db_type|db_id| db_name|db_type|
+-----+---------+-------+-----+---------+-------+
| 14|Snowflake|CloudDB| 14|Snowflake| RDBMS|
| 17| Oracle| RDBMS| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL| 19| MongoDB| NOSQL|
+-----+---------+-------+-----+---------+-------+
df_1.join(df_2,df_1.db_id==df_2.db_id,"inner").show()
+-----+---------+-------+-----+---------+-------+
|db_id| db_name|db_type|db_id| db_name|db_type|
+-----+---------+-------+-----+---------+-------+
| 14|Snowflake|CloudDB| 14|Snowflake| RDBMS|
| 17| Oracle| RDBMS| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL| 19| MongoDB| NOSQL|
+-----+---------+-------+-----+---------+-------+
empdf.join(deptdf,empdf.dept_no==deptdf.dept_no,"inner").show()
+-------+--------+-------+-------+-------+---------------+---------+
| emp_no|emp_name| salary|dept_no|dept_no|department_name| loc_name|
+-------+--------+-------+-------+-------+---------------+---------+
|1000245| PRADEEP|5000.00| 100| 100| ACCOUNTS| JAIPUR|
|1000258| BLAKE|2850.00| 300| 300| SALES|BENGALURU|
|1000262| CLARK|2450.00| 100| 100| ACCOUNTS| JAIPUR|
|1000276| JONES|2975.00| 200| 200| R & D|NEW DELHI|
|1000288| SCOTT|3000.00| 200| 200| R & D|NEW DELHI|
|1000292| FORD|3000.00| 200| 200| R & D|NEW DELHI|
|1000294| SMITH| 800.00| 200| 200| R & D|NEW DELHI|
|1000299| ALLEN|1600.00| 300| 300| SALES|BENGALURU|
|1000310| WARD|1250.00| 300| 300| SALES|BENGALURU|
|1000312| MARTIN|1250.00| 300| 300| SALES|BENGALURU|
|1000315| TURNER|1500.00| 300| 300| SALES|BENGALURU|
|1000326| ADAMS|1100.00| 200| 200| R & D|NEW DELHI|
|1000336| JAMES| 950.00| 300| 300| SALES|BENGALURU|
|1000346| MILLER|1300.00| 100| 100| ACCOUNTS| JAIPUR|
+-------+--------+-------+-------+-------+---------------+---------+
df_1.join(df_2,df_1.db_id==df_2.db_id,"left").show()
+-----+---------+-------+-----+-------+-------+
|db_id| db_name|db_type|db_id|db_name|db_type|
+-----+---------+-------+-----+-------+-------+
| 12| Teradata| RDBMS| null| null| null|<--
| 14|Snowflake|CloudDB| null| null| null|<--
| 15| Vertica| RDBMS| null| null| null|<--
| 17| Oracle| RDBMS| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL| 19|MongoDB| NOSQL|
+-----+---------+-------+-----+-------+-------+
empdf.join(deptdf,empdf.dept_no==deptdf.dept_no,"left").show()
+-------+--------+-------+-------+-------+---------------+---------+
| emp_no|emp_name| salary|dept_no|dept_no|department_name| loc_name|
+-------+--------+-------+-------+-------+---------------+---------+
|1000245| PRADEEP|5000.00| 100| 100| ACCOUNTS| JAIPUR|
|1000258| BLAKE|2850.00| 300| 300| SALES|BENGALURU|
|1000262| CLARK|2450.00| 100| 100| ACCOUNTS| JAIPUR|
|1000276| JONES|2975.00| 200| 200| R & D|NEW DELHI|
|1000288| SCOTT|3000.00| 200| 200| R & D|NEW DELHI|
|1000292| FORD|3000.00| 200| 200| R & D|NEW DELHI|
|1000294| SMITH| 800.00| 200| 200| R & D|NEW DELHI|
|1000299| ALLEN|1600.00| 300| 300| SALES|BENGALURU|
|1000310| WARD|1250.00| 300| 300| SALES|BENGALURU|
|1000312| MARTIN|1250.00| 300| 300| SALES|BENGALURU|
|1000315| TURNER|1500.00| 300| 300| SALES|BENGALURU|
|1000326| ADAMS|1100.00| 200| 200| R & D|NEW DELHI|
|1000336| JAMES| 950.00| 300| 300| SALES|BENGALURU|
|1000346| MILLER|1300.00| 100| 100| ACCOUNTS| JAIPUR|
|1000347| DAVID|1400.00| 500| null| null| null|<--
+-------+--------+-------+-------+-------+---------------+---------+
df_1.join(df_2,df_1.db_id==df_2.db_id,"right").show()
+-----+---------+-------+-----+-----------+-------+
|db_id| db_name|db_type|db_id| db_name|db_type|
+-----+---------+-------+-----+-----------+-------+
| 17| Oracle| RDBMS| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL| 19| MongoDB| NOSQL|
| null| null| null| 21|SingleStore| RDBMS|<--
| null| null| null| 22| Mysql| RDBMS|<--
| 14|Snowflake|CloudDB| 14| Snowflake| RDBMS|
+-----+---------+-------+-----+-----------+-------+
empdf.join(deptdf,empdf.dept_no==deptdf.dept_no,"right").show()
+-------+--------+-------+-------+-------+--------------------+-----------+
| emp_no|emp_name| salary|dept_no|dept_no| department_name| loc_name|
+-------+--------+-------+-------+-------+--------------------+-----------+
|1000346| MILLER|1300.00| 100| 100| ACCOUNTS| JAIPUR|
|1000262| CLARK|2450.00| 100| 100| ACCOUNTS| JAIPUR|
|1000245| PRADEEP|5000.00| 100| 100| ACCOUNTS| JAIPUR|
|1000326| ADAMS|1100.00| 200| 200| R & D| NEW DELHI|
|1000294| SMITH| 800.00| 200| 200| R & D| NEW DELHI|
|1000292| FORD|3000.00| 200| 200| R & D| NEW DELHI|
|1000288| SCOTT|3000.00| 200| 200| R & D| NEW DELHI|
|1000276| JONES|2975.00| 200| 200| R & D| NEW DELHI|
|1000336| JAMES| 950.00| 300| 300| SALES| BENGALURU|
|1000315| TURNER|1500.00| 300| 300| SALES| BENGALURU|
|1000312| MARTIN|1250.00| 300| 300| SALES| BENGALURU|
|1000310| WARD|1250.00| 300| 300| SALES| BENGALURU|
|1000299| ALLEN|1600.00| 300| 300| SALES| BENGALURU|
|1000258| BLAKE|2850.00| 300| 300| SALES| BENGALURU|
| null| null| null| null| 400|INFORMATION TECHN...|BHUBANESWAR|<--
+-------+--------+-------+-------+-------+--------------------+-----------+
df_1.join(df_2,df_1.db_id==df_2.db_id,"full").show()
+-----+---------+-------+-----+-----------+-------+
|db_id| db_name|db_type|db_id| db_name|db_type|
+-----+---------+-------+-----+-----------+-------+
| 15| Vertica| RDBMS| null| null| null|<--
| null| null| null| 22| Mysql| RDBMS|<--
| 17| Oracle| RDBMS| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL| 19| MongoDB| NOSQL|
| 12| Teradata| RDBMS| null| null| null|<--
| 14|Snowflake|CloudDB| 14| Snowflake| RDBMS|
| null| null| null| 21|SingleStore| RDBMS|<--
+-----+---------+-------+-----+-----------+-------+
empdf.join(deptdf,empdf.dept_no==deptdf.dept_no,"full").show()
+-------+--------+-------+-------+-------+--------------------+-----------+
| emp_no|emp_name| salary|dept_no|dept_no| department_name| loc_name|
+-------+--------+-------+-------+-------+--------------------+-----------+
|1000258| BLAKE|2850.00| 300| 300| SALES| BENGALURU|
|1000299| ALLEN|1600.00| 300| 300| SALES| BENGALURU|
|1000310| WARD|1250.00| 300| 300| SALES| BENGALURU|
|1000312| MARTIN|1250.00| 300| 300| SALES| BENGALURU|
|1000315| TURNER|1500.00| 300| 300| SALES| BENGALURU|
|1000336| JAMES| 950.00| 300| 300| SALES| BENGALURU|
|1000347| DAVID|1400.00| 500| null| null| null|<--
|1000245| PRADEEP|5000.00| 100| 100| ACCOUNTS| JAIPUR|
|1000262| CLARK|2450.00| 100| 100| ACCOUNTS| JAIPUR|
|1000346| MILLER|1300.00| 100| 100| ACCOUNTS| JAIPUR|
| null| null| null| null| 400|INFORMATION TECHN...|BHUBANESWAR|<--
|1000276| JONES|2975.00| 200| 200| R & D| NEW DELHI|
|1000288| SCOTT|3000.00| 200| 200| R & D| NEW DELHI|
|1000292| FORD|3000.00| 200| 200| R & D| NEW DELHI|
|1000294| SMITH| 800.00| 200| 200| R & D| NEW DELHI|
|1000326| ADAMS|1100.00| 200| 200| R & D| NEW DELHI|
+-------+--------+-------+-------+-------+--------------------+-----------+
df_1.join(df_2).show()
+-----+---------+-------+-----+-----------+-------+
|db_id| db_name|db_type|db_id| db_name|db_type|
+-----+---------+-------+-----+-----------+-------+
| 12| Teradata| RDBMS| 17| Oracle| RDBMS|
| 12| Teradata| RDBMS| 19| MongoDB| NOSQL|
| 12| Teradata| RDBMS| 21|SingleStore| RDBMS|
| 12| Teradata| RDBMS| 22| Mysql| RDBMS|
| 14|Snowflake|CloudDB| 17| Oracle| RDBMS|
.
.
.
| 19| MongoDB| NOSQL| 22| Mysql| RDBMS|
+-----+---------+-------+-----+-----------+-------+
df_1.crossJoin(df_2).show()
+-----+---------+-------+-----+-----------+-------+
|db_id| db_name|db_type|db_id| db_name|db_type|
+-----+---------+-------+-----+-----------+-------+
| 12| Teradata| RDBMS| 17| Oracle| RDBMS|
| 12| Teradata| RDBMS| 19| MongoDB| NOSQL|
| 12| Teradata| RDBMS| 21|SingleStore| RDBMS|
| 12| Teradata| RDBMS| 22| Mysql| RDBMS|
| 14|Snowflake|CloudDB| 17| Oracle| RDBMS|
.
.
.
| 19| MongoDB| NOSQL| 22| Mysql| RDBMS|
+-----+---------+-------+-----+-----------+-------+
empdf.join(deptdf).orderBy("emp_no").show()
+-------+--------+-------+-------+-------+--------------------+-----------+
| emp_no|emp_name| salary|dept_no|dept_no| department_name| loc_name|
+-------+--------+-------+-------+-------+--------------------+-----------+
|1000245| PRADEEP|5000.00| 100| 300| SALES| BENGALURU|
|1000245| PRADEEP|5000.00| 100| 400|INFORMATION TECHN...|BHUBANESWAR|
|1000245| PRADEEP|5000.00| 100| 100| ACCOUNTS| JAIPUR|
|1000245| PRADEEP|5000.00| 100| 200| R & D| NEW DELHI|
|1000258| BLAKE|2850.00| 300| 100| ACCOUNTS| JAIPUR|
|1000258| BLAKE|2850.00| 300| 200| R & D| NEW DELHI|
|1000258| BLAKE|2850.00| 300| 300| SALES| BENGALURU|
|1000258| BLAKE|2850.00| 300| 400|INFORMATION TECHN...|BHUBANESWAR|
|1000262| CLARK|2450.00| 100| 400|INFORMATION TECHN...|BHUBANESWAR|
|1000262| CLARK|2450.00| 100| 200| R & D| NEW DELHI|
.
.
.
|1000288| SCOTT|3000.00| 200| 400|INFORMATION TECHN...|BHUBANESWAR|
+-------+--------+-------+-------+-------+--------------------+-----------+
df_1.join(df_2,df_1.db_id==df_2.db_id,"semi").show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 14|Snowflake|CloudDB|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
+-----+---------+-------+
empdf.join(deptdf,empdf.dept_no==deptdf.dept_no,"semi").show()
+-------+--------+-------+-------+
| emp_no|emp_name| salary|dept_no|
+-------+--------+-------+-------+
|1000245| PRADEEP|5000.00| 100|
|1000258| BLAKE|2850.00| 300|
|1000262| CLARK|2450.00| 100|
|1000276| JONES|2975.00| 200|
|1000288| SCOTT|3000.00| 200|
|1000292| FORD|3000.00| 200|
|1000294| SMITH| 800.00| 200|
|1000299| ALLEN|1600.00| 300|
|1000310| WARD|1250.00| 300|
|1000312| MARTIN|1250.00| 300|
|1000315| TURNER|1500.00| 300|
|1000326| ADAMS|1100.00| 200|
|1000336| JAMES| 950.00| 300|
|1000346| MILLER|1300.00| 100|
+-------+--------+-------+-------+
df_1.join(df_2,df_1.db_id==df_2.db_id,"anti").show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 15| Vertica| RDBMS|
+-----+---------+-------+
df_1.join(df_2,"db_id","left").show()
+-----+---------+-------+---------+-------+
|db_id|  db_name|db_type|  db_name|db_type|
+-----+---------+-------+---------+-------+
|   12| Teradata|  RDBMS|     null|   null|
|   14|Snowflake|CloudDB|Snowflake|  RDBMS|
|   15|  Vertica|  RDBMS|     null|   null|
|   17|   Oracle|  RDBMS|   Oracle|  RDBMS|
|   19|  MongoDB|  NOSQL|  MongoDB|  NOSQL|
+-----+---------+-------+---------+-------+
df_1.join(df_2,[df_1.db_id==df_2.db_id, df_1.db_name==df_2.db_name],"inner").show()
+-----+---------+-------+-----+---------+-------+
|db_id| db_name|db_type|db_id| db_name|db_type|
+-----+---------+-------+-----+---------+-------+
| 14|Snowflake|CloudDB| 14|Snowflake| RDBMS|
| 17| Oracle| RDBMS| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL| 19| MongoDB| NOSQL|
+-----+---------+-------+-----+---------+-------+
df_1.join(df_2,[df_1.db_id==df_2.db_id, df_1.db_type!=df_2.db_type],"inner").show()
+-----+---------+-------+-----+---------+-------+
|db_id| db_name|db_type|db_id| db_name|db_type|
+-----+---------+-------+-----+---------+-------+
| 14|Snowflake|CloudDB| 14|Snowflake| RDBMS|
+-----+---------+-------+-----+---------+-------+
# Renaming all columns to add df_ as a prefix
import pyspark.sql.functions as f
df_2_1 = df_2.select([f.col(column).alias("df_"+column) for column in df_2.columns])
df_2_1.show()
+--------+-----------+----------+
|df_db_id| df_db_name|df_db_type|
+--------+-----------+----------+
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
| 14| Snowflake| RDBMS|
+--------+-----------+----------+
df_1.join(df_2_1,df_1.db_id==df_2_1.df_db_id,"inner").show()
+-----+---------+-------+--------+----------+----------+
|db_id| db_name|db_type|df_db_id|df_db_name|df_db_type|
+-----+---------+-------+--------+----------+----------+
| 14|Snowflake|CloudDB| 14| Snowflake| RDBMS|
| 17| Oracle| RDBMS| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL| 19| MongoDB| NOSQL|
+-----+---------+-------+--------+----------+----------+
joined_df = df_1.join(df_2,[df_1.db_id==df_2.db_id, df_1.db_type!=df_2.db_type],"inner")
joined_df.printSchema()
root
|-- db_id: string (nullable = true)
|-- db_name: string (nullable = true)
|-- db_type: string (nullable = true)
|-- db_id: string (nullable = true)
|-- db_name: string (nullable = true)
|-- db_type: string (nullable = true)
joined_df.select("db_type").show()
Output:
pyspark.sql.utils.AnalysisException: Reference 'db_type' is ambiguous, could be: db_type, db_type.;
joined_df = df_1.join(df_2,[df_1.db_id==df_2.db_id, df_1.db_type!=df_2.db_type],"inner")
joined_df.select(df_2.db_type, df_1.db_type).show()
Output:
+-------+-------+
|db_type|db_type|
+-------+-------+
| RDBMS|CloudDB|
+-------+-------+
# Renaming all columns to add df_ as a prefix
import pyspark.sql.functions as f
df_2_renamed = df_2.select([f.col(column).alias("df_"+column) for column in df_2.columns])
df_2_renamed.show()
+--------+-----------+----------+
|df_db_id| df_db_name|df_db_type|<--
+--------+-----------+----------+
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
| 14| Snowflake| RDBMS|
+--------+-----------+----------+
joined_df = df_1.join(df_2_renamed,[ df_1.db_id == df_2_renamed.df_db_id, df_1.db_type!=df_2_renamed.df_db_type],"inner")
joined_df.select("db_type","df_db_type").show()
+-------+----------+
|db_type|df_db_type|
+-------+----------+
|CloudDB| RDBMS|
+-------+----------+