Function aggregateByKey is one of the aggregation functions available in Spark (the others being reduceByKey and groupByKey). Of the three, it is the only one that lets you compute multiple kinds of aggregation (maximum, minimum, average, sum and count) at the same time. People often find this function hard to understand at first, so this page tries to explain it in a simpler way.
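For example, several statistics can be computed per key in a single pass by making the zero value a tuple with one slot per statistic. The following is only a minimal sketch; the RDD name salesRDD and its sample values are made up for illustration:

// Hypothetical (productId, revenue) pairs, values invented for this sketch
val salesRDD = sc.parallelize(Seq(("p1", 10.0f), ("p1", 25.0f), ("p2", 5.0f), ("p1", 15.0f)))

// Accumulator tuple: (maximum, minimum, sum, count); average can be derived as sum/count
val zero = (Float.MinValue, Float.MaxValue, 0.0f, 0)

val stats = salesRDD.aggregateByKey(zero)(
  // seqOp: fold one value into the accumulator within a partition
  (acc, v) => (math.max(acc._1, v), math.min(acc._2, v), acc._3 + v, acc._4 + 1),
  // combOp: merge accumulators coming from different partitions
  (a, b) => (math.max(a._1, b._1), math.min(a._2, b._2), a._3 + b._3, a._4 + b._4)
)

stats.collect.foreach(println)   // e.g. (p1,(25.0,10.0,50.0,3))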
scala> val orderItemMap = orderItemRDD.map(orderItem => (1,orderItem.split(",")(4).toFloat)) orderItemMap: org.apache.spark.rdd.RDD[(Int, Float)] = MapPartitionsRDD[27] at map at:25 scala> orderItemMap.getNumPartitions res23: Int = 2
scala> val orderItemMap = orderItemRDD.map(orderItem => (1,orderItem.split(",")(4).toFloat)).repartition(1) orderItemMap: org.apache.spark.rdd.RDD[(Int, Float)] = MapPartitionsRDD[13] at repartition at:25 scala> orderItemMap.getNumPartitions res7: Int = 1
pairRDD.aggregateByKey(init_value)(combinerFunc, reduceFunc)

Here init_value is the initial (zero) accumulator used for each key, combinerFunc (Spark's seqOp) combines the accumulator with each value within a partition, and reduceFunc (Spark's combOp) merges the accumulators produced by different partitions.
val orderItemRDD = sc.textFile("file:///Users/dbmstutorials/spark/orderItem.txt")

// Setting the number of partitions to 1
val orderItemMap = orderItemRDD.map(orderItem => (1, orderItem.split(",")(4).toFloat)).repartition(1) // 5th field in the sample file is revenue

orderItemMap.getNumPartitions // Check that the number of RDD partitions is 1
res1: Int = 1

AggregateByKey using externally defined functions

val init_value = (0.0f, 0.0f, 0.0f) // Initial value for revenue (sum), total items (count) and average revenue (sum/count)

val combinerFunc = (inter: (Float, Float, Float), value: Float) => {
  (inter._1 + value, inter._2 + 1, (inter._1 + value) / (inter._2 + 1))
}

val reduceFunc = (p1: (Float, Float, Float), p2: (Float, Float, Float)) => {
  (p1._1 + p2._1, p1._2 + p2._2, (p1._1 + p2._1) / (p1._2 + p2._2))
}

val revenueAndCountAndAvg = orderItemMap.aggregateByKey(init_value)(combinerFunc, reduceFunc)

AggregateByKey with the functions defined inline

val revenueAndCountAndAvg = orderItemMap.aggregateByKey((0.0f, 0, 0.0f))(
  (inter, value) => { (inter._1 + value, inter._2 + 1, (inter._1 + value) / (inter._2 + 1)) }, // 1st element adds up the values (sum), 2nd element counts the records, 3rd element is the running average (sum/count)
  (p1, p2) => { (p1._1 + p2._1, p1._2 + p2._2, (p1._1 + p2._1) / (p1._2 + p2._2)) }
)

revenueAndCountAndAvg.collect.foreach(println)
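The per-call trace shown below can be reproduced by printing from inside the two functions. This is only a sketch of such an instrumented run, reusing the same orderItemMap as above; in local mode the println output appears on the console, on a cluster it goes to the executor logs:

val traced = orderItemMap.aggregateByKey((0.0f, 0, 0.0f))(
  (inter, value) => {
    println(s"combinerFunc--> inter: $inter value:$value") // one line per record within a partition
    (inter._1 + value, inter._2 + 1, (inter._1 + value) / (inter._2 + 1))
  },
  (p1, p2) => {
    println(s"reduceFunc--> p1: $p1 p2:$p2") // one line per merge of two partition results
    (p1._1 + p2._1, p1._2 + p2._2, (p1._1 + p2._1) / (p1._2 + p2._2))
  }
)
traced.collect.foreach(println)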
combinerFunc--> inter: (0.0,0,0.0) value:149.94
combinerFunc--> inter: (149.94,1,149.94) value:499.95
combinerFunc--> inter: (649.89,2,324.945) value:250.0
combinerFunc--> inter: (899.89,3,299.96335) value:127.96
combinerFunc--> inter: (1027.85,4,256.9625) value:399.98
combinerFunc--> inter: (1427.83,5,285.56598) value:99.96
combinerFunc--> inter: (1527.7899,6,254.63165) value:50.0
combinerFunc--> inter: (1577.7899,7,225.39856) value:50.0
combinerFunc--> inter: (1627.7899,8,203.47374) value:199.99
combinerFunc--> inter: (1827.7799,9,203.08665) value:95.97
combinerFunc--> inter: (1923.7499,10,192.37498) value:179.97
combinerFunc--> inter: (2103.72,11,191.24727) value:399.98
combinerFunc--> inter: (2503.7,12,208.64166) value:119.98
combinerFunc--> inter: (2623.68,13,201.82153) value:299.95
combinerFunc--> inter: (2923.63,14,208.8307) value:79.96
combinerFunc--> inter: (3003.5898,15,200.23932) value:99.96
combinerFunc--> inter: (3103.5498,16,193.97186) value:74.97
combinerFunc--> inter: (3178.5198,17,186.97176) value:199.99
combinerFunc--> inter: (3378.5098,18,187.69499) value:239.96
combinerFunc--> inter: (3618.4697,19,190.44577) value:199.99

(1,(3818.4597,20,190.92299))
1st Iteration
combinerFunc--> inter: (0.0,0,0.0) value:149.94
Explanation: next accumulator (1st: inter._1=0.0 + value=149.94 => 149.94, 2nd: inter._2=0 + 1 => 1, 3rd: 1st/2nd => 149.94) => (149.94,1,149.94)

2nd Iteration
combinerFunc--> inter: (149.94,1,149.94) value:499.95
Explanation: next accumulator (1st: inter._1=149.94 + value=499.95 => 649.89, 2nd: inter._2=1 + 1 => 2, 3rd: 1st/2nd => 324.945) => (649.89,2,324.945)

3rd Iteration
combinerFunc--> inter: (649.89,2,324.945) value:250.0
Explanation: next accumulator (1st: inter._1=649.89 + value=250.0 => 899.89, 2nd: inter._2=2 + 1 => 3, 3rd: 1st/2nd => 299.96335) => (899.89,3,299.96335)
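Within a single partition the combiner behaves like a foldLeft over that partition's values, so the first iterations above can be reproduced locally. A small sketch, using only the first three revenue values taken from the trace:

// First three revenues from the trace above
val values = List(149.94f, 499.95f, 250.0f)

val result = values.foldLeft((0.0f, 0, 0.0f)) { (inter, value) =>
  val next = (inter._1 + value, inter._2 + 1, (inter._1 + value) / (inter._2 + 1))
  println(s"inter: $inter value:$value => $next")
  next
}
// result should match the 3rd iteration above, i.e. roughly (899.89,3,299.96335)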
val orderItemRDD = sc.textFile("file:///Users/dbmstutorials/spark/orderItem.txt")

val orderItemMap = orderItemRDD.map(orderItem => (1, orderItem.split(",")(4).toFloat)) // 5th field in the sample file is revenue

scala> orderItemMap.getNumPartitions // Number of RDD partitions is 2
res32: Int = 2

val revenueAndCountAndAvg = orderItemMap.aggregateByKey((0.0f, 0, 0.0f))(
  (inter, value) => { (inter._1 + value, inter._2 + 1, (inter._1 + value) / (inter._2 + 1)) }, // 1st element adds up the values, 2nd element counts the records, 3rd element is the running average (sum/count)
  (p1, p2) => { (p1._1 + p2._1, p1._2 + p2._2, (p1._1 + p2._1) / (p1._2 + p2._2)) }
)

revenueAndCountAndAvg.take(1).foreach(println)
(1,(3818.46,20,190.923))
Trace with two partitions (the partitions are processed in parallel, so their combinerFunc calls interleave and each partition builds up its own accumulator):

combinerFunc--> inter: (0.0,0,0.0) value:399.98
combinerFunc--> inter: (0.0,0,0.0) value:149.94
combinerFunc--> inter: (399.98,1,399.98) value:119.98
combinerFunc--> inter: (149.94,1,149.94) value:499.95
combinerFunc--> inter: (519.96,2,259.98) value:299.95
combinerFunc--> inter: (649.89,2,324.945) value:250.0
combinerFunc--> inter: (819.91003,3,273.30334) value:79.96
combinerFunc--> inter: (899.89,3,299.96335) value:127.96
combinerFunc--> inter: (899.87006,4,224.96751) value:99.96
combinerFunc--> inter: (1027.85,4,256.9625) value:399.98
combinerFunc--> inter: (999.8301,5,199.96602) value:74.97
combinerFunc--> inter: (1427.83,5,285.56598) value:99.96
combinerFunc--> inter: (1074.8,6,179.13335) value:199.99
combinerFunc--> inter: (1527.7899,6,254.63165) value:50.0
combinerFunc--> inter: (1274.79,7,182.11287) value:239.96
combinerFunc--> inter: (1577.7899,7,225.39856) value:50.0
combinerFunc--> inter: (1627.7899,8,203.47374) value:199.99
combinerFunc--> inter: (1514.75,8,189.34375) value:199.99
combinerFunc--> inter: (1827.7799,9,203.08665) value:95.97
combinerFunc--> inter: (1923.7499,10,192.37498) value:179.97
reduceFunc--> p1: (2103.72,11,191.24727) p2:(1714.74,9,190.52667)
(1,(3818.46,20,190.923))
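With two partitions, each partition is folded independently by combinerFunc, and the two partial results seen in the reduceFunc line above are then merged: revenue 2103.72 + 1714.74 => 3818.46, count 11 + 9 => 20, and average 3818.46 / 20 => 190.923. As a quick sketch, the same merge can be checked in isolation by applying the reduceFunc logic to the two per-partition tuples:

// The two per-partition results from the trace above
val part1 = (2103.72f, 11, 191.24727f)
val part2 = (1714.74f, 9, 190.52667f)

// Same logic as reduceFunc: add the sums, add the counts, recompute the average
val merged = (part1._1 + part2._1, part1._2 + part2._2, (part1._1 + part2._1) / (part1._2 + part2._2))
println(merged) // approximately (3818.46,20,190.923)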