Error: pyspark.sql.utils.AnalysisException: Only one generator allowed per select clause but found 2: explode(_2), explode(_3)
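Spark raises this exception whenever two generator expressions (explode, posexplode, inline, ...) appear in the same select clause. A minimal reproduction, assuming df is the two-array DataFrame built from Dataset 1 below:

from pyspark.sql.functions import explode

# Two generators in one select clause -> AnalysisException
df.select("_1", explode("_2"), explode("_3")).show()

The workarounds below all reduce the query to a single generator per select.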
Dataset 1:
data_list = [(1, [2, 3, 4], [5, 6, 7]), (2, [3, 4, 5], [6, 7, 8])]
df = spark.createDataFrame(sc.parallelize(data_list))
df.show()
+---+---------+---------+
| _1| _2| _3|
+---+---------+---------+
| 1|[2, 3, 4]|[5, 6, 7]|
| 2|[3, 4, 5]|[6, 7, 8]|
+---+---------+---------+
Dataset 2 (same shape, but with empty arrays so the two array columns can differ in length):
data_list_2 = [(1, [2, 3, 4], [5, 6, 7]), (2, [], [6, 7, 8]), (3, [4, 5, 6], [])]
df_2 = spark.createDataFrame(sc.parallelize(data_list_2))
df_2.show()
+---+---------+---------+
| _1| _2| _3|
+---+---------+---------+
| 1|[2, 3, 4]|[5, 6, 7]|
| 2| []|[6, 7, 8]|
| 3|[4, 5, 6]| []|
+---+---------+---------+
One fix is arrays_zip + explode: arrays_zip(*array_cols) zips the arrays element-wise into a single array of structs, so the query only needs one generator.

from pyspark.sql.functions import arrays_zip, explode
exp_step1_df = df.select("_1", explode(arrays_zip("_2", "_3")))
exp_step1_df.show()
+---+------+
| _1| col|
+---+------+
| 1|[2, 5]|
| 1|[3, 6]|
| 1|[4, 7]|
| 2|[3, 6]|
| 2|[4, 7]|
| 2|[5, 8]|
+---+------+
The exploded struct column is named col, and its fields keep the original column names, so they can be selected straight back out:
exp_step2_df = exp_step1_df.select("_1", "col._2", "col._3")
exp_step2_df.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| 2| 5|
| 1| 3| 6|
| 1| 4| 7|
| 2| 3| 6|
| 2| 4| 7|
| 2| 5| 8|
+---+---+---+
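The same two steps should also work on Dataset 2: when the arrays differ in length, arrays_zip pads the shorter one with nulls, so missing entries come through as null rather than being dropped. A sketch (variable names are illustrative, output not shown):

exp_df2_step1 = df_2.select("_1", explode(arrays_zip("_2", "_3")))
exp_df2_step2 = exp_df2_step1.select("_1", "col._2", "col._3")
exp_df2_step2.show()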
Another fix is zip_with + concat_ws: zip_with(array_1, array_2, function) merges the two arrays element-wise with a lambda, here concatenating each pair of values into one delimited string, so again only one generator is needed.

from pyspark.sql.functions import zip_with, concat_ws, explode, substring_index
exp_step1_df = df.select("_1", zip_with("_2", "_3", lambda x, y: concat_ws("|*|", x, y)).alias("join_col"))
exp_step1_df.show(truncate=False)
+---+---------------------+
|_1 |join_col |
+---+---------------------+
|1 |[2|*|5, 3|*|6, 4|*|7]|
|2 |[3|*|6, 4|*|7, 5|*|8]|
+---+---------------------+
exp_step2_df = exp_step1_df.select("_1", explode("join_col"))
exp_step2_df.show()
+---+-----+
| _1| col|
+---+-----+
| 1|2|*|5|
| 1|3|*|6|
| 1|4|*|7|
| 2|3|*|6|
| 2|4|*|7|
| 2|5|*|8|
+---+-----+
exp_step3_df = exp_step2_df.select("_1", substring_index("col", "|*|", 1).alias("_2"), substring_index("col", "|*|", -1).alias("_3"))
exp_step3_df.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| 2| 5|
| 1| 3| 6|
| 1| 4| 7|
| 2| 3| 6|
| 2| 4| 7|
| 2| 5| 8|
+---+---+---+
For Dataset 2, zip_with pads the shorter array with nulls, and concat_ws silently skips null arguments, which would drop the delimiter and confuse substring_index. Wrapping each side in coalesce(..., lit("")) keeps the delimiter in place:

from pyspark.sql.functions import zip_with, concat_ws, explode, substring_index, coalesce, lit
exp_step1_df = df_2.select("_1", zip_with("_2", "_3", lambda x, y: concat_ws("|*|", coalesce(x, lit("")), coalesce(y, lit("")))).alias("join_col"))
exp_step1_df.show(truncate=False)
+---+---------------------+
|_1 |join_col |
+---+---------------------+
|1 |[2|*|5, 3|*|6, 4|*|7]|
|2 |[|*|6, |*|7, |*|8] |
|3 |[4|*|, 5|*|, 6|*|] |
+---+---------------------+
exp_step2_df = exp_step1_df.select("_1", explode("join_col"))
exp_step2_df.show()
+---+-----+
| _1| col|
+---+-----+
| 1|2|*|5|
| 1|3|*|6|
| 1|4|*|7|
| 2| |*|6|
| 2| |*|7|
| 2| |*|8|
| 3| 4|*||
| 3| 5|*||
| 3| 6|*||
+---+-----+
exp_step3_df = exp_step2_df.select("_1", substring_index("col", "|*|", 1).alias("_2"), substring_index("col", "|*|", -1).alias("_3"))
exp_step3_df.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| 2| 5|
| 1| 3| 6|
| 1| 4| 7|
| 2| | 6|
| 2| | 7|
| 2| | 8|
| 3| 4| |
| 3| 5| |
| 3| 6| |
+---+---+---+
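Note that substring_index returns strings, so the missing values surface as empty strings and the columns lose their original bigint type. If that matters, one possible follow-up step (a sketch, using when to turn empty strings back into nulls before casting):

from pyspark.sql.functions import when, col

typed_df = exp_step3_df.select(
    "_1",
    when(col("_2") != "", col("_2")).cast("long").alias("_2"),
    when(col("_3") != "", col("_3")).cast("long").alias("_3"),
)
typed_df.show()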
A third fix is to posexplode each array separately, keeping each element's position, and then join the two results on the id and position columns:

from pyspark.sql.functions import posexplode
exp_step1_df = df.select("_1", posexplode("_2"))
exp_step1_df.show(truncate=False)
+---+---+---+
|_1 |pos|col|
+---+---+---+
|1 |0 |2 |
|1 |1 |3 |
|1 |2 |4 |
|2 |0 |3 |
|2 |1 |4 |
|2 |2 |5 |
+---+---+---+
exp_step2_df = df.select("_1", posexplode("_3"))
exp_step2_df.show()
+---+---+---+
| _1|pos|col|
+---+---+---+
| 1| 0| 5|
| 1| 1| 6|
| 1| 2| 7|
| 2| 0| 6|
| 2| 1| 7|
| 2| 2| 8|
+---+---+---+
exp_step3_df = exp_step1_df.join(exp_step2_df, (exp_step1_df._1 == exp_step2_df._1) & (exp_step1_df.pos == exp_step2_df.pos))
exp_step3_df.show()
+---+---+---+---+---+---+
| _1|pos|col| _1|pos|col|
+---+---+---+---+---+---+
| 1| 2| 4| 1| 2| 7|
| 2| 2| 5| 2| 2| 8|
| 1| 1| 3| 1| 1| 6|
| 2| 1| 4| 2| 1| 7|
| 1| 0| 2| 1| 0| 5|
| 2| 0| 3| 2| 0| 6|
+---+---+---+---+---+---+
exp_step4_df = exp_step3_df.select(exp_step1_df._1, exp_step1_df.col.alias("_2"), exp_step2_df.col.alias("_3"))
exp_step4_df.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| 4| 7|
| 2| 5| 8|
| 1| 3| 6|
| 2| 4| 7|
| 1| 2| 5|
| 2| 3| 6|
+---+---+---+
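Because both sides of the join come from the same df, column references can turn ambiguous in some Spark versions. An equivalent sketch that renames the generator output and aliases each side up front (names are illustrative):

from pyspark.sql.functions import posexplode, col

left = df.select("_1", posexplode("_2").alias("pos", "_2")).alias("l")
right = df.select("_1", posexplode("_3").alias("pos", "_3")).alias("r")

joined = (
    left.join(right, (col("l._1") == col("r._1")) & (col("l.pos") == col("r.pos")))
        .select(col("l._1").alias("_1"), col("l._2"), col("r._3"))
)
joined.show()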