基于Storm構(gòu)建分布式實(shí)時(shí)處理應(yīng)用初探

大數(shù)據(jù)

作者:唐潔

最近利用閑暇時(shí)間,又重新研讀了一下Storm。認(rèn)真對(duì)比了一下Hadoop,前者更擅長(zhǎng)的是,實(shí)時(shí)流式數(shù)據(jù)處理,后者更擅長(zhǎng)的是基于HDFS,通過(guò)MapReduce方式的離線數(shù)據(jù)分析計(jì)算。對(duì)于Hadoop,本身不擅長(zhǎng)實(shí)時(shí)的數(shù)據(jù)分析處理。兩者的共同點(diǎn)都是分布式架構(gòu),而且都類(lèi)似有主/從關(guān)系的概念。

本文我不會(huì)具體闡述Storm集群和Zookeeper集群如何部署的問(wèn)題,這里想通過(guò)一個(gè)實(shí)際的案例切入,分析一下如何利用Storm完成實(shí)時(shí)分析處理數(shù)據(jù)。

Storm本身是Apache托管的開(kāi)源的分布式實(shí)時(shí)計(jì)算系統(tǒng),它的前身是Twitter Storm。在Storm問(wèn)世以前,處理海量的實(shí)時(shí)數(shù)據(jù)信息,大部分是類(lèi)似于使用消息隊(duì)列,加上工作進(jìn)程/線程的方式。這使得構(gòu)建這類(lèi)的應(yīng)用程序,變得異常的復(fù)雜。很多的業(yè)務(wù)邏輯中,你不得不考慮消息的發(fā)送和接收,線程之間的并發(fā)控制等等問(wèn)題。而其中的業(yè)務(wù)邏輯可能只是占據(jù)整個(gè)應(yīng)用的一小部分,而且很難做到業(yè)務(wù)邏輯的解耦。但是Storm的出現(xiàn)改變了這種局面,它首先抽象出數(shù)據(jù)流Stream的抽象概念,一個(gè)Stream指的是tuples組成的無(wú)邊界的序列。后面又繼續(xù)提出Spouts、Bolts的概念。Spouts在Storm里面是數(shù)據(jù)源,專門(mén)負(fù)責(zé)生成流。而B(niǎo)olts則是以流作為輸入,并重新生成流作為輸出,并且Bolts還會(huì)繼續(xù)指定它輸入的流應(yīng)該如何劃分。最后Storm是通過(guò)拓?fù)洌═opology)這種抽象概念,組織起若干個(gè)Spouts、Bolts構(gòu)成的分布式數(shù)據(jù)處理網(wǎng)絡(luò)。Storm設(shè)計(jì)的時(shí)候,就有意的把Spouts、Bolts組成的拓?fù)洌═opology)網(wǎng)絡(luò)通過(guò)Thrift服務(wù)方式進(jìn)行封裝,這個(gè)做法,使得Storm的Spouts、Bolts組件可以通過(guò)目前主流的任意語(yǔ)言實(shí)現(xiàn),使得整個(gè)框架的兼容性和擴(kuò)展性更加優(yōu)秀。

在Storm里面拓?fù)洌═opology)的概念,非常類(lèi)似Hadoop里面MapReduce的Job的概念。不同的是Storm的拓?fù)洌═opology)只要你啟動(dòng)了,它就會(huì)一直運(yùn)行下去,除非你kill掉;而MapReduce的Job最終它是會(huì)結(jié)束的?;谶@樣的模式,使得Storm非常適合處理實(shí)時(shí)性的數(shù)據(jù)分析、持續(xù)計(jì)算、DRPC(分布式RPC)等。

下面就結(jié)合實(shí)際的案例,設(shè)計(jì)分析一下,如何利用Storm改善應(yīng)用的處理性能。

某通信公司的垃圾短信監(jiān)控平臺(tái),實(shí)時(shí)地上傳每個(gè)省的疑似垃圾短信用戶的垃圾短信內(nèi)容文件,每個(gè)省則根據(jù)文件中垃圾短信的內(nèi)容,解析過(guò)濾出,包含指定敏感關(guān)鍵字的垃圾短信進(jìn)行入庫(kù)。被入庫(kù)的垃圾短信用戶被列為敏感用戶,是重點(diǎn)監(jiān)控對(duì)象,畢竟亂發(fā)這些垃圾短信是非常不對(duì)的。垃圾短信監(jiān)控平臺(tái)生成的文件速度非常驚人,原來(lái)的傳統(tǒng)做法是,根據(jù)每個(gè)省的每一個(gè)地市,對(duì)應(yīng)一個(gè)獨(dú)立應(yīng)用,串行化地解析、過(guò)濾敏感關(guān)鍵字,來(lái)進(jìn)行入庫(kù)處理。但是,從現(xiàn)狀來(lái)看,程序處理的性能并不高效,常常造成文件積壓,沒(méi)有及時(shí)處理入庫(kù)。

現(xiàn)在,我們就通過(guò)Storm來(lái)重新梳理、組織一下上述的應(yīng)用場(chǎng)景。

首先,我先說(shuō)明一下,該案例中Storm集群和Zookeeper集群的部署情況,如下圖所示:

大數(shù)據(jù)

Nimbus對(duì)應(yīng)的主機(jī)是192.168.95.134是Storm主節(jié)點(diǎn),其余兩臺(tái)從節(jié)點(diǎn)Supervisor對(duì)應(yīng)的主機(jī)分別是192.168.95.135(主機(jī)名:slave1)、192.168.95.136(主機(jī)名:slave2)。同樣的,Zookeeper集群也是部署在上述節(jié)點(diǎn)上。

Storm集群和Zookeeper集群會(huì)互相通信,因?yàn)镾torm就是基于Zookeeper的。然后先啟動(dòng)每個(gè)節(jié)點(diǎn)的Zookeeper服務(wù),其次分別啟動(dòng)Storm的Nimbus、Supervisor服務(wù)。具體可以到Storm安裝的bin目錄下面啟動(dòng)服務(wù),啟動(dòng)命令分別為storm nimbus > /dev/null 2 > &1 &和storm supervisor > /dev/null 2 > &1 &。然后用jps觀察啟動(dòng)的效果。沒(méi)有問(wèn)題的話,在Nimbus服務(wù)對(duì)應(yīng)的主機(jī)上啟動(dòng)Storm UI監(jiān)控對(duì)應(yīng)的服務(wù),在Storm安裝目錄的bin目錄輸入命令:storm ui >/dev/null 2>&1 &。然后打開(kāi)瀏覽器輸入:http://{Nimbus服務(wù)對(duì)應(yīng)的主機(jī)ip}:8080,這里就是輸入:http://192.168.95.134:8080/。觀察Storm集群的部署情況,如下圖所示:

大數(shù)據(jù)

可以發(fā)現(xiàn),我們的Storm的版本是0.9.5,它的從節(jié)點(diǎn)(Supervisor)有2個(gè),分別是slave1、slave2。一共的woker的數(shù)量是8個(gè)(Total slots)。Storm集群我們已經(jīng)部署完畢,也啟動(dòng)成功了。現(xiàn)在就利用Storm的方式,重新改寫(xiě)一下這種敏感信息實(shí)時(shí)監(jiān)控過(guò)濾的應(yīng)用。先看下Storm方式的拓?fù)浣Y(jié)構(gòu)圖:

大數(shù)據(jù)

其中的SensitiveFileReader-591、SensitiveFileReader-592(用戶短信采集器,分地市)代表的是Storm中的Spouts組件,表示一個(gè)數(shù)據(jù)的源頭,這里是表示從服務(wù)器的指定目錄下,讀取疑似垃圾短信用戶的垃圾短信內(nèi)容文件。當(dāng)然Spouts的組件你可以根據(jù)實(shí)際的需求,擴(kuò)展出許多Spouts。

然后讀取出文件中每一行的內(nèi)容之后,就是分析文件的內(nèi)容組件了,這里是指:SensitiveFileAnalyzer(監(jiān)控短信內(nèi)容拆解分析),它負(fù)責(zé)分析出文件的格式內(nèi)容。

為了簡(jiǎn)單演示起見(jiàn),我這里定義文件的格式為如下內(nèi)容(隨便寫(xiě)一個(gè)例子):home_city=591&user_id=5911000&msisdn=10000&sms_content=abc-slave1。每個(gè)列之間用&進(jìn)行連接。其中home_city=591表示疑似垃圾短信的用戶歸屬地市編碼,591表示福州、592表示廈門(mén);user_id=5911000表示疑似垃圾短信的用戶標(biāo)識(shí);msisdn=10000表示疑似垃圾短信的用戶手機(jī)號(hào)碼;sms_content=abc-slave1代表的就是垃圾短信的內(nèi)容了。SensitiveFileAnalyzer代表的就是Storm中的Bolt組件,用來(lái)處理Spouts“流”出的數(shù)據(jù)。

最后,就是我們根據(jù)解析好的數(shù)據(jù),匹配業(yè)務(wù)規(guī)定的敏感關(guān)鍵字,進(jìn)行過(guò)濾入庫(kù)了。這里我們是把過(guò)濾好的數(shù)據(jù)存入MySQL數(shù)據(jù)庫(kù)中。負(fù)責(zé)這項(xiàng)任務(wù)的組件是:SensitiveBatchBolt(敏感信息采集處理),當(dāng)然它也是Storm中的Bolt組件。好了,以上就是完整的Storm拓?fù)洌═opology)結(jié)構(gòu)了。

現(xiàn)在,我們對(duì)于整個(gè)敏感信息采集過(guò)濾監(jiān)控的拓?fù)浣Y(jié)構(gòu),有了一個(gè)整體的了解之后,我們?cè)賮?lái)看下如何具體編碼實(shí)現(xiàn)!先來(lái)看下整個(gè)工程的代碼層次結(jié)構(gòu),它如下圖所示:

大數(shù)據(jù)

首先來(lái)看下,我們定義的敏感用戶的數(shù)據(jù)結(jié)構(gòu)RubbishUsers,假設(shè)我們要過(guò)濾的敏感用戶的短信內(nèi)容中,要包含“racketeer”、“Bad”等敏感關(guān)鍵字。具體代碼如下:

大數(shù)據(jù)

現(xiàn)在,我們看下敏感信息數(shù)據(jù)源組件SensitiveFileReader的具體實(shí)現(xiàn),它負(fù)責(zé)從服務(wù)器的指定目錄下面,讀取疑似垃圾短信用戶的垃圾短信內(nèi)容文件,然后把每一行的數(shù)據(jù),發(fā)送給下一個(gè)處理的Bolt(SensitiveFileAnalyzer),每個(gè)文件全部發(fā)送結(jié)束之后,在當(dāng)前目錄中,把原文件重命名成后綴bak的文件(當(dāng)然,你可以重新建立一個(gè)備份目錄,專門(mén)用來(lái)存儲(chǔ)這種處理結(jié)束的文件),SensitiveFileReader的具體實(shí)現(xiàn)如下:

大數(shù)據(jù)

監(jiān)控短信內(nèi)容拆解分析器SensitiveFileAnalyzer,這個(gè)Bolt組件,接收到數(shù)據(jù)源SensitiveFileReader的數(shù)據(jù)之后,就按照上面定義的格式,對(duì)文件中每一行的內(nèi)容進(jìn)行解析,然后把解析完畢的內(nèi)容,繼續(xù)發(fā)送給下一個(gè)Bolt組件:SensitiveBatchBolt(敏感信息采集處理)?,F(xiàn)在,我們來(lái)看下SensitiveFileAnalyzer這個(gè)Bolt組件的實(shí)現(xiàn):

大數(shù)據(jù)

最后一個(gè)Bolt組件SensitiveBatchBolt(敏感信息采集處理)根據(jù)上游Bolt組件SensitiveFileAnalyzer發(fā)送過(guò)來(lái)的數(shù)據(jù),然后跟業(yè)務(wù)規(guī)定的敏感關(guān)鍵字進(jìn)行匹配,如果匹配成功,說(shuō)明這個(gè)用戶,就是我們要重點(diǎn)監(jiān)控的用戶,我們把它通過(guò)hibernate采集到MySQL數(shù)據(jù)庫(kù),統(tǒng)一管理。最后要說(shuō)明的是,SensitiveBatchBolt組件還實(shí)現(xiàn)了一個(gè)監(jiān)控的功能,就是定期打印出,我們已經(jīng)采集到的敏感信息用戶數(shù)據(jù)?,F(xiàn)在給出SensitiveBatchBolt的實(shí)現(xiàn):

大數(shù)據(jù)

由于是通過(guò)hibernate入庫(kù)到MySQL,所以給出hibernate配置,首先是:hibernate.cfg.xml

大數(shù)據(jù)

對(duì)應(yīng)的ORM映射配置文件rubbish-users.hbm.xml內(nèi)容如下:

大數(shù)據(jù)

最后,還是通過(guò)Spring把hibernate集成起來(lái),數(shù)據(jù)庫(kù)連接池用的是:DBCP。對(duì)應(yīng)的Spring配置文件jdbc-hibernate-bean.xml的內(nèi)容如下:

大數(shù)據(jù)

到此為止,我們已經(jīng)完成了敏感信息實(shí)時(shí)監(jiān)控的所有的Storm組件的開(kāi)發(fā)?,F(xiàn)在,我們來(lái)完成Storm的拓?fù)洌═opology),由于拓?fù)洌═opology)又分為本地拓?fù)浜头植际酵負(fù)?,因此封裝了一個(gè)工具類(lèi)StormRunner(拓?fù)鋱?zhí)行器),對(duì)應(yīng)的代碼如下:

大數(shù)據(jù)

好了,現(xiàn)在我們把上面所有的Spouts/Bolts拼接成“拓?fù)洹保═opology)結(jié)構(gòu),我們這里用的是分布式拓?fù)?,?lái)進(jìn)行部署運(yùn)行。具體的SensitiveTopology(敏感用戶監(jiān)控Storm拓?fù)洌┐a如下:

大數(shù)據(jù)

到此為止,所有的Storm組件已經(jīng)開(kāi)發(fā)完畢!現(xiàn)在,我們把上述工程打成jar包,放到Storm集群中運(yùn)行,具體可以到Nimbus對(duì)應(yīng)的Storm安裝目錄下面的bin目錄,輸入:storm jar + {jar路徑}。

比如我這里是輸入:storm jar /home/tj/install/SensitiveTopology.jar newlandframework.storm.topology.SensitiveTopology,然后,把疑似垃圾短信用戶的垃圾短信內(nèi)容文件放到指定的服務(wù)器下面的目錄(/home/tj/data/591、/home/tj/data/592),最后打開(kāi)剛才的Storm UI,觀察任務(wù)的啟動(dòng)執(zhí)行情況,這里如下圖所示:

大數(shù)據(jù)

可以看到我們剛才提交的拓?fù)洌篠ensitiveTopology已經(jīng)成功提交到Storm集群里面了。這個(gè)時(shí)候,你可以鼠標(biāo)點(diǎn)擊SensitiveTopology,然后會(huì)打開(kāi)如下的一個(gè)Spouts/Bolts的監(jiān)控界面,如下圖所示:

大數(shù)據(jù)

我們可以很清楚地看到:Spouts組件(用戶短信采集器):SensitiveFileReader591、SensitiveFileReader592的線程數(shù)executors、任務(wù)提交emitted情況。以及Bolts組件:監(jiān)控短信內(nèi)容拆解分析器(SensitiveFileAnalyzer)、敏感信息采集處理(SensitiveBatchBolt)的運(yùn)行情況,這樣監(jiān)控起來(lái)就非常方便。

此外,我們還可以到對(duì)應(yīng)的Supervisor服務(wù)器對(duì)應(yīng)的Storm安裝目錄下面的logs目錄,查看一下worker的工作日志,我們來(lái)看下敏感信息監(jiān)控過(guò)濾的處理情況,截圖如下:

大數(shù)據(jù)

通過(guò)SensitiveBatchBolt模塊的監(jiān)控線程,可以看到,我們目前已經(jīng)采集到了9個(gè)敏感信息用戶了,再來(lái)看下,這些包含敏感關(guān)鍵字的用戶有沒(méi)有入庫(kù)MySQL成功?

大數(shù)據(jù)

發(fā)現(xiàn)入庫(kù)的結(jié)果也是9個(gè),和日志打印的數(shù)量上是一致的。而且垃圾短信內(nèi)容sms_content果然都包含了“racketeer”、“Bad”這些敏感關(guān)鍵字!完全符合我們的預(yù)期。而且,以后文件處理量上來(lái)了,我們可以通過(guò)調(diào)整設(shè)置Spouts/Bolts的并行度,和Worker的數(shù)量進(jìn)行化解。當(dāng)然,你還可以通過(guò)水平擴(kuò)展集群的數(shù)量來(lái)解決這個(gè)問(wèn)題。

Storm在Apache開(kāi)源項(xiàng)目的網(wǎng)址是:http://storm.apache.org/,有興趣的朋友可以經(jīng)常關(guān)注一下。官網(wǎng)上面有很權(quán)威的技術(shù)規(guī)范說(shuō)明,以及如何把Storm和消息隊(duì)列、HDFS、HBase有效的集成起來(lái)。目前在國(guó)內(nèi),就我個(gè)人看法,對(duì)Storm分析應(yīng)用,做得最好的應(yīng)該算是阿里巴巴,它在原來(lái)Storm的基礎(chǔ)上加以改良,開(kāi)源出JStorm,有興趣的朋友,可以多關(guān)注一下。

借助Storm,我們可以很輕松地開(kāi)發(fā)分布式實(shí)時(shí)處理應(yīng)用,而上述場(chǎng)景的設(shè)計(jì),只是Storm應(yīng)用的一個(gè)案例。相比傳統(tǒng)的單機(jī)服務(wù)器應(yīng)用而言,集群化地并行協(xié)同計(jì)算處理,是云計(jì)算、大數(shù)據(jù)時(shí)代的一個(gè)趨勢(shì),也是我今后努力學(xué)習(xí)的方向。故在此寫(xiě)下自己的學(xué)習(xí)經(jīng)驗(yàn)體會(huì),有不對(duì)的地方,還請(qǐng)各位群友批評(píng)指正。

極客網(wǎng)企業(yè)會(huì)員

免責(zé)聲明:本網(wǎng)站內(nèi)容主要來(lái)自原創(chuàng)、合作伙伴供稿和第三方自媒體作者投稿,凡在本網(wǎng)站出現(xiàn)的信息,均僅供參考。本網(wǎng)站將盡力確保所提供信息的準(zhǔn)確性及可靠性,但不保證有關(guān)資料的準(zhǔn)確性及可靠性,讀者在使用前請(qǐng)進(jìn)一步核實(shí),并對(duì)任何自主決定的行為負(fù)責(zé)。本網(wǎng)站對(duì)有關(guān)資料所引致的錯(cuò)誤、不確或遺漏,概不負(fù)任何法律責(zé)任。任何單位或個(gè)人認(rèn)為本網(wǎng)站中的網(wǎng)頁(yè)或鏈接內(nèi)容可能涉嫌侵犯其知識(shí)產(chǎn)權(quán)或存在不實(shí)內(nèi)容時(shí),應(yīng)及時(shí)向本網(wǎng)站提出書(shū)面權(quán)利通知或不實(shí)情況說(shuō)明,并提供身份證明、權(quán)屬證明及詳細(xì)侵權(quán)或不實(shí)情況證明。本網(wǎng)站在收到上述法律文件后,將會(huì)依法盡快聯(lián)系相關(guān)文章源頭核實(shí),溝通刪除相關(guān)內(nèi)容或斷開(kāi)相關(guān)鏈接。

2017-11-02
基于Storm構(gòu)建分布式實(shí)時(shí)處理應(yīng)用初探
作者:唐潔 最近利用閑暇時(shí)間,又重新研讀了一下Storm。認(rèn)真對(duì)比了一下Hadoop,前者更擅長(zhǎng)的是,實(shí)時(shí)流式數(shù)據(jù)處理,后者更擅長(zhǎng)的是基于HDFS,通過(guò)Map

長(zhǎng)按掃碼 閱讀全文