/** * 業務場景:數據去重問題 * Created by YJ on 2017/2/7. * 統計數據,盡量用reduceByKey,不要用groupByKey,優化點 * reduceByKey,在本機suffle后,再發送一個總map,發送到一個總機器上匯總,(匯總要壓力小) * groupByKey,發送本機所有的map,在一個機器上匯總(匯總壓力大) */ /*
數據格式 flie1: 2012-3-1 a 2012-3-2 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-7 c 2012-3-3 c flie2: 2012-3-1 b 2012-3-2 a 2012-3-3 b 2012-3-4 d 2012-3-5 a 2012-3-6 c 2012-3-7 d 2012-3-3 c */
package ClassicCaseimport org.apache.spark.{SparkConf, SparkContext}object case2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("reduce") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") //獲取數據 val two = sc.textFile("hdfs://192.168.109.130:8020//user/flume/ClassicCase/case2/*") two.filter(_.trim().length>0) //需要有空格。 .map(line=>(line.trim,""))//全部值當key,(key value,"") .groupByKey()//groupByKey,過濾重復的key value ,發送到總機器上匯總 .sortByKey() //按key value的自然順序排序 .keys.collect().foreach(PRintln) //所有的keys變成數組再輸出 //第二種有風險 two.filter(_.trim().length>0) .map(line=>(line.trim,"1")) .distinct() .reduceByKey(_+_) .sortByKey() .foreach(println) //reduceByKey,在本機suffle后,再發送一個總map,發送到一個總機器上匯總,(匯總要壓力小) //groupByKey,發送本機所有的map,在一個機器上匯總(匯總壓力大) //如果數據在不同的機器上,則會出現先重復數據,distinct,reduceBykey,只是在本機上去重,謹慎一點的話,在reduceByKey后面需要加多一個distinct }}輸出結果 2012-3-1 a 2012-3-1 b 2012-3-2 a 2012-3-2 b 2012-3-3 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-6 c 2012-3-7 c 2012-3-7 d (2012-3-1 a,1) (2012-3-1 b,1) (2012-3-2 a,1) (2012-3-2 b,1) (2012-3-3 b,1) (2012-3-3 c,1) (2012-3-4 d,1) (2012-3-5 a,1) (2012-3-6 b,1) (2012-3-6 c,1) (2012-3-7 c,1) (2012-3-7 d,1)
(1)當采用reduceByKeyt時,Spark可以在每個分區移動數據之前將待輸出數據與一個共用的key結合。借助下圖可以理解在reduceByKey里究竟發生了什么。 注意在數據對被搬移前同一機器上同樣的key是怎樣被組合的(reduceByKey中的lamdba函數)。然后lamdba函數在每個區上被再次調用來將所有值reduce成一個最終結果。整個過程如下:
(2)當采用groupByKey時,由于它不接收函數,spark只能先將所有的鍵值對(key-value pair)都移動,這樣的后果是集群節點之間的開銷很大,導致傳輸延時。整個過程如下:
( 3 )區別 reduceByKey,在本機suffle后,再發送一個總map,發送到一個總機器上suffle匯總map,(匯總要壓力小) groupByKey,發送本機所有的map,在一個機器上suffle匯總map(匯總壓力大)
因此,在對大數據進行復雜計算時,reduceByKey優于groupByKey。 另外,如果僅僅是group處理,那么以下函數應該優先于 groupByKey : (1)、combineByKey 組合數據,但是組合之后的數據類型與輸入時值的類型不一樣。 (2)、foldByKey合并每一個 key 的所有值,在級聯函數和“零值”中使用。
新聞熱點
疑難解答