2016年5月22日 星期日

一天上手SparkR

Spark是近幾年在數據分析領域非常火紅的技術,繼上一次「透過Python與Spark做氣象大數據分析」的文章,筆者這次希望分享的是原生Spark 在 R上面的改變,從Spark 1.4版開始對R有較完整的支持,而今年推出的Spark 1.6版上面,可以看到Spark 已經特別為R做了一個專屬的核心,這個核心也只有R才有,其目標就是希望把過去R累積的運算套件無痛移植到Spark上。

圖1:SparkR架構(來源)

從「SparkR: Scaling R Programs with Spark」這篇期刊文章上可以觀察到,圖1中的R的Spark Context透過R-JVM跟Java Spark Context做溝通,將工作分散到每個有R的Worker上面,透過Spark Executor進行運算,其實有Hadoop MR的味道。

但重點是效率變得更好,程式碼變得更好寫,從效率來看,圖2中SparkR的DataFrame效率跟Scala、Python其實也差不多了,而且做Cache會更快。

圖2:DataFrame的效率比較(來源)

另外一個重點是寫R的程式碼邏輯不需變換,因此寫R的資料科學家可以更輕鬆的轉移過去的R程式碼,另外2015年從SparkR 1.4開始已提供66種函式,到Spark1.5提供197個函式,到今年的SparkR 1.6.x版本已提供225種函式,這種增長速度是非常驚人的,因為SparkR 1.4 是去年2015年6月發表,距離筆者寫這篇文章的時間才剛剛準備要滿一年,因此SparkR的發展非常的快速。

接下來我們就來用Windows平台先快快的體驗一下SparkR的威力吧,首先到Spark官網上去下載Spark,透過下圖的點選,下載spark-1.6.1-bin-hadoop2.6.tgz版本。


下載完成後直接解壓縮到您想要放的位置,像筆者是放在C槽區,並把名字變更為Spark。接下來打開RStudioRMicrosoft R Open都可以,這篇筆者以RStudio做示範。

首先我們進行環境的設定

Sys.setenv(SPARK_HOME = "C:\\spark") #其中C:\\Spark是您的Spark路徑與目錄名稱 library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))) library(SparkR) 
接下來我們就可以開始使用SparkR,我們先使用下面兩行指令做起手式。

sc = sparkR.init();sqlContext = sparkRSQL.init(sc)
其中sc的第一行可以設定Spark的初始狀態,如:要給Spark的記憶體,詳細可參考SparkR的SparkR.init文件。第二行就是前面筆者提到的SparkContext,主要是從JavaSparkContext做初始化的動作。

接下來我們來玩玩經典的iris例子,首先先把iris的欄位名稱做修改,因為Spark裡面很多「.」的函式,避免出錯,我們先把欄位名稱以「_」取代。

newiris = iris;names(newiris) = c("Sepal_Length","Sepal_Width","Petal_Length","Petal_Width","Species")
接下來我們產生Spark超強的DataFrame,這對R使用者不陌生,只是這次是使用Spark去產生DataFrame。

df = createDataFrame(sqlContext, newiris)
接下來我們可以使用head或showDF函式觀察資料。

head(df, 5) showDF(df, 5)
接下來我們可以透過printSchema去觀察一下資料的架構。

printSchema(df)
接著我們可以透過head與select去挑選某個欄位的資料,如挑選Sepal_Length。

head(select(df, df$Sepal_Length)) head(select(df, df$Sepal_Length),10) #挑選10筆
假設筆者想挑選Sepal_Length大於5.4的資料,可以使用head與filter這兩個函式,這兩個函式用起來有沒有覺得很像R的dplyr呢? 相信R使用者很快就能夠上手SparkR。

head(filter(df, df$Sepal_Length > 5.4))
如果dplyr您夠了解,那接下來如果我們想計算Sepal_Length的做簡單的Aggregation,就可以使用下面的指令。

sepal_counts = summarize(groupBy(df, df$Sepal_Length), count = n(df$Sepal_Length))
當然如果要做廣義線性模型(GLM),SparkR的做法跟原生的R做法是完全一模一樣。

model = glm(Sepal_Length ~., data = df, family = "gaussian");summary(model)
如果還想知道其他的功能,請直接拜訪SparkR的文件

從本篇文章中我們可以快速掌握SparkR的使用,同時也了解到目前SparkR不僅支援資料選擇(Selection)、資料過濾(Filtering)、資料聚合(Aggregation),更已經開始支援Spark MLlib可以進行分散式的機器學習。因此我們可以期待未來R的資料科學家在分散式運算上可以透過SparkR更暢通無阻。

當然實務上真的全部資料都要使用SparkR嗎? 不一定,R在記憶體運算(In-memory computing)的部分也有很強的處理能力,因此有時候單機的記憶體夠多,像是MRO + Math Kernel Library,或是寫法稍微改一下(如矩陣寫法),基本上也無須要動用SparkR,雖然如此,在分散式的環境下SparkR還是有它能發展的舞台,所以讓我們繼續期待下去。

沒有留言:

張貼留言