2015年10月16日 星期五

透過Python與Spark做氣象大數據分析

In this project, we applied Spark in weather data analysis. This application includes uploading data to Object Storage, establishing RDD, making data filtered, calculating the average of data, printing results and sorting. Based on the uses mentioned above, Bluemix Spark includes many advantages. It can start to make data analysis within one minute. Users only need to push one to two button to make resource extension. (You can choose the memory and the number of hosts)  It is convenient for users to make data storage and access and they do not have to consider infrastructure problem.

Spark helps us enter data analysis environment  quickly, so that data scientists can create more values for the data.  

還記得幾年前接觸Hadoop的時候,碰到運算分析的問題,都要特別的去考量分散運算的問題,像是有些需要做迭代的演算法(如:基因演算法),作法上因為需要一直大量做Map Reduce,所以都會有效能的問題,因此最後大部分的做法都回到In-Memory上做計算。



一直到去年2014年11月Databricks團隊透過 Apache Spark國際資料排序基準競賽(Sort Benchmark Competition)(新聞),以不到30分鐘的時間排序高達100 TB的資料量,打破Hadoop的72分鐘,將分散式運算(Distributed Computing)產業的應用領域帶到另一個高峰。Spark源自於加州柏克萊大學的AMPLab,這個實驗室投入各項數據分析的應用已經很多年,像是資料探勘、機器學習、資料庫等等,其中一位成員Matei為Spark主要發展者,目前已經是麻省理工學院的助理教授,同時也是Databricks的技術長。

Spark目前依存在Hadoop Ecosystem裡面,像是Hortonworks (HDP2.2)開始就有包含Spark,而普遍的做法就是搭配Hadoop的Hbase或HDFS使用,搭配的語言包含Java、Scala、Python、R (1.4版本開始支援),這些都是目前資料科學家常使用的語言。

今天的主題並不是主要介紹Spark,而是於2015年8月剛IBM推出的Apache Spark for Bluemix。我們都知道資料科學家需要專注於分析上,因此在基礎設施上需要一個便利的環境讓資料科學家能夠使用。(筆者按:其他雲端服務亦有類似功能,但仍須安裝與設定,包含:AzureAWS)



而Bluxmix上的Spark就有這樣的潛力,他們把Jupyter Notebooks直接整合在Spark上面,讓資料科學家可以直接進行計算與分析,在儲存方面則採用Swift Object Storage的檔案儲存方式,方便做存取之用。如下圖所示,資料科學家只需要在Jupyter Notebooks上面直接下指令,就可以與資料進行互動,完全不需要擔心基礎設施的問題。

筆者接下來就進行簡單的實作透過Python語言體驗一下Apache Spark for Bluemix

首先我們必須申請Bluemix的帳號,登入後進入下面網址:
https://console.ng.bluemix.net/catalog/apache-spark-starter/

步驟一、
接著就會看到下圖,接下來選擇右邊的「Apache Spark」,並點選Create進行Deploy。

步驟二、

不到一分鐘,就會看到已經佈署好的Spark,同時他也把Object Storage也整合完成,同時我們也可在Instances與Memory Quota上面直接調整我們想要的Spark分散主機數量,以及每一台的記憶體多寡。最後我們就可以透過上方Routes旁的網址,直接進入Spark的畫面。
步驟三、
進入後,直接點選右邊的Launch即可登入。


步驟四、
進入之後會出現Jupyter Notebooks需要做帳號密碼的輸入,我們可以回到前面步驟二的畫面中找到下圖所示的密碼資訊,它放在Environment Variables裡面的app_pw裡面。

步驟五、
接著我們就可以直接進入Jupyter Notebooks直接做分析! 
首先我們到國際氣候數據網站(National Climatic Data website)進行天氣資料下載,如下圖所示。





步驟六、
使用ZIP將檔案進行解壓縮,得到2015.csv檔案,因為檔案內容相當大,透過Excel打開會很久,筆者這邊是使用Sublime Text把檔案打開,在第一行加入「STATION, DATE, METRIC, VALUE, C5, C6, C7, C8」,如下圖所示:

步驟七、
回到步驟三的頁面,點選右下角的Manage Files,我們要把csv檔案上傳到Bluemix Object Storage,如下圖紅框所示:

步驟八、
Object Storage的密碼與Jupyter Notebooks登入密碼相同,我們輸入後就到右邊「+」的符號進行新增資料,並填入ClimateDataForTutorial,建立內容,完成後就會如下圖這樣呈現:

步驟九、
點選我們建立的ClimateDataForTutorial,上傳資料集,如下圖所示:
步驟十、
從這個階段開始,我們將開始進行資料分析,下面操作程式碼都已內建在Jupyter Notebooks上面,所以讀者只要照著操作,基本上不會有太大的問題,如果需要筆者的程式碼,請參考筆者GitHub,如果您想自己安裝Spark,也可參考這裡

首先我們先設定資料路徑,Spark的資料結構名為彈性分佈式數據集(Resilient Distributed Dataset 簡稱RDD),Spark透過RDD將資料進行分區儲存與處理,因此我們透過sc.textFile這個指令就可以建立RDD,接著在輸入weather.count()就可以查看整個資料集筆數,如下圖所示,2015.csv有超過兩千萬筆資料。

步驟十一、
接下來透過Lambda指令建立一個匿名函數(Anonymous functions),將line : line.split(",")結果進行回傳,將資料改成行並以逗號做分割,透過map產生一個新的RDD,叫做weatherParse,如下圖所示:
步驟十二、
透過filter抓取降雨量(PRCP)的資料,獲得weatherPrecp資料,接著將資料做個整理,一樣透過map加上Lambda將資料轉換產生新的RDD為weatherPrecpCountByKey如下圖所示:
步驟十三、
接著透過reduceByKey,將把個Key的相同元素做一個reduce的動作,譬如說我們RDD資料有(A,2)、(B,2)、(A,5),我們想把它做x + y的動作,那reduceByKey就會產生(A,7)、(B,2)的結果。同樣地用到這裡,是希望將前面整理好的氣象站降雨量資料,做某一些特定動作,在這裡我們一樣做加總,並產生新的RDD為weatherPrecpAddByKey如下圖所示:


步驟十三、
接著我們將資料做平均的計算,一樣透過map搭配計算公式的方式取得每個氣象站的平均值,產生名為weatherAverages的RDD。

步驟十四、
到這一步基本上都已取得各氣象站的平均降雨量,接著我們只要印出來即可。透過下面指令的動作,我們可以獲得前10筆的資料,結果如下圖所示:
for pair in weatherAverages.top(10):
    print "Station %s had average precipitations of %f" % (pair[0],pair[1])

步驟十五、
我們再透過下面指令做降冪排序的動作,由大排到小,結果如圖所示:
for pair in weatherAverages.map(lambda (x,y):  (y,x)).top(10):
    print "Station %s had average precipitations of %f" % (pair[1],pair[0])

完成了簡單的資料讀取與處理。


透過上面實作,我們完成了幾個部分,包含上傳資料到Object Storage、建立RDD資料、將資料做過濾、將資料進行平均的計算、以及印出結果與排序。綜合以上使用,筆者整理Bluemix Spark特色如下:
  1. 不到1分鐘就可以開始進行資料科學運算
  2. 只有1~2個按鈕就可以資源擴展 (可自選記憶體與主機數量)
  3. 便利的資料儲存與取用
  4. 不用顧慮基礎設施問題



沒有留言:

張貼留言