基于YARN集群構(gòu)建運(yùn)行PySpark Application

大數(shù)據(jù)

作者:Yanjun

Spark Application可以直接運(yùn)行在YARN集群上,這種運(yùn)行模式,會(huì)將資源的管理與協(xié)調(diào)統(tǒng)一交給YARN集群去處理,這樣能夠?qū)崿F(xiàn)構(gòu)建于YARN集群之上Application的多樣性,比如可以運(yùn)行MapReduc程序,可以運(yùn)行HBase集群,也可以運(yùn)行Storm集群,還可以運(yùn)行使用Python開(kāi)發(fā)機(jī)器學(xué)習(xí)應(yīng)用程序,等等。

我們知道,Spark on YARN又分為client模式和cluster模式:在client模式下,Spark Application運(yùn)行的Driver會(huì)在提交程序的節(jié)點(diǎn)上,而該節(jié)點(diǎn)可能是YARN集群內(nèi)部節(jié)點(diǎn),也可能不是,一般來(lái)說(shuō)提交Spark Application的客戶端節(jié)點(diǎn)不是YARN集群內(nèi)部的節(jié)點(diǎn),那么在客戶端節(jié)點(diǎn)上可以根據(jù)自己的需要安裝各種需要的軟件和環(huán)境,以支撐Spark Application正常運(yùn)行。在cluster模式下,Spark Application運(yùn)行時(shí)的所有進(jìn)程都在YARN集群的NodeManager節(jié)點(diǎn)上,而且具體在哪些NodeManager上運(yùn)行是由YARN的調(diào)度策略所決定的。

對(duì)比這兩種模式,最關(guān)鍵的是Spark Application運(yùn)行時(shí)Driver所在的節(jié)點(diǎn)不同,而且,如果想要對(duì)Driver所在節(jié)點(diǎn)的運(yùn)行環(huán)境進(jìn)行配置,區(qū)別很大,但這對(duì)于PySpark Application運(yùn)行來(lái)說(shuō)是非常關(guān)鍵的。

PySpark是Spark為使用Python程序編寫Spark Application而實(shí)現(xiàn)的客戶端庫(kù),通過(guò)PySpark也可以編寫Spark Application并在Spark集群上運(yùn)行。Python具有非常豐富的科學(xué)計(jì)算、機(jī)器學(xué)習(xí)處理庫(kù),如numpy、pandas、scipy等等。為了能夠充分利用這些高效的Python模塊,很多機(jī)器學(xué)習(xí)程序都會(huì)使用Python實(shí)現(xiàn),同時(shí)也希望能夠在Spark集群上運(yùn)行。

PySpark Application運(yùn)行原理

理解PySpark Application的運(yùn)行原理,有助于我們使用Python編寫Spark Application,并能夠?qū)ySpark Application進(jìn)行各種調(diào)優(yōu)。PySpark構(gòu)建于Spark的Java API之上,數(shù)據(jù)在Python腳本里面進(jìn)行處理,而在JVM中緩存和Shuffle數(shù)據(jù),數(shù)據(jù)處理流程如下圖所示(來(lái)自Apache Spark Wiki):

大數(shù)據(jù)

Spark Application會(huì)在Driver中創(chuàng)建pyspark.SparkContext對(duì)象,后續(xù)通過(guò)pyspark.SparkContext對(duì)象來(lái)構(gòu)建Job DAG并提交DAG運(yùn)行。使用Python編寫PySpark Application,在Python編寫的Driver中也有一個(gè)pyspark.SparkContext對(duì)象,該pyspark.SparkContext對(duì)象會(huì)通過(guò)Py4J模塊啟動(dòng)一個(gè)JVM實(shí)例,創(chuàng)建一個(gè)JavaSparkContext對(duì)象。PY4J只用在Driver上,后續(xù)在Python程序與JavaSparkContext對(duì)象之間的通信,都會(huì)通過(guò)PY4J模塊來(lái)實(shí)現(xiàn),而且都是本地通信。

PySpark Application中也有RDD,對(duì)Python RDD的Transformation操作,都會(huì)被映射到Java中的PythonRDD對(duì)象上。對(duì)于遠(yuǎn)程節(jié)點(diǎn)上的Python RDD操作,Java PythonRDD對(duì)象會(huì)創(chuàng)建一個(gè)Python子進(jìn)程,并基于Pipe的方式與該P(yáng)ython子進(jìn)程通信,將用戶編寫Python處理代碼和數(shù)據(jù)發(fā)送到Python子進(jìn)程中進(jìn)行處理。

下面,我們基于Spark on YARN模式,并根據(jù)當(dāng)前企業(yè)所具有的實(shí)際集群運(yùn)行環(huán)境情況,來(lái)說(shuō)明如何在Spark集群上運(yùn)行PySpark Application,大致分為如下3種情況:

YARN集群配置Python環(huán)境

這種情況,如果是初始安裝YARN、Spark集群,并考慮到了當(dāng)前應(yīng)用場(chǎng)景需要支持Python程序運(yùn)行在Spark集群之上,這時(shí)可以準(zhǔn)備好對(duì)應(yīng)Python軟件包、依賴模塊,在YARN集群中的每個(gè)節(jié)點(diǎn)上進(jìn)行安裝。這樣,YARN集群的每個(gè)NodeManager上都具有Python環(huán)境,可以編寫PySpark Application并在集群上運(yùn)行。目前比較流行的是直接安裝Python虛擬環(huán)境,使用Anaconda等軟件,可以極大地簡(jiǎn)化Python環(huán)境的管理工作。

這種方式的缺點(diǎn)是,如果后續(xù)使用Python編寫Spark Application,需要增加新的依賴模塊,那么就需要在YARN集群的每個(gè)節(jié)點(diǎn)上都進(jìn)行該新增模塊的安裝。而且,如果依賴Python的版本,可能還需要管理不同版本Python環(huán)境。因?yàn)樘峤籔ySpark Application運(yùn)行,具體在哪些NodeManager上運(yùn)行該Application,是由YARN的調(diào)度器決定的,必須保證每個(gè)NodeManager上都具有Python環(huán)境(基礎(chǔ)環(huán)境+依賴模塊)。

YARN集群不配置Python環(huán)境

這種情況,更適合企業(yè)已經(jīng)安裝了規(guī)模較大的YARN集群,并在開(kāi)始使用時(shí)并未考慮到后續(xù)會(huì)使用基于Python來(lái)編寫Spark Application,并且不想在YARN集群的NodeManager上安裝Python環(huán)境依賴依賴模塊。我們參考了Benjamin Zaitlen的博文(詳見(jiàn)后面參考鏈接),并基于Anaconda軟件環(huán)境進(jìn)行了實(shí)踐和驗(yàn)證,具體實(shí)現(xiàn)思路如下所示:

在任意一個(gè)LInux OS的節(jié)點(diǎn)上,安裝Anaconda軟件通過(guò)Anaconda創(chuàng)建虛擬Python環(huán)境在創(chuàng)建好的Python環(huán)境中下載安裝依賴的Python模塊將整個(gè)Python環(huán)境打成zip包提交PySpark Application時(shí),并通過(guò)–archives選項(xiàng)指定zip包路徑

下面進(jìn)行詳細(xì)說(shuō)明:

首先,我們?cè)贑entOS 7.2上,基于Python 2.7,下載了Anaconda2-5.0.0.1-Linux-x86_64.sh安裝軟件,并進(jìn)行了安裝。Anaconda的安裝路徑為/root/anaconda2。

然后,創(chuàng)建一個(gè)Python虛擬環(huán)境,執(zhí)行如下命令:

conda create -n mlpy_env --copy -y -q python=2 numpy pandas scipy

上述命令創(chuàng)建了一個(gè)名稱為mlpy_env的Python環(huán)境,–copy選項(xiàng)將對(duì)應(yīng)的軟件包都安裝到該環(huán)境中,包括一些C的動(dòng)態(tài)鏈接庫(kù)文件。同時(shí),下載numpy、pandas、scipy這三個(gè)依賴模塊到該環(huán)境中。

接著,將該P(yáng)ython環(huán)境打包,執(zhí)行如下命令:

cd?/root/anaconda2/envszip -r mlpy_env.zip mlpy_env

該zip文件大概有400MB左右,將該zip壓縮包拷貝到指定目錄中,方便后續(xù)提交PySpark Application:

最后,我們可以提交我們的PySpark Application,執(zhí)行如下命令:

1PYSPARK_PYTHON=./ANACONDA/mlpy_env/bin/python spark-submit \2?--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/mlpy_env/bin/python \3?--master yarn-cluster \4?--archives /tmp/mlpy_env.zip#ANACONDA \5?/var/lib/hadoop-hdfs/pyspark/test_pyspark_dependencies.py

上面的test_pyspark_dependencies.py文件中,使用了numpy、pandas、scipy這三個(gè)依賴包的函數(shù),通過(guò)上面提到的YARN集群的cluster模式可以運(yùn)行在Spark集群上。

可以看到,上面的依賴zip壓縮包將整個(gè)Python的運(yùn)行環(huán)境都包含在里面,在提交PySpark Application時(shí)會(huì)將該環(huán)境zip包上傳到運(yùn)行Application的所在的每個(gè)節(jié)點(diǎn)上,并解壓縮后為Python代碼提供運(yùn)行時(shí)環(huán)境。如果不想每次都從客戶端將該環(huán)境文件上傳到集群中運(yùn)行PySpark Application的節(jié)點(diǎn)上,也可以將zip包上傳到HDFS上,并修改–archives參數(shù)的值為hdfs:///tmp/mlpy_env.zip#ANACONDA,也是可以的。

另外,需要說(shuō)明的是,如果我們開(kāi)發(fā)的/var/lib/hadoop-hdfs/pyspark/test_pyspark_dependencies.py文件中,也依賴的一些我們自己實(shí)現(xiàn)的處理函數(shù),具有多個(gè)Python依賴的文件,想要通過(guò)上面的方式運(yùn)行,必須將這些依賴的Python文件拷貝到我們創(chuàng)建的環(huán)境中,對(duì)應(yīng)的目錄為mlpy_env/lib/python2.7/site-packages/下面。

基于混合編程語(yǔ)言環(huán)境

假如我們還是希望使用Spark on YARN模式來(lái)運(yùn)行PySpark Application,但并不將Python程序提交到Y(jié)ARN集群上運(yùn)行。這時(shí),我們可以考慮使用混合編程語(yǔ)言的方式,來(lái)處理數(shù)據(jù)任務(wù)。比如,機(jī)器學(xué)習(xí)Application具有迭代計(jì)算的特性,更適合在一個(gè)高配的節(jié)點(diǎn)上運(yùn)行;而普通的ETL數(shù)據(jù)處理具有多機(jī)并行處理的特點(diǎn),適合放到集群上進(jìn)行分布式處理。

一個(gè)完整的機(jī)器學(xué)習(xí)Application的設(shè)計(jì)與構(gòu)建,可以將算法部分和數(shù)據(jù)準(zhǔn)備部分分離出來(lái),使用Scala/Java進(jìn)行數(shù)據(jù)預(yù)處理,輸出一個(gè)機(jī)器學(xué)習(xí)算法所需要(更便于迭代、尋優(yōu)計(jì)算)的輸入數(shù)據(jù)格式,這會(huì)極大地壓縮算法輸入數(shù)據(jù)的規(guī)模,從而使算法迭代計(jì)算充分利用單機(jī)本地的資源(內(nèi)存、CPU、網(wǎng)絡(luò)),這可能會(huì)比直接放到集群中計(jì)算要快得多。

因此,我們?cè)趯?duì)機(jī)器學(xué)習(xí)Application準(zhǔn)備數(shù)據(jù)時(shí),使用原生的Scala編程語(yǔ)言實(shí)現(xiàn)Spark Application來(lái)處理數(shù)據(jù),包括轉(zhuǎn)換、統(tǒng)計(jì)、壓縮等等,將滿足算法輸入格式的數(shù)據(jù)輸出到HDFS指定目錄中。在性能方面,對(duì)數(shù)據(jù)規(guī)模較大的情況下,在Spark集群上處理數(shù)據(jù),Scala/Java實(shí)現(xiàn)的Spark Application運(yùn)行性能要好一些。然后,算法迭代部分,基于豐富、高性能的Python科學(xué)計(jì)算模塊,使用Python語(yǔ)言實(shí)現(xiàn),其實(shí)直接使用PySpark API實(shí)現(xiàn)一個(gè)機(jī)器學(xué)習(xí)PySpark Application,運(yùn)行模式為YARN client模式。這時(shí),就需要在算法運(yùn)行的節(jié)點(diǎn)上安裝好Python環(huán)境及其依賴模塊(而不需要在YARN集群的節(jié)點(diǎn)上安裝),Driver程序從HDFS中讀取輸入數(shù)據(jù)(緩存到本地),然后在本地進(jìn)行算法的迭代計(jì)算,最后輸出模型。

總結(jié)

對(duì)于重度使用PySpark的情況,比如偏向機(jī)器學(xué)習(xí),可以考慮在整個(gè)集群中都安裝好Python環(huán)境,并根據(jù)不同的需要進(jìn)行依賴模塊的統(tǒng)一管理,能夠=極大地方便PySpark Application的運(yùn)行。

不在YARN集群上安裝Python環(huán)境的方案,會(huì)使提交的Python環(huán)境zip包在YARN集群中傳輸帶來(lái)一定開(kāi)銷,而且每次提交一個(gè)PySpark Application都需要打包一個(gè)環(huán)境zip文件,如果有大量的Python實(shí)現(xiàn)的PySpark Application需要在Spark集群上運(yùn)行,開(kāi)銷會(huì)越來(lái)越大。另外,如果PySpark應(yīng)用程序修改,可能需要重新打包環(huán)境。但是這樣做確實(shí)不在需要考慮YARN集群集群節(jié)點(diǎn)上的Python環(huán)境了,任何版本Python編寫的PySpark Application都可以使用集群資源運(yùn)行。

關(guān)于該問(wèn)題,SPARK-13587(詳見(jiàn)下面參考鏈接)也在討論如果優(yōu)化該問(wèn)題,后續(xù)應(yīng)該會(huì)有一個(gè)比較合適的解決方案。

極客網(wǎng)企業(yè)會(huì)員

免責(zé)聲明:本網(wǎng)站內(nèi)容主要來(lái)自原創(chuàng)、合作伙伴供稿和第三方自媒體作者投稿,凡在本網(wǎng)站出現(xiàn)的信息,均僅供參考。本網(wǎng)站將盡力確保所提供信息的準(zhǔn)確性及可靠性,但不保證有關(guān)資料的準(zhǔn)確性及可靠性,讀者在使用前請(qǐng)進(jìn)一步核實(shí),并對(duì)任何自主決定的行為負(fù)責(zé)。本網(wǎng)站對(duì)有關(guān)資料所引致的錯(cuò)誤、不確或遺漏,概不負(fù)任何法律責(zé)任。任何單位或個(gè)人認(rèn)為本網(wǎng)站中的網(wǎng)頁(yè)或鏈接內(nèi)容可能涉嫌侵犯其知識(shí)產(chǎn)權(quán)或存在不實(shí)內(nèi)容時(shí),應(yīng)及時(shí)向本網(wǎng)站提出書面權(quán)利通知或不實(shí)情況說(shuō)明,并提供身份證明、權(quán)屬證明及詳細(xì)侵權(quán)或不實(shí)情況證明。本網(wǎng)站在收到上述法律文件后,將會(huì)依法盡快聯(lián)系相關(guān)文章源頭核實(shí),溝通刪除相關(guān)內(nèi)容或斷開(kāi)相關(guān)鏈接。

2017-10-18
基于YARN集群構(gòu)建運(yùn)行PySpark Application
作者:Yanjun Spark Application可以直接運(yùn)行在YARN集群上,這種運(yùn)行模式,會(huì)將資源的管理與協(xié)調(diào)統(tǒng)一交給YARN集群去處理,這樣能夠?qū)崿F(xiàn)

長(zhǎng)按掃碼 閱讀全文