This tutorial explains several approaches to renaming an existing column in a PySpark DataFrame, each illustrated with an example. All of the examples below use the following sample DataFrame, loaded from a CSV file:
df = spark.read.csv("file:///path_to_files/csv_file_with_duplicates.csv", header=True)
df.show()
+-----+---------+-------+
|db_id|  db_name|db_type|
+-----+---------+-------+
|   12| Teradata|  RDBMS|
|   14|Snowflake|CloudDB|
|   15|  Vertica|  RDBMS|
|   12| Teradata|  RDBMS|
|   22|    Mysql|  RDBMS|
+-----+---------+-------+
df.columns
Output: ['db_id', 'db_name', 'db_type']
withColumnRenamed(existingColumnName, newColumnName)
This function takes two string parameters: the first is the name of the existing column and the second is the new name for the column. Like all DataFrame transformations, it returns a new DataFrame and leaves the original unchanged.
df_updated = df.withColumnRenamed("db_type","db_type_cd")
df_updated.show()
+-----+---------+----------+
|db_id|  db_name|db_type_cd|
+-----+---------+----------+
|   12| Teradata|     RDBMS|
|   14|Snowflake|   CloudDB|
|   15|  Vertica|     RDBMS|
|   12| Teradata|     RDBMS|
|   22|    Mysql|     RDBMS|
+-----+---------+----------+
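Since withColumnRenamed returns a new DataFrame, calls can be chained to rename several columns in one statement. A minimal sketch (the new names db_identifier and db_nm are just illustrative choices):
df_updated = (df.withColumnRenamed("db_id", "db_identifier")
                .withColumnRenamed("db_name", "db_nm"))
df_updated.columns
Output: ['db_identifier', 'db_nm', 'db_type']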
df_updated = df.withColumnRenamed("db_type_test","db_type_cd")
df_updated.show()
+-----+---------+-------+
|db_id|  db_name|db_type|
+-----+---------+-------+
|   12| Teradata|  RDBMS|
|   14|Snowflake|CloudDB|
|   15|  Vertica|  RDBMS|
|   12| Teradata|  RDBMS|
|   22|    Mysql|  RDBMS|
+-----+---------+-------+
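Because a misspelled column name silently becomes a no-op, it can be safer to fail fast when the source column is missing. Below is a minimal sketch of such a guard; the helper name rename_or_fail is our own and not part of the PySpark API:
def rename_or_fail(df, existing_col_name, new_col_name):
    # Raise instead of silently returning the DataFrame unchanged
    if existing_col_name not in df.columns:
        raise ValueError(f"Column '{existing_col_name}' not found in {df.columns}")
    return df.withColumnRenamed(existing_col_name, new_col_name)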
Another approach is to select every column, aliasing the one you want to rename with col().alias():
from pyspark.sql.functions import col
df_updated = df.select("db_id", "db_name", col("db_type").alias("db_type_cd"))
df_updated.show()
+-----+---------+----------+
|db_id|  db_name|db_type_cd|
+-----+---------+----------+
|   12| Teradata|     RDBMS|
|   14|Snowflake|   CloudDB|
|   15|  Vertica|     RDBMS|
|   12| Teradata|     RDBMS|
|   22|    Mysql|     RDBMS|
+-----+---------+----------+
# Same example as above, but building the select list programmatically
# so the remaining column names do not have to be hard-coded
new_col_name = "db_type_cd"
existing_col_name = "db_type"
column_li = df.columns
if existing_col_name in column_li:
    idx = column_li.index(existing_col_name)
    # Replace the plain column name with an aliased Column at the same position
    column_li.pop(idx)
    column_li.insert(idx, col(existing_col_name).alias(new_col_name))
df_updated = df.select(column_li)
df_updated.show()
+-----+---------+----------+
|db_id|  db_name|db_type_cd|
+-----+---------+----------+
|   12| Teradata|     RDBMS|
|   14|Snowflake|   CloudDB|
|   15|  Vertica|     RDBMS|
|   12| Teradata|     RDBMS|
|   22|    Mysql|     RDBMS|
+-----+---------+----------+
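When every column needs a new name, DataFrame.toDF() is a compact alternative: it takes the complete list of new names in positional order and must receive exactly one name per column. The new names below are just examples:
df_renamed = df.toDF("id", "name", "type_cd")
df_renamed.columns
Output: ['id', 'name', 'type_cd']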
To add a prefix to every column name, alias each column inside a list comprehension over df.columns. This pattern is handy, for example, for disambiguating column names before a join:
from pyspark.sql.functions import col
df_prefix = df.select([col(column).alias("prefix_"+column) for column in df.columns])
df_prefix.show()
+------------+--------------+--------------+
|prefix_db_id|prefix_db_name|prefix_db_type|
+------------+--------------+--------------+
|          12|      Teradata|         RDBMS|
|          14|     Snowflake|       CloudDB|
|          15|       Vertica|         RDBMS|
|          12|      Teradata|         RDBMS|
|          22|         Mysql|         RDBMS|
+------------+--------------+--------------+
The same approach adds a suffix to every column name:
from pyspark.sql.functions import col
df_suffix = df.select([col(column).alias(column+"_suffix") for column in df.columns])
df_suffix.show()
+------------+--------------+--------------+
|db_id_suffix|db_name_suffix|db_type_suffix|
+------------+--------------+--------------+
|          12|      Teradata|         RDBMS|
|          14|     Snowflake|       CloudDB|
|          15|       Vertica|         RDBMS|
|          12|      Teradata|         RDBMS|
|          22|         Mysql|         RDBMS|
+------------+--------------+--------------+
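Finally, if you are on PySpark 3.4 or later, DataFrame.withColumnsRenamed accepts a dict that maps existing names to new names, renaming several columns in one call. A minimal sketch, assuming Spark 3.4+:
# Requires PySpark 3.4+
df_updated = df.withColumnsRenamed({"db_id": "id", "db_type": "db_type_cd"})
df_updated.columns
Output: ['id', 'db_name', 'db_type_cd']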