Spark helps us enter data analysis environment quickly, so that
還記得幾年前接觸Hadoop的時候,碰到運算分析的問題,都要特別的去考量分散運算的問題,像是有些需要做迭代的演算法(如:基因演算法),作法上因為需要一直大量做Map Reduce,所以都會有效能的問題,因此最後大部分的做法都回到In-Memory上做計算。
一直到去年2014年11月Databricks團隊透過 Apache Spark在國際資料排序基準競賽(Sort Benchmark Competition)(新聞),以不到30分鐘的時間排序高達100 TB的資料量,打破Hadoop的72分鐘,將分散式運算(Distributed Computing)產業的應用領域帶到另一個高峰。Spark源自於加州柏克萊大學的AMPLab,這個實驗室投入各項數據分析的應用已經很多年,像是資料探勘、機器學習、資料庫等等,其中一位成員Matei為Spark主要發展者,目前已經是麻省理工學院的助理教授,同時也是Databricks的技術長。
Spark目前依存在Hadoop Ecosystem裡面,像是Hortonworks (HDP2.2)開始就有包含Spark,而普遍的做法就是搭配Hadoop的Hbase或HDFS使用,搭配的語言包含Java、Scala、Python、R (1.4版本開始支援),這些都是目前資料科學家常使用的語言。
今天的主題並不是主要介紹Spark,而是於2015年8月剛IBM推出的Apache Spark for Bluemix。我們都知道資料科學家需要專注於分析上,因此在基礎設施上需要一個便利的環境讓資料科學家能夠使用。(筆者按:其他雲端服務亦有類似功能,但仍須安裝與設定,包含:Azure、AWS)
而Bluxmix上的Spark就有這樣的潛力,他們把Jupyter Notebooks直接整合在Spark上面,讓資料科學家可以直接進行計算與分析,在儲存方面則採用Swift Object Storage的檔案儲存方式,方便做存取之用。如下圖所示,資料科學家只需要在Jupyter Notebooks上面直接下指令,就可以與資料進行互動,完全不需要擔心基礎設施的問題。
筆者接下來就進行簡單的實作
首先我們必須申請Bluemix的帳號,登入後進入下面網址:
https://console.ng.bluemix.net/catalog/apache-spark-starter/
步驟一、
接著就會看到下圖,接下來選擇右邊的「Apache Spark」,並點選Create進行Deploy。
不到一分鐘,就會看到已經佈署好的Spark,同時他也把Object Storage也整合完成,同時我們也可在Instances與Memory Quota上面直接調整我們想要的Spark分散主機數量,以及每一台的記憶體多寡。最後我們就可以透過上方Routes旁的網址,直接進入Spark的畫面。
步驟三、
進入後,直接點選右邊的Launch即可登入。
步驟四、
進入之後會出現Jupyter Notebooks需要做帳號密碼的輸入,我們可以回到前面步驟二的畫面中找到下圖所示的密碼資訊,它放在Environment Variables裡面的app_pw裡面。
步驟五、
接著我們就可以直接進入Jupyter Notebooks直接做分析!
首先我們到國際氣候數據網站(National Climatic Data website)進行天氣資料下載,如下圖所示。
使用ZIP將檔案進行解壓縮,得到2015.csv檔案,因為檔案內容相當大,透過Excel打開會很久,筆者這邊是使用Sublime Text把檔案打開,在第一行加入「STATION, DATE, METRIC, VALUE, C5, C6, C7, C8」,如下圖所示:
步驟七、
回到步驟三的頁面,點選右下角的Manage Files,我們要把csv檔案上傳到Bluemix Object Storage,如下圖紅框所示:
步驟八、
Object Storage的密碼與Jupyter Notebooks登入密碼相同,我們輸入後就到右邊「+」的符號進行新增資料,並填入ClimateDataForTutorial,建立內容,完成後就會如下圖這樣呈現:
步驟九、
點選我們建立的ClimateDataForTutorial,上傳資料集,如下圖所示:
步驟十、
從這個階段開始,我們將開始進行資料分析,下面操作程式碼都已內建在Jupyter Notebooks上面,所以讀者只要照著操作,基本上不會有太大的問題,如果需要筆者的程式碼,請參考筆者GitHub,如果您想自己安裝Spark,也可參考這裡。
首先我們先設定資料路徑,Spark的資料結構名為彈性分佈式數據集(Resilient Distributed Dataset 簡稱RDD),Spark透過RDD將資料進行分區儲存與處理,因此我們透過sc.textFile這個指令就可以建立RDD,接著在輸入weather.count()就可以查看整個資料集筆數,如下圖所示,2015.csv有超過兩千萬筆資料。
步驟十一、
接下來透過Lambda指令建立一個匿名函數(Anonymous functions),將line : line.split(",")結果進行回傳,將資料改成行並以逗號做分割,透過map產生一個新的RDD,叫做weatherParse,如下圖所示:
步驟十二、
透過filter抓取降雨量(PRCP)的資料,獲得weatherPrecp資料,接著將資料做個整理,一樣透過map加上Lambda將資料轉換產生新的RDD為weatherPrecpCountByKey,如下圖所示:
步驟十三、
接著透過reduceByKey,將把個Key的相同元素做一個reduce的動作,譬如說我們RDD資料有(A,2)、(B,2)、(A,5),我們想把它做x + y的動作,那reduceByKey就會產生(A,7)、(B,2)的結果。同樣地用到這裡,是希望將前面整理好的氣象站降雨量資料,做某一些特定動作,在這裡我們一樣做加總,並產生新的RDD為weatherPrecpAddByKey如下圖所示:
步驟十三、
接著我們將資料做平均的計算,一樣透過map搭配計算公式的方式取得每個氣象站的平均值,產生名為weatherAverages的RDD。
步驟十四、接下來透過Lambda指令建立一個匿名函數(Anonymous functions),將line : line.split(",")結果進行回傳,將資料改成行並以逗號做分割,透過map產生一個新的RDD,叫做weatherParse,如下圖所示:
步驟十二、
透過filter抓取降雨量(PRCP)的資料,獲得weatherPrecp資料,接著將資料做個整理,一樣透過map加上Lambda將資料轉換產生新的RDD為weatherPrecpCountByKey,如下圖所示:
步驟十三、
接著透過reduceByKey,將把個Key的相同元素做一個reduce的動作,譬如說我們RDD資料有(A,2)、(B,2)、(A,5),我們想把它做x + y的動作,那reduceByKey就會產生(A,7)、(B,2)的結果。同樣地用到這裡,是希望將前面整理好的氣象站降雨量資料,做某一些特定動作,在這裡我們一樣做加總,並產生新的RDD為weatherPrecpAddByKey如下圖所示:
步驟十三、
接著我們將資料做平均的計算,一樣透過map搭配計算公式的方式取得每個氣象站的平均值,產生名為weatherAverages的RDD。
到這一步基本上都已取得各氣象站的平均降雨量,接著我們只要印出來即可。透過下面指令的動作,我們可以獲得前10筆的資料,結果如下圖所示:
for pair inweatherAverages . top( 10): print "Station %s had average precipitations of %f" % (pair[ 0], pair[ 1])
我們再透過下面指令做降冪排序的動作,由大排到小,結果如圖所示:
for pair inweatherAverages . map( lambda (x, y): (y, x)). top( 10): print "Station %s had average precipitations of %f" % (pair[ 1], pair[ 0])
完成了簡單的資料讀取與處理。
透過上面實作,我們完成了幾個部分,包含上傳資料到Object Storage、建立RDD資料、將資料做過濾、將資料進行平均的計算、以及印出結果與排序。綜合以上使用,筆者整理Bluemix Spark特色如下:
- 不到1分鐘就可以開始進行資料科學運算
- 只有1~2個按鈕就可以資源擴展 (可自選記憶體與主機數量)
- 便利的資料儲存與取用
- 不用顧慮基礎設施問題
沒有留言:
張貼留言