This tutorial explains and lists the attributes that can be used with the option()/options() functions to define how a read (or write) operation should behave and how the contents of the data source should be interpreted. Most of the attributes listed below can be used with either function. In option() the attribute key is passed as a string, while in options() the attributes are passed as keyword arguments; this difference will become clear in the examples section.
| Key | Possible Value | Description |
| --- | --- | --- |
| header | True / False | To use the first line of a file as column names. Default value of this option is False. |
| delimiter | Any single/multi-character field separator | To specify one or more characters as the separator for each column/field. |
| sep | Any single-character field separator | To specify a single character as the separator for each column/field. |
| encoding | UTF-8, UTF-16, etc. | To decode CSV files using the given encoding type. Default encoding is UTF-8. |
| lineSep | Any single-character line separator | To define the line separator that should be used for file parsing. If not set, \r, \r\n and \n are all treated as line separators while reading. |
| pathGlobFilter | File pattern | To read only files whose filenames match the given pattern. |
| recursiveFileLookup | True / False | To recursively scan a directory to read files. Default value of this option is False. |
| codec / compression | none, bzip2, gzip, lz4, snappy and deflate | To compress CSV or other delimited files while writing, using the given compression method. Required libraries should be available before using a compression method other than gzip. |
| quoteAll | True / False | To specify whether to quote all fields/columns or not. Default value of this option is False. |
| quote | Any single quote character | To set the character used to quote fields/columns whose values may contain the delimiter/separator. When used with the quoteAll option, this character is used to quote all fields. Default value of this option is double quote ("). |
| path | File/Directory path | To pass the file or directory path where files need to be read or written. |
| modifiedBefore | Timestamp | To read only files that were modified before the specified timestamp. |
| modifiedAfter | Timestamp | To read only files that were modified after the specified timestamp. |
| nullValue | String to replace null | To replace null values with the given string while reading and writing a dataframe. |
| multiLine | True / False | To parse records that span multiple lines (for JSON, one record per file). This option also works on CSV or other delimited files. Default value of this option is False. |
| dateFormat | Valid date format | To define a string that indicates a date format. Default format is yyyy-MM-dd. |
| timestampFormat | Valid timestamp format | To define a string that indicates a timestamp format. Default format is yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]. |
| escape | Any single character | To set a single character as the escape character, overriding the default escape character (\). |
| mergeSchema | True / False | To set whether schemas collected from all Parquet files should be merged or not. The default value comes from the spark.sql.parquet.mergeSchema property, and this option overrides that property. |
The following options are specific to sources connected through JDBC.

| Key | Possible Value | Description |
| --- | --- | --- |
| driver | JDBC driver class | To set the JDBC driver for a source being connected through JDBC. |
| url | URL for JDBC connection | To set the URL for a source being connected through JDBC. |
| user | Username for JDBC connection | To set the username for a source being connected through JDBC. |
| dbtable | Table name for JDBC connection | To set the table name to be queried from a source being connected through JDBC. |
| query | Custom query for JDBC connection | To specify a custom query that will be executed on a source being connected through JDBC. |
| password | Password for JDBC connection | To set the password for a source being connected through JDBC. Don't hardcode passwords in source code for production or server deployments; generally the password should be passed as a parameter after being fetched programmatically from a password manager. |
| lowerBound | Minimum value of the partition column | To specify the minimum value of the partition column, used to decide the partition range. |
| upperBound | Maximum value of the partition column | To specify the maximum value of the partition column, used to decide the partition range. |
| partitionColumn | Partition column name | To specify the name of the column that will be used for partitioning. |
| numPartitions | Number of partitions | To specify the number of partitions for the dataframe read from the JDBC source. This, along with lowerBound (inclusive) and upperBound (exclusive), forms the partition range for the generated WHERE clause expressions used to split the partition column evenly. |
| fetchSize | Number of rows to fetch per round trip | To hint the JDBC driver to fetch this many rows per round trip from the database. |
The following options are specific to reading from Kafka.

| Key | Possible Value | Description |
| --- | --- | --- |
| kafka.bootstrap.servers | hostname1:port,hostname2:port | To specify the Kafka bootstrap servers to read data from Kafka topics. |
| subscribe | topic_1,topic_2 | To specify topic names (comma separated in case of multiple topics) to read data from. |
| subscribePattern | Topic name pattern | To specify a topic-name pattern; data is read from all Kafka topics matching the pattern. |
| startingOffsets | "earliest" or nested JSON specifying per-partition offsets | To specify a nested JSON defining the starting offset per topic partition, or the keyword "earliest" to read from the beginning. |
| endingOffsets | "latest" or nested JSON specifying per-partition offsets | To specify a nested JSON defining the ending offset per topic partition, or the keyword "latest" to read till the end. |
df=spark.read.option("header",True).csv("file:///path_to_file/tutorial_file_with_header.txt")
df.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
+-----+---------+-------+
df=spark.read.options(header=True).csv("file:///path_to_file/tutorial_file_with_header.txt")
df.show()
+-----+---------+-------+
|db_id| db_name|db_type|
+-----+---------+-------+
| 12| Teradata| RDBMS|
| 14|Snowflake|CloudDB|
| 15| Vertica| RDBMS|
| 17| Oracle| RDBMS|
| 19| MongoDB| NOSQL|
+-----+---------+-------+
df=spark.read.option("header",True).option("delimiter","|").csv("file:///path_to_file/delimited_file_with_header.txt")
df=spark.read.options(header=True, delimiter="|").csv("file:///path_to_file/delimited_file_with_header.txt")
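The sep option works the same way when the separator is a single character. A minimal sketch, assuming a comma-separated file at a hypothetical path:
df=spark.read.option("header",True).option("sep",",").csv("file:///path_to_file/comma_separated_file_with_header.txt")
df=spark.read.options(header=True, sep=",").csv("file:///path_to_file/comma_separated_file_with_header.txt")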
df=spark.read.option("header",True).option("delimiter", "|").option("lineSep","\t").csv("file:///path_to_file/delimited_file_with_header_and_line_separator.txt")
df=spark.read.options(header=True, delimiter="|",lineSep="\t").csv("file:///path_to_file/delimited_file_with_header_and_line_separator.txt")
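The encoding option can be added to any of these reads. A minimal sketch, assuming a UTF-16 encoded file at a hypothetical path:
df=spark.read.option("header",True).option("encoding","UTF-16").csv("file:///path_to_file/utf16_file_with_header.txt")
df=spark.read.options(header=True, encoding="UTF-16").csv("file:///path_to_file/utf16_file_with_header.txt")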
df=spark.read.option("pathGlobFilter","*.json").json("file:///path_to_file/data_files")
df=spark.read.options(pathGlobFilter= "*.json").json("file:///path_to_file/data_files")
df=spark.read.option("pathGlobFilter", "*.json").option("recursiveFileLookup",True).json("file:///path_to_file/data_files")
df=spark.read.options(pathGlobFilter= "*.json",recursiveFileLookup=True).json("file:///path_to_file/data_files")
df.write.option("codec","gzip").csv("file:///path_to_directory/codec_test")
df.write.options(codec="gzip").json("file:///path_to_directory/codec_test1")
df.write.option("quoteAll",True).csv("file:///path_to_directory/quoteAll_test")
****Content of written file*****
"12","Teradata","RDBMS"
"14","Snowflake","CloudDB"
"15","Vertica","RDBMS"
"17","Oracle","RDBMS"
"19","MongoDB","NOSQL"
"20","","NOSQL"
***************************
df.write.options(quoteAll=True).csv("file:///path_to_directory/quoteAll_test")
****Content of written file*****
"12","Teradata","RDBMS"
"14","Snowflake","CloudDB"
"15","Vertica","RDBMS"
"17","Oracle","RDBMS"
"19","MongoDB","NOSQL"
"20","","NOSQL"
***************************
df.write.option("quoteAll",True).option("quote","*").csv("file:///path_to_directory/quote_test")
****Content of written file*****
*12*,*Teradata*,*RDBMS*
*14*,*Snowflake*,*CloudDB*
*15*,*Vertica*,*RDBMS*
*17*,*Oracle*,*RDBMS*
*19*,*MongoDB*,*NOSQL*
***************************
df.write.options(quoteAll=True, quote="*").csv("file:///path_to_directory/quote_test1")
****Content of written file*****
*12*,*Teradata*,*RDBMS*
*14*,*Snowflake*,*CloudDB*
*15*,*Vertica*,*RDBMS*
*17*,*Oracle*,*RDBMS*
*19*,*MongoDB*,*NOSQL*
***************************
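The escape option follows the same pattern. A minimal sketch, assuming a hypothetical file whose quoted values embed the quote character and use / as the escape character:
df=spark.read.option("header",True).option("escape","/").csv("file:///path_to_file/escaped_file_with_header.txt")
df=spark.read.options(header=True, escape="/").csv("file:///path_to_file/escaped_file_with_header.txt")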
df=spark.read.option("modifiedBefore","2022-02-27T05:30:00").json("file:///path_to_file/data_files/")
df=spark.read.options(modifiedBefore="2022-02-27T05:30:00").json("file:///path_to_file/data_files/")
df=spark.read.option("modifiedAfter","2022-02-27T05:30:00").json("file:///path_to_file/data_files/")
df=spark.read.options(modifiedAfter="2022-02-27T05:30:00").json("file:///path_to_file/data_files/")
df.write.option("nullValue","**").csv("file:///path_to_directory/nullvalue_test")
df.write.options(nullValue="**").csv("file:///path_to_directory/nullvalue_test1")
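dateFormat and timestampFormat are passed the same way and take effect when a column is read as a date or timestamp type. A minimal sketch, assuming a hypothetical file with a created_on column in dd-MM-yyyy format and an explicit schema:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
schema = StructType([StructField("db_id", IntegerType()), StructField("db_name", StringType()), StructField("created_on", DateType())])
df=spark.read.schema(schema).option("header",True).option("dateFormat","dd-MM-yyyy").csv("file:///path_to_file/file_with_dates.txt")
df=spark.read.schema(schema).options(header=True, dateFormat="dd-MM-yyyy").csv("file:///path_to_file/file_with_dates.txt")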
df=spark.read.option("multiline",True).csv("file:///path_to_file/json_multiline_file.json")
df.show()
+-----+--------+-------+
|db_id| db_name|db_type|
+-----+--------+-------+
| 12|Teradata| RDBMS|
+-----+--------+-------+
df=spark.read.options(multiline=True).json("file:///path_to_file/json_multiline_file.json")
df.show()
+-----+--------+-------+
|db_id| db_name|db_type|
+-----+--------+-------+
| 12|Teradata| RDBMS|
+-----+--------+-------+
df_multiline = spark.read.option("delimiter","|").option("header",True).option("multiLine",True).csv("file:///path_to_file/multiline_file_with_header.txt")
df_multiline.show()
+-----+---------+--------+
|db_id| db_name| db_type|
+-----+---------+--------+
| 12| Teradata|RDBMS
DB|
| 14|Snowflake|
CloudDB|
| 15| Vertica| RDBMS|
| 17|Oracle
DB| RDBMS|
| 19| MongoDB| NOSQL|
+-----+---------+--------+
# Replacing new line character with **
from pyspark.sql import functions as f
df_multiline.select("db_id",f.regexp_replace(f.col("db_name"),"\n","**").alias("db_name"),f.regexp_replace(f.col("db_type"),"\n","**").alias("db_type")).show()
+-----+----------+---------+
|db_id| db_name| db_type|
+-----+----------+---------+
| 12| Teradata|RDBMS**DB|
| 14| Snowflake|**CloudDB|
| 15| Vertica| RDBMS|
| 17|Oracle**DB| RDBMS|
| 19| MongoDB| NOSQL|
+-----+----------+---------+
df=spark.read.format("json").option("multiline",True).option("path","file:///path_to_file/json_multiline_file.json").load()
df.show()
+-----+--------+-------+
|db_id| db_name|db_type|
+-----+--------+-------+
| 12|Teradata| RDBMS|
+-----+--------+-------+
df=spark.read.format("json").options(multiline=True,path="file:///path_to_file/json_multiline_file.json").load()
df.show()
+-----+--------+-------+
|db_id| db_name|db_type|
+-----+--------+-------+
| 12|Teradata| RDBMS|
+-----+--------+-------+
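mergeSchema applies to Parquet sources. A minimal sketch, assuming a hypothetical directory of Parquet files written with evolving schemas:
df=spark.read.option("mergeSchema",True).parquet("file:///path_to_file/parquet_files")
df=spark.read.options(mergeSchema=True).parquet("file:///path_to_file/parquet_files")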
df=spark.read.format("jdbc").options(driver="org.mariadb.jdbc.Driver", user="tutorial_user", password="user_password",url="jdbc:mysql://singlestore.dbmstutorials.com:3306/tutorial_db",dbtable="emp", partitionColumn="deptno", lowerBound=10,upperBound=30,numPartitions=5).load()
df=spark.read.format("jdbc").options(driver="org.mariadb.jdbc.Driver", user="tutorial_user", password="user_password",url="jdbc:mysql://singlestore.dbmstutorials.com:3306/tutorial_db",query="SELECT * FROM emp WHERE deptno=10").load()
df = (spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "hostname1:port,hostname2:port")
    .option("subscribe", "topic_1,topic_2")
    .option("startingOffsets", """{"topic_1":{"0":50,"1":-2},"topic_2":{"0":-2}}""")
    .option("endingOffsets", """{"topic_1":{"0":75,"1":-1},"topic_2":{"0":-1}}""")
    .load())
df = (spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "hostname1:port,hostname2:port")
    .option("subscribePattern", "topicname*")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load())