This tutorial explains how to read various types of comma-separated value (CSV) files, and other delimited files, into a Spark DataFrame.
df = spark.read.csv("file:///path_to_file/tutorial_file.txt")
df.show()
+---+---------+-------+
|_c0| _c1| _c2|
+---+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
+---+---------+-------+
The toDF method assigns meaningful column names positionally:
df = spark.read.csv("file:///path_to_file/tutorial_file.txt").toDF("db_id", "db_name", "db_type")
df.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
+-----+---------+-------+
If the file contains a header row, set the header option and Spark takes the column names from the first line:
df = spark.read.options(header=True).csv("file:///path_to_file/tutorial_file_with_header.txt")
df.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
+-----+---------+-------+
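By default every column is read as a string. Setting inferSchema=True makes Spark scan the data an extra time and infer column types; a minimal sketch against the same header file (db_id should come back as an integer):
df = spark.read.options(header=True, inferSchema=True).csv("file:///path_to_file/tutorial_file_with_header.txt")
df.printSchema()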
For a tab delimited file, pass the delimiter option:
df = spark.read.options(delimiter="\t").csv("file:///path_to_file/tab_delimited_file.txt")
df.show()
+---+---------+-------+
|_c0| _c1| _c2|
+---+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21| Tab| Test|
+---+---------+-------+
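The keyword-argument style of options() used above is interchangeable with chaining individual option() calls; this reads the same tab delimited file:
df = spark.read.option("delimiter", "\t").csv("file:///path_to_file/tab_delimited_file.txt")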
Multiple options can be combined; here a header row and a pipe (|) delimiter:
df = spark.read.options(header=True, delimiter="|").csv("file:///path_to_file/pipe_delimited_file.txt")
df.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21| Pipe| Test|
+-----+---------+-------+
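Note that sep is accepted as an alias for delimiter, so the following is equivalent:
df = spark.read.options(header=True, sep="|").csv("file:///path_to_file/pipe_delimited_file.txt")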
Files delimited by the non-printing Ctrl-A character look like this under cat -v, which renders Ctrl-A as ^A:
cat -v /path_to_file/ctrl_a_delimited_file.txt
**********File content**********
db_id^Adb_name^Adb_type
12^ATeradata^ARDBMS
14^ASnowflake^ACloudDB
********************************
Pass Ctrl-A as the delimiter using a Python escape sequence:
df = spark.read.options(header=True, delimiter="\01").csv("file:///path_to_file/ctrl_a_delimited_file.txt")
df.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21| CtrlA| Test|
+-----+---------+-------+
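"\01" is Python's octal escape for the Ctrl-A byte; the same character can be written more explicitly as "\x01" or "\u0001":
df = spark.read.options(header=True, delimiter="\u0001").csv("file:///path_to_file/ctrl_a_delimited_file.txt")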
Delimiters can also be longer than one character:
cat /path_to_file/multichar_delimited_file.txt
**********File content**********
db_id|*|db_name|*|db_type
12|*|Teradata|*|RDBMS
14|*|Snowflake|*|CloudDB
********************************
Pass the full multi-character sequence as the delimiter:
df = spark.read.options(header=True, delimiter="|*|").csv("file:///path_to_file/multichar_delimited_file.txt")
df.show()
+-----+----------------+-------+
|db_id| db_name|db_type|
+-----+----------------+-------+
| 12| Teradata| RDBMS|
| 14| Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|PipeAsteriskPipe| Test|
+-----+----------------+-------+
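Multi-character delimiters are supported natively from Spark 3.0 onwards. On older versions, one workaround (a sketch, not the only approach) is to read the file as plain text and split each line with a regular expression:
from pyspark.sql import functions as f

raw = spark.read.text("file:///path_to_file/multichar_delimited_file.txt")
parts = f.split(f.col("value"), r"\|\*\|")  # escape | and *, since split() takes a regex
df = raw.select(
    parts.getItem(0).alias("db_id"),
    parts.getItem(1).alias("db_name"),
    parts.getItem(2).alias("db_type"),
).filter(f.col("db_id") != "db_id")  # drop the header row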
Quoted fields can contain embedded newlines; enable the multiLine option so Spark treats such records as a single row:
df_multiline = spark.read.option("delimiter", "|").option("header", True).option("multiLine", True).csv("file:///path_to_directory/multiline_file_with_header.txt")
df_multiline.show()
+-----+---------+--------+
|db_id| db_name| db_type|
+-----+---------+--------+
| 12| Teradata|RDBMS
DB|
| 14|Snowflake|
CloudDB|
| 15| Vertica| RDBMS|
| 17|Oracle
DB| RDBMS|
| 19| MongoDB| NOSQL|
+-----+---------+--------+
Replacing the embedded newline characters with ** using regexp_replace:
from pyspark.sql import functions as f

df_multiline.select("db_id", f.regexp_replace(f.col("db_name"), "\n", "**").alias("db_name"), f.regexp_replace(f.col("db_type"), "\n", "**").alias("db_type")).show()
+-----+----------+---------+
|db_id| db_name| db_type|
+-----+----------+---------+
| 12| Teradata|RDBMS**DB|
| 14| Snowflake|**CloudDB|
| 15| Vertica| RDBMS|
| 17|Oracle**DB| RDBMS|
| 19| MongoDB| NOSQL|
+-----+----------+---------+
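If many string columns may contain newlines, the same replacement can be applied programmatically instead of column by column; a sketch:
from pyspark.sql import functions as f

cleaned = df_multiline.select([
    f.regexp_replace(f.col(c), "\n", "**").alias(c) if t == "string" else f.col(c)
    for c, t in df_multiline.dtypes  # dtypes yields (column, type) pairs
])
cleaned.show()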
df = spark.read.csv("hdfs://localhost:9000/user/hive/warehouse/retail.db/orders")
df.show(5,truncate=False)
+-----+---------------------+-----+---------------+
|_c0 |_c1 |_c2 |_c3 |
+-----+---------------------+-----+---------------+
|68850|2014-05-25 00:00:00.0|8451 |COMPLETE |
|68851|2014-05-26 00:00:00.0|11193|PENDING_PAYMENT|
|68852|2014-05-29 00:00:00.0|4596 |CLOSED |
|68853|2014-05-31 00:00:00.0|1202 |PENDING_PAYMENT|
|68854|2014-06-01 00:00:00.0|6528 |ON_HOLD |
+-----+---------------------+-----+---------------+
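The default _c0 ... _c3 names can be replaced with toDF here as well. The names below are illustrative only, assuming a typical retail orders layout:
df = spark.read.csv("hdfs://localhost:9000/user/hive/warehouse/retail.db/orders").toDF("order_id", "order_date", "customer_id", "order_status")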
df = spark.read.csv("file:///path_to_file/data_files/csv")
df.show()
+---+-----------+-------+
|_c0| _c1| _c2|
+---+-----------+-------+
| 12| Teradata| RDBMS|
| 14| Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
+---+-----------+-------+
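A glob pattern can also be embedded directly in the path; this reads only the .csv files from the subdirectory (spark.read.csv additionally accepts a list of paths):
df = spark.read.csv("file:///path_to_file/data_files/csv/*.csv")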
The pathGlobFilter option restricts the read to files whose names match a glob pattern, skipping everything else in the directory:
df = spark.read.options(pathGlobFilter="*.csv").csv("file:///path_to_file/data_files")
df.show()
+---+-----------+-------+
|_c0| _c1| _c2|
+---+-----------+-------+
| 12| Teradata| RDBMS|
| 14| Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
+---+-----------+-------+
Adding recursiveFileLookup makes Spark descend into subdirectories as well:
df = spark.read.options(pathGlobFilter="*.csv", recursiveFileLookup=True).csv("file:///path_to_file/data_files")
df.show()
+---+-----------+-------+
|_c0| _c1| _c2|
+---+-----------+-------+
| 12| Teradata| RDBMS|
| 14| Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 12| Teradata| RDBMS|
| 14| Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
| 21|SingleStore| RDBMS|
| 22| Mysql| RDBMS|
+---+-----------+-------+
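When a read fans out across many files, input_file_name() helps trace which file each row came from:
from pyspark.sql import functions as f

df.withColumn("source_file", f.input_file_name()).show(truncate=False)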
Finally, to enforce column names and types up front, build a schema and pass it to the reader:
from pyspark.sql.types import StructType # imported StructType
schema_def = StructType() # Created a StructType object
schema_def.add("db_id","integer",True) # Adding column 1 to StructType
schema_def.add("db_name","string",True) # Adding column 2 to StructType
schema_def.add("db_type_cd","string",True) # Adding column 3 to StructType
df_with_schema = spark.read.csv("file:///path_to_files/tutorial_file_with_header.csv", schema=schema_def, header=True)
df_with_schema.printSchema()
root
|-- db_id: integer (nullable = true)
|-- db_name: string (nullable = true)
|-- db_type_cd: string (nullable = true)
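The same schema can also be supplied as a DDL-formatted string, without building a StructType:
df_with_schema = spark.read.csv("file:///path_to_files/tutorial_file_with_header.csv", schema="db_id INT, db_name STRING, db_type_cd STRING", header=True)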