There are multiple ways to generate a sequence number (an incrementing ID) in PySpark. This tutorial explains, with examples, how to generate sequence numbers using the row_number() window function and the monotonically_increasing_id() function. First, read the sample CSV file used throughout the examples:
df = spark.read.csv("file:///path_to_files/csv_with_duplicates_and_nulls.csv",header=True)
df.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 12| Teradata| RDBMS|
| 22| Mysql| null|
| 50|Snowflake| RDBMS|
| 51| null|CloudDB|
+-----+---------+-------+
Method 1: row_number() window function. row_number() assigns consecutive integers starting at 1, in the order defined by the window specification.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# A window with orderBy but no partitionBy moves every row into a single
# partition, so this approach does not scale to very large DataFrames.
df_update = df.withColumn("seq_num", row_number().over(Window.orderBy("db_id")))
df_update.show()
+-----+---------+-------+-------+
|db_id| db_name|db_type|seq_num|
+-----+---------+-------+-------+
| 12| Teradata| RDBMS| 1|
| 12| Teradata| RDBMS| 2|
| 14|Snowflake|CloudDB| 3|
| 15| Vertica| RDBMS| 4|
| 22| Mysql| null| 5|
| 50|Snowflake| RDBMS| 6|
| 51| null|CloudDB| 7|
+-----+---------+-------+-------+
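Note that the two db_id 12 rows received sequence numbers 1 and 2 in an arbitrary order, because the ordering column contains duplicates. A minimal sketch of one way to make the numbering deterministic, assuming db_name is an acceptable tie-breaker for this data:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Order by extra columns so duplicate db_id rows are numbered the same
# way on every run (using db_name as the tie-breaker is an assumption).
w = Window.orderBy("db_id", "db_name")
df_update = df.withColumn("seq_num", row_number().over(w))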
Method 2: monotonically_increasing_id() function. The generated IDs are guaranteed to be monotonically increasing and unique, but not consecutive. Because the DataFrame here sits in a single partition, the IDs happen to come out consecutive, starting at 0.

from pyspark.sql.functions import monotonically_increasing_id

df.rdd.getNumPartitions()   # 1
df_update = df.withColumn("seq_num", monotonically_increasing_id())
df_update.show()
+-----+---------+-------+-------+
|db_id| db_name|db_type|seq_num|
+-----+---------+-------+-------+
| 12| Teradata| RDBMS| 0|
| 14|Snowflake|CloudDB| 1|
| 15| Vertica| RDBMS| 2|
| 12| Teradata| RDBMS| 3|
| 22| Mysql| null| 4|
| 50|Snowflake| RDBMS| 5|
| 51| null|CloudDB| 6|
+-----+---------+-------+-------+
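The IDs come out as 0 through 6 only because the DataFrame sits in a single partition. If you want the same behavior for a DataFrame that is already split across partitions, one option (a sketch, and only sensible for small data since it pushes everything through one task) is to collapse to a single partition first:

# Collapse to one partition so a single task generates all the IDs;
# this gives consecutive IDs but sacrifices parallelism.
df_single = df.coalesce(1).withColumn("seq_num", monotonically_increasing_id())
df_single.show()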
Repartitioning the same DataFrame shows why the IDs are not consecutive in general: each partition generates its own range of IDs, so large gaps appear between partitions.

from pyspark.sql.functions import monotonically_increasing_id

df.rdd.getNumPartitions()   # 1 (df freshly read, as above)
df = df.repartition(4)
df.rdd.getNumPartitions()   # 4
df_update = df.withColumn("seq_num", monotonically_increasing_id())
df_update.show()
+-----+---------+-------+-----------+
|db_id| db_name|db_type| seq_num|
+-----+---------+-------+-----------+
| 22| Mysql| null| 0|
| 12| Teradata| RDBMS| 1|
| 51| null|CloudDB| 8589934592|
| 15| Vertica| RDBMS| 8589934593|
| 14|Snowflake|CloudDB|17179869184|
| 50|Snowflake| RDBMS|25769803776|
| 12| Teradata| RDBMS|25769803777|
+-----+---------+-------+-----------+
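The large jumps are not random: monotonically_increasing_id() packs the partition ID into the upper 31 bits of a 64-bit integer and a per-partition record counter into the lower 33 bits. A quick plain-Python check of that layout against the output above:

# Decode the IDs from the output: the upper bits give the partition,
# the lower 33 bits give the row position within that partition.
for mono_id in [0, 1, 8589934592, 8589934593, 17179869184, 25769803777]:
    partition = mono_id >> 33
    offset = mono_id & ((1 << 33) - 1)
    print(f"id={mono_id}: partition={partition}, row_in_partition={offset}")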
Adding spark_partition_id() to the previous example shows which partition each row lives in, which makes the ID ranges easy to interpret: rows in the same partition get contiguous IDs, and every partition starts a new range.

from pyspark.sql.functions import monotonically_increasing_id, spark_partition_id

df.rdd.getNumPartitions()   # 1 (df freshly read, as above)
df = df.repartition(4)
df.rdd.getNumPartitions()   # 4
df_update = (df.withColumn("seq_num", monotonically_increasing_id())
               .withColumn("partition#", spark_partition_id()))
df_update.show()
+-----+---------+-------+-----------+----------+
|db_id| db_name|db_type| seq_num|partition#|
+-----+---------+-------+-----------+----------+
| 22| Mysql| null| 0| 0|
| 12| Teradata| RDBMS| 1| 0|
| 51| null|CloudDB| 8589934592| 1|
| 15| Vertica| RDBMS| 8589934593| 1|
| 14|Snowflake|CloudDB|17179869184| 2|
| 50|Snowflake| RDBMS|25769803776| 3|
| 12| Teradata| RDBMS|25769803777| 3|
+-----+---------+-------+-----------+----------+
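If you need a gapless 1-to-n sequence regardless of partitioning, a common pattern (sketched here) is to combine both methods: tag each row with monotonically_increasing_id() to capture a stable order, then number the rows with row_number() over that order. Note that the window still funnels all rows through one partition for the numbering step.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# Convert the unique-but-gappy IDs into a consecutive 1..n sequence.
df_seq = (df.withColumn("mono_id", monotonically_increasing_id())
            .withColumn("seq_num", row_number().over(Window.orderBy("mono_id")))
            .drop("mono_id"))
df_seq.show()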