PySpark set operators provide ways to combine similar datasets from two DataFrames into a single DataFrame. Most of them work the same way as the mathematical set operations, and they can also be used to compare two tables. The following functions are covered on this page: intersect, intersectAll, subtract, exceptAll, union, unionAll, and unionByName.
First, read the two sample CSV files used throughout the examples:
df_1 = spark.read.option("header",True).csv("file:///path_to_file/set_example_file_1.csv")
df_2 = spark.read.option("header",True).csv("file:///path_to_file/set_example_file_2.csv")
df_1.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 19| MongoDB| NOSQL|
+-----+---------+-------+
df_2.show()
+-----+-----------+-------+
|db_id| db_name|db_type|
+-----+-----------+-------+
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
| 14| Snowflake| RDBMS|
| 17| Oracle| RDBMS|
+-----+-----------+-------+
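If the sample CSV files are not available, the same DataFrames can be built directly in code; a minimal sketch (all columns come through as strings here, just as the header-only CSV read above produces):
# Build the same sample data in code instead of reading the CSV files.
data_1 = [("12", "Teradata", "RDBMS"), ("14", "Snowflake", "CloudDB"),
          ("15", "Vertica", "RDBMS"), ("17", "Oracle", "RDBMS"),
          ("17", "Oracle", "RDBMS"), ("19", "MongoDB", "NOSQL"),
          ("19", "MongoDB", "NOSQL")]
data_2 = [("17", "Oracle", "RDBMS"), ("19", "MongoDB", "NOSQL"),
          ("21", "SingleStore", "RDBMS"), ("22", "Mysql", "RDBMS"),
          ("14", "Snowflake", "RDBMS"), ("17", "Oracle", "RDBMS")]
cols = ["db_id", "db_name", "db_type"]
df_1 = spark.createDataFrame(data_1, cols)
df_2 = spark.createDataFrame(data_2, cols)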
df_1.intersect(df_2).show()
+-----+-------+-------+
|db_id|db_name|db_type|
+-----+-------+-------+
| 17| Oracle| RDBMS|
| 19|MongoDB| NOSQL|
+-----+-------+-------+
df_2.intersect(df_1).show()
+-----+-------+-------+
|db_id|db_name|db_type|
+-----+-------+-------+
| 17| Oracle| RDBMS|
| 19|MongoDB| NOSQL|
+-----+-------+-------+
df_1.intersectAll(df_2).show()
+-----+-------+-------+
|db_id|db_name|db_type|
+-----+-------+-------+
| 17| Oracle| RDBMS|
| 17| Oracle| RDBMS|
| 19|MongoDB| NOSQL|
+-----+-------+-------+
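The same intersect results can also be produced with Spark SQL by registering the DataFrames as temporary views; a sketch (the view names t1 and t2 are arbitrary, and INTERSECT ALL needs Spark 2.4 or later):
# Register temp views and use the SQL set operators.
df_1.createOrReplaceTempView("t1")
df_2.createOrReplaceTempView("t2")
spark.sql("SELECT * FROM t1 INTERSECT SELECT * FROM t2").show()
spark.sql("SELECT * FROM t1 INTERSECT ALL SELECT * FROM t2").show()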
df_1.subtract(df_2).show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
+-----+---------+-------+
df_2.subtract(df_1).show()
+-----+-----------+-------+
|db_id| db_name|db_type|
+-----+-----------+-------+
| 14| Snowflake| RDBMS|
| 22| Mysql| RDBMS|
| 21|SingleStore| RDBMS|
+-----+-----------+-------+
df_1.exceptAll(df_2).show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 19| MongoDB| NOSQL|
| 14|Snowflake|CloudDB|
+-----+---------+-------+
df_2.exceptAll(df_1).show()
+-----+-----------+-------+
|db_id| db_name|db_type|
+-----+-----------+-------+
| 14| Snowflake| RDBMS|
| 22| Mysql| RDBMS|
| 21|SingleStore| RDBMS|
+-----+-----------+-------+
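As noted at the top of the page, these operators are handy for comparing two tables. A small sketch that collects the rows present on only one side, tagged with a source column (the column name src is only for illustration):
from pyspark.sql.functions import lit

# Rows in df_1 but not df_2, and rows in df_2 but not df_1, tagged with their source.
only_in_1 = df_1.exceptAll(df_2).withColumn("src", lit("df_1"))
only_in_2 = df_2.exceptAll(df_1).withColumn("src", lit("df_2"))
only_in_1.union(only_in_2).show()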
df_1.union(df_2).show()
+-----+-----------+-------+
|db_id| db_name|db_type|
+-----+-----------+-------+
| 12| Teradata| RDBMS|
| 14| Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 19| MongoDB| NOSQL|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
| 14| Snowflake| RDBMS|
| 17| Oracle| RDBMS|
+-----+-----------+-------+
df_1.unionAll(df_2).show()
+-----+-----------+-------+
|db_id| db_name|db_type|
+-----+-----------+-------+
| 12| Teradata| RDBMS|
| 14| Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 19| MongoDB| NOSQL|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
| 14| Snowflake| RDBMS|
| 17| Oracle| RDBMS|
+-----+-----------+-------+
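In PySpark, unionAll() is simply an alias for union(), and neither removes duplicates (unlike SQL UNION). To get distinct rows, chain distinct(), for example:
# Equivalent of SQL UNION (duplicates removed).
df_1.union(df_2).distinct().show()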
unionByName(other, allowMissingColumns=False)
df_3 = spark.createDataFrame(sc.parallelize([('A', 'P', 'P')])).toDF("col1", "col2", "col3")
df_4 = spark.createDataFrame(sc.parallelize([('L', 'E', 'S')])).toDF("col2", "col1", "col3")
df_3.unionByName(df_4).show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| A| P| P|
| E| L| S|
+----+----+----+
df_5 = spark.createDataFrame(sc.parallelize([('A', 'P', 'P')])).toDF("col1", "col2", "col3")
df_6 = spark.createDataFrame(sc.parallelize([('L', 'E', 'S')])).toDF("col2", "col1", "col4")
df_5.unionByName(df_6).show()
pyspark.sql.utils.AnalysisException: Cannot resolve column name "col3" among (col2, col1, col4);
df_5 = spark.createDataFrame(sc.parallelize([('A', 'P', 'P')])).toDF("col1", "col2", "col3")
df_6 = spark.createDataFrame(sc.parallelize([('L', 'E', 'S')])).toDF("col2", "col1", "col4")
df_5.unionByName(df_6, True).show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| A| P| P|null|
| E| L|null| S|
+----+----+----+----+
df_1.unionByName(df_2).show()
+-----+-----------+-------+
|db_id| db_name|db_type|
+-----+-----------+-------+
| 12| Teradata| RDBMS|
| 14| Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 19| MongoDB| NOSQL|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
| 14| Snowflake| RDBMS|
| 17| Oracle| RDBMS|
+-----+-----------+-------+
df_3 = spark.createDataFrame(sc.parallelize([('A', 'P', 'P')])).toDF("col1", "col2", "col3")
df_4 = spark.createDataFrame(sc.parallelize([('L', 'E', 'S')])).toDF("col2", "col1", "col3")
df_3.union(df_4).show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| A| P| P|
| L| E| S|
+----+----+----+
df_3.unionByName(df_4).show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| A| P| P|
| E| L| S|
+----+----+----+
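When more than two DataFrames need to be combined, unionByName can be chained with functools.reduce; a sketch (dfs is a hypothetical list of DataFrames with the same column names):
from functools import reduce

# Union an arbitrary list of DataFrames by column name.
dfs = [df_1, df_2]
combined = reduce(lambda a, b: a.unionByName(b), dfs)
combined.show()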