This tutorial explains how to find and remove duplicate rows from a PySpark DataFrame, with examples using the distinct() and dropDuplicates() functions. First, load a sample CSV file that contains a duplicate row:
df = spark.read.csv("file:///path_to_files/csv_file_with_duplicates_v1.csv", header=True)
df.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 22| Mysql| RDBMS|
| 50|Snowflake| RDBMS|
+-----+---------+-------+
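A quick way to confirm that the data actually contains duplicates is to compare the total row count with the distinct row count; for the sample data above these come out to 6 and 5:
df.count()            # 6 rows in total, including the repeated Teradata row
df.distinct().count() # 5 unique rows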
distinct()
distinct() returns a new DataFrame containing only the unique rows of the original, comparing all columns:
df_updated = df.distinct()
df_updated.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 22| Mysql| RDBMS|
| 14|Snowflake|CloudDB|
| 50|Snowflake| RDBMS|
+-----+---------+-------+
Because distinct() involves a shuffle, the number of partitions in its result is governed by spark.sql.shuffle.partitions (default 200), not by the partitioning of the input DataFrame:
df_orders=spark.read.table("retail.orders_hive_new").repartition(300)
df_orders.rdd.getNumPartitions()
Output: 300
sqlContext.getConf("spark.sql.shuffle.partitions") # Getting property value
Output: '200'
df_orders.distinct().rdd.getNumPartitions()
Output: 200
Changing spark.sql.shuffle.partitions before calling distinct() changes the partition count of the result accordingly:
df_orders=spark.read.table("retail.orders_hive_new").repartition(300)
df_orders.rdd.getNumPartitions()
Output: 300
sqlContext.getConf("spark.sql.shuffle.partitions") # Getting property value
Output: '200'
sqlContext.setConf("spark.sql.shuffle.partitions",305) # Setting a new property value
df_orders.distinct().rdd.getNumPartitions()
Output: 305
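sqlContext.setConf() works, but on Spark 2.x and later the same property can also be read and changed through the SparkSession's runtime configuration; an equivalent sketch:
spark.conf.get("spark.sql.shuffle.partitions")       # read the current value
spark.conf.set("spark.sql.shuffle.partitions", 305)  # set a new value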
dropDuplicates([list of columns])
Called without arguments, dropDuplicates() behaves like distinct(). It can also take a list of columns and then removes rows that are duplicated on just those columns:
df_updated = df.dropDuplicates()
df_updated.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 22| Mysql| RDBMS|
| 12| Teradata| RDBMS|
| 15| Vertica| RDBMS|
| 14|Snowflake|CloudDB|
| 50|Snowflake| RDBMS|
+-----+---------+-------+
When a column list is passed, only those columns are compared. Deduplicating on db_name alone keeps a single Snowflake row, even though the two Snowflake rows differ in db_id and db_type:
df_updated = df.dropDuplicates(["db_name"])
df_updated.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 22| Mysql| RDBMS|
| 12| Teradata| RDBMS|
| 15| Vertica| RDBMS|
| 14|Snowflake|CloudDB|
+-----+---------+-------+
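Note that dropDuplicates() keeps an arbitrary row for each key when the remaining columns differ, so it is not guaranteed which db_id survives for Snowflake. If a specific row must be kept, a window function makes the choice explicit; a minimal sketch, assuming we want the row with the lowest db_id per db_name:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

w = Window.partitionBy("db_name").orderBy(col("db_id"))
df_first = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
df_first.show()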
Deduplicating on both db_id and db_name keeps both Snowflake rows, because their db_id values differ:
df_updated = df.dropDuplicates(["db_id", "db_name"])
df_updated.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 22| Mysql| RDBMS|
| 50|Snowflake| RDBMS|
| 15| Vertica| RDBMS|
| 14|Snowflake|CloudDB|
+-----+---------+-------+
Like distinct(), dropDuplicates() triggers a shuffle, so its result also picks up spark.sql.shuffle.partitions partitions:
df_orders=spark.read.table("retail.orders_hive_new").repartition(300)
df_orders.rdd.getNumPartitions()
Output: 300
sqlContext.getConf("spark.sql.shuffle.partitions") # Getting property value
Output: '200'
df_orders.dropDuplicates().rdd.getNumPartitions()
Output: 200
Changing the property changes the partition count of the dropDuplicates() result as well:
df_orders=spark.read.table("retail.orders_hive_new").repartition(300)
df_orders.rdd.getNumPartitions()
Output: 300
sqlContext.getConf("spark.sql.shuffle.partitions") # Getting property value
Output: '200'
sqlContext.setConf("spark.sql.shuffle.partitions",310) # Setting a new property value
df_orders.dropDuplicates().rdd.getNumPartitions()
Output: 310
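If only the partition count of the deduplicated result needs to change, and not the global shuffle setting, the result can also be coalesced afterwards; a sketch with an arbitrary target of 50 partitions:
df_orders.dropDuplicates().coalesce(50).rdd.getNumPartitions()  # reduced to 50 partitions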
Duplicates can also be removed with groupBy: grouping on all columns collapses each set of identical rows into a single row:
df_updated = df.groupBy(df.columns).count().select(df.columns)
df_updated.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 50|Snowflake| RDBMS|
| 22| Mysql| RDBMS|
+-----+---------+-------+
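As a sanity check, this groupBy approach should return the same rows as distinct(); on Spark 2.4+ the two results can be compared with exceptAll, which returns an empty DataFrame when they match:
df_updated.exceptAll(df.distinct()).count()  # 0 when both approaches agree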
To find the rows that are fully duplicated, group on all columns and keep only the groups whose count is greater than 1:
from pyspark.sql.functions import col
df_duplicates = df.groupBy(df.columns).count().filter(col("count")>1)
df_duplicates.show()
+-----+--------+-------+-----+
|db_id| db_name|db_type|count|
+-----+--------+-------+-----+
| 12|Teradata| RDBMS| 2|
+-----+--------+-------+-----+
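If the duplicated rows themselves are needed rather than just their counts, the grouped result can be joined back to the original DataFrame; a sketch:
df_dup_rows = df.join(df_duplicates.drop("count"), on=df.columns, how="inner")
df_dup_rows.show()  # both occurrences of the Teradata row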
To find duplicate values in a single column, group on just that column:
from pyspark.sql.functions import col
df_duplicates = df.groupBy("db_name").count().filter(col("count")>1)
df_duplicates.show()
+---------+-----+
| db_name|count|
+---------+-----+
|Snowflake| 2|
| Teradata| 2|
+---------+-----+