個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案

作者:個(gè)推平臺(tái)研發(fā)工程師祥子

一.業(yè)務(wù)背景

在個(gè)推的消息推送場(chǎng)景中,消息隊(duì)列在整個(gè)系統(tǒng)中占有非常重要的位置?!?/p>

個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案

當(dāng) APP 有消息推送需求的時(shí)候, 會(huì)向個(gè)推發(fā)送一條消息推送的命令,接到推送需求后,我們會(huì)把APP要求推送消息的用戶放入下發(fā)隊(duì)列中,進(jìn)行消息下發(fā);當(dāng)同時(shí)有多個(gè)APP進(jìn)行消息下發(fā)時(shí),難免會(huì)出現(xiàn)資源競(jìng)爭(zhēng)的情況, 因此就產(chǎn)生了優(yōu)先級(jí)隊(duì)列的需求,在下發(fā)資源固定的情況下, 高優(yōu)先級(jí)的用戶需要有更多的下發(fā)資源。

二.基于 Kafka 的優(yōu)先級(jí)隊(duì)列方案

針對(duì)以上場(chǎng)景,個(gè)推基于 Kafka 設(shè)計(jì)了第一版的優(yōu)先級(jí)隊(duì)列方案。Kafka 是 LinkedIn 開(kāi)發(fā)的一個(gè)高性能、分布式消息系統(tǒng);Kafka 在個(gè)推有非常廣泛的應(yīng)用,如日志收集、在線和離線的消息推送分發(fā)等。

架構(gòu)  

個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案

在該方案中,個(gè)推將優(yōu)先級(jí)統(tǒng)一設(shè)定為高、中、低三個(gè)級(jí)別。具體操作方案如下:

1. 對(duì)某個(gè)優(yōu)先級(jí)根據(jù) task (單次推送任務(wù)) 維度,存入不同的 Topic,一個(gè) task 只寫(xiě)入一個(gè) Topic,一個(gè) Topic 可存多個(gè) task;

2. 消費(fèi)模塊根據(jù)優(yōu)先級(jí)配額(如 6:3:1),獲取不同優(yōu)先級(jí)的消息推送數(shù),同一優(yōu)先級(jí)輪詢獲取消息;這樣既保證了高優(yōu)先級(jí)用戶可以更快地發(fā)送消息,又避免了低優(yōu)先級(jí)用戶出現(xiàn)沒(méi)有下發(fā)的情況。

Kafka 方案遇到的問(wèn)題

隨著個(gè)推業(yè)務(wù)的不斷發(fā)展,接入的 APP 數(shù)量逐漸增多,消息推送的需求也越來(lái)越大,第一版的優(yōu)先級(jí)方案也逐漸暴露出一些問(wèn)題:

1. 當(dāng)相同優(yōu)先級(jí)的 APP 在同一時(shí)刻消息推送任務(wù)越來(lái)越多時(shí),后面進(jìn)入的 task 消息會(huì)因?yàn)榍懊?task 消息還存在隊(duì)列情況而出現(xiàn)延遲。如下圖所示, 當(dāng) task1 消息量過(guò)大時(shí),在 task1 消費(fèi)結(jié)束前,taskN 將一直處于等待狀態(tài)。  

個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案

2. Kafka 在 Topic 數(shù)量由 64 增長(zhǎng)到 256 時(shí),吞吐量下降嚴(yán)重,Kafka 的每個(gè) Topic、每個(gè)分區(qū)都會(huì)對(duì)應(yīng)一個(gè)物理文件。當(dāng) Topic 數(shù)量增加時(shí),消息分散的落盤(pán)策略會(huì)導(dǎo)致磁盤(pán) IO 競(jìng)爭(zhēng)激烈,因此我們不能僅通過(guò)增加 Topic 數(shù)量來(lái)緩解第一點(diǎn)中的問(wèn)題。

基于上述問(wèn)題,個(gè)推進(jìn)行了新一輪的技術(shù)選型, 我們需要可以創(chuàng)建大量的Topic, 同時(shí)吞吐性能不能比 Kafka 遜色。經(jīng)過(guò)一段時(shí)間的調(diào)研,Apache Pulsar 引起了我們的關(guān)注。

三.為什么是 Pulsar

Apache Pulsar 是一個(gè)企業(yè)級(jí)的分布式消息系統(tǒng),最初由 Yahoo 開(kāi)發(fā),在 2016 年開(kāi)源,并于2018年9月畢業(yè)成為 Apache 基金會(huì)的頂級(jí)項(xiàng)目。Pulsar 已經(jīng)在 Yahoo 的生產(chǎn)環(huán)境使用了三年多,主要服務(wù)于Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa (Yahoo 的 KV 存儲(chǔ))。

架構(gòu)    

個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案  

Topic 數(shù)量

Pulsar 可以支持百萬(wàn)級(jí)別 Topic 數(shù)量的擴(kuò)展,同時(shí)還能一直保持良好的性能。

Topic 的伸縮性取決于它的內(nèi)部組織和存儲(chǔ)方式。Pulsar 的數(shù)據(jù)保存在 bookie(BookKeeper 服務(wù)器)上,處于寫(xiě)狀態(tài)的不同 Topic 的消息,在內(nèi)存中排序,最終聚合保存到大文件中,在 Bookie 中需要更少的文件句柄。另一方面 Bookie 的 IO 更少依賴于文件系統(tǒng)的 Pagecache,Pulsar 也因此能夠支持大量的主題。

消費(fèi)模型

Pulsar 支持三種消費(fèi)模型:Exclusive、Shared 和 Failover?! ?/p>

個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案  

* Exclusive (獨(dú)享)一個(gè) Topic 只能被一個(gè)消費(fèi)者消費(fèi)。Pulsar 默認(rèn)使用這種模式。

* Shared (共享)共享模式,多個(gè)消費(fèi)者可以連接到同一個(gè) Topic,消息依次分發(fā)給消費(fèi)者。當(dāng)一個(gè)消費(fèi)者宕機(jī)或者主動(dòng)斷開(kāi)連接時(shí),那么分發(fā)給這個(gè)消費(fèi)者的未確認(rèn)( ack)的消息會(huì)得到重新調(diào)度,分發(fā)給其他消費(fèi)者。

*Failover (災(zāi)備)一個(gè)訂閱同時(shí)只有一個(gè)消費(fèi)者,可以有多個(gè)備份消費(fèi)者。一旦主消費(fèi)者故障,則備份消費(fèi)者接管。不會(huì)出現(xiàn)同時(shí)有兩個(gè)活躍的消費(fèi)者。

Exclusive和Failover訂閱,僅允許一個(gè)消費(fèi)者來(lái)使用和消費(fèi)每個(gè)訂閱的Topic。這兩種模式都按 Topic 分區(qū)順序使用消息。它們最適用于需要嚴(yán)格消息順序的流(Stream)用例。

Shared 允許每個(gè)主題分區(qū)有多個(gè)消費(fèi)者。同一個(gè)訂閱中的每個(gè)消費(fèi)者僅接收Topic分區(qū)的一部分消息。Shared最適用于不需要保證消息推送的順序隊(duì)列(Queue)的使用模式,并且可以按照需要任意擴(kuò)展消費(fèi)者的數(shù)量。

存儲(chǔ)

Pulsar 引入了 Apache BookKeeper 作為存儲(chǔ)層,BookKeeper 是一個(gè)專門(mén)為實(shí)時(shí)系統(tǒng)優(yōu)化過(guò)的分布式存儲(chǔ)系統(tǒng),具有可擴(kuò)展、高可用、低延遲等特性。具體介紹,請(qǐng)參考[BookKeeper官網(wǎng)](https://github.com/apache/bookkeeper)。

Segment

BookKeeper 以 Segment(在 BookKeeper 內(nèi)部被稱作 ledger)作為存儲(chǔ)的基本單元。從 Segment 到消息粒度,都會(huì)均勻分散到 BookKeeper 的集群中。這種機(jī)制保證了數(shù)據(jù)和服務(wù)均勻分散在BookKeeper集群中。

Pulsar 和 Kafka 都是基于 partition 的邏輯概念來(lái)做 Topic存儲(chǔ)的。最根本的不同是,Kafka 的物理存儲(chǔ)是以 partition 為單位的,每個(gè) partition 必須作為一個(gè)整體(一個(gè)目錄)存儲(chǔ)在某個(gè) broker 上。 而 Pulsar 的 partition 是以 segment 作為物理存儲(chǔ)的單位,每個(gè) partition 會(huì)再被打散并均勻分散到多個(gè) bookie 節(jié)點(diǎn)中。

這樣的直接影響是,Kafka 的 partition 的大小,受制于單臺(tái) broker 的存儲(chǔ);而 Pulsar 的 partition 則可以利用整個(gè)集群的存儲(chǔ)容量?! ?/p>

個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案  

擴(kuò)容

當(dāng) partition 的容量達(dá)到上限后,需要擴(kuò)容的時(shí)候,如果現(xiàn)有的單臺(tái)機(jī)器不能滿足,Kafka 可能需要添加新的存儲(chǔ)節(jié)點(diǎn),并將 partition 的數(shù)據(jù)在節(jié)點(diǎn)之間搬移達(dá)到 rebalance 的狀態(tài)。

而 Pulsar 只需添加新的 Bookie 存儲(chǔ)節(jié)點(diǎn)即可。新加入的節(jié)點(diǎn)由于剩余空間大,會(huì)被優(yōu)先使用,接收更多的新數(shù)據(jù);整個(gè)擴(kuò)容過(guò)程不涉及任何已有數(shù)據(jù)的拷貝和搬移。

個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案

Broker 故障

Pulsar 在單個(gè)節(jié)點(diǎn)失敗時(shí)也會(huì)體現(xiàn)同樣的優(yōu)勢(shì)。如果 Pulsar 的某個(gè)服務(wù)節(jié)點(diǎn) broker 失效,由于 broker 是無(wú)狀態(tài)的,其他的 broker 可以很快接管 Topic,不會(huì)涉及 Topic 數(shù)據(jù)的拷貝;如果存儲(chǔ)節(jié)點(diǎn) Bookie 失效,在集群后臺(tái)中,其他的 Bookie 會(huì)從多個(gè) Bookie 節(jié)點(diǎn)中并發(fā)讀取數(shù)據(jù),并對(duì)失效節(jié)點(diǎn)的數(shù)據(jù)自動(dòng)進(jìn)行恢復(fù),對(duì)前端服務(wù)不會(huì)造成影響?! ?/p>

個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案

Bookie 故障

Apache BookKeeper 中的副本修復(fù)是 Segment(甚至是 Entry)級(jí)別的多對(duì)多快速修復(fù)。這種方式只會(huì)復(fù)制必須的數(shù)據(jù),這比重新復(fù)制整個(gè)主題分區(qū)要精細(xì)。如下圖所示,當(dāng)錯(cuò)誤發(fā)生時(shí), Apache BookKeeper 可以從 bookie 3 和 bookie 4 中讀取 Segment 4 中的消息,并在 bookie 1 處修復(fù) Segment 4。所有的副本修復(fù)都在后臺(tái)進(jìn)行,對(duì) Broker 和應(yīng)用透明。

當(dāng)某個(gè) Bookie 節(jié)點(diǎn)出錯(cuò)時(shí),BookKeeper 會(huì)自動(dòng)添加可用的新 Bookie 來(lái)替換失敗的 Bookie,出錯(cuò)的 Bookie 中的數(shù)據(jù)在后臺(tái)恢復(fù),所有 Broker 的寫(xiě)入不會(huì)被打斷,而且不會(huì)犧牲主題分區(qū)的可用性?! ?/p>

個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案

四.基于 Pulsar 的優(yōu)先級(jí)隊(duì)列方案  

在設(shè)計(jì)思路上,Pulsar 方案和 Kafka 方案并沒(méi)有多大區(qū)別。但在新方案中,個(gè)推技術(shù)團(tuán)隊(duì)借助 Pulsar 的特性,解決了 Kafka 方案中存在的問(wèn)題。

1. 根據(jù) task 動(dòng)態(tài)生成 Topic,保證了后進(jìn)入的 task 不會(huì)因?yàn)槠渌?task 消息推送的堆積而造成等待情況。

2. 中高優(yōu)先級(jí) task 都獨(dú)享一個(gè) Topic,低優(yōu)先級(jí) task 共享 n 個(gè) Topic。

3. 相同優(yōu)先級(jí)內(nèi),各個(gè) task 輪詢讀取消息,配額滿后流轉(zhuǎn)至下一個(gè)優(yōu)先級(jí)。

4. 相同優(yōu)先級(jí)內(nèi), 各個(gè) task 可動(dòng)態(tài)調(diào)整 quota,在相同機(jī)會(huì)內(nèi),可讀取更多消息推送的內(nèi)容。

5. 利用 Shared 模式, 可以動(dòng)態(tài)添加刪除 consumer,且不會(huì)觸發(fā) Rebalance 情況。

6. 利用 BookKeeper 特性,可以更靈活的添加存儲(chǔ)資源?! ?/p>

個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案

Pulsar 引入了 ApacheBookKeeper 作為存儲(chǔ)層,BookKeeper 是一個(gè)專門(mén)為實(shí)時(shí)系統(tǒng)優(yōu)化過(guò)的分布式存儲(chǔ)系統(tǒng),具有可擴(kuò)展、高可用、低延遲等特性。具體介紹,請(qǐng)參考[BookKeeper官網(wǎng)]

五.Pulsar 其他實(shí)踐

1. 不同 subscription 之間相對(duì)獨(dú)立,如果想要重復(fù)消費(fèi)某個(gè) Topic 的消息,需要使用不同的 subscriptionName 訂閱;但是一直增加新的 subscriptionName,backlog 會(huì)不斷累積。

2. 如果 Topic 無(wú)人訂閱,發(fā)給它的消息默認(rèn)會(huì)被刪除。因此如果 producer 先發(fā)送,consumer 后接收,一定要確保 producer 發(fā)送之前,Topic 有 subscription 存在(哪怕 subscribe 之后 close 掉),否則這段時(shí)間發(fā)送的消息會(huì)導(dǎo)致無(wú)人處理。

3. 如果既沒(méi)有人發(fā)送消息,又沒(méi)有人訂閱消息,一段時(shí)間后 Topic 會(huì)自動(dòng)刪除。

4. Pulsar 的 TTL 等設(shè)置,是針對(duì)整個(gè) namespace 起效的,無(wú)法針對(duì)單個(gè) Topic。

5. Pulsar 的鍵都建立在 zookeeper 的根目錄上,在初始化時(shí)建議增加總節(jié)點(diǎn)名。

6. 目前 Pulsar 的 java api 設(shè)計(jì),消息默認(rèn)需要顯式確認(rèn),這一點(diǎn)跟 Kafka 不一樣。

7. Pulsar dashboard 上的 storage size 和 prometheus 上的 storage size (包含副本大小) 概念不一樣。

8. 把 `dbStorage_rocksDB_blockCacheSize` 設(shè)置的足夠大;當(dāng)消息推送的體量大,出現(xiàn) backlog 大量堆積時(shí), 使用默認(rèn)大小(256M)會(huì)出現(xiàn)讀耗時(shí)過(guò)大情況,導(dǎo)致消費(fèi)變慢。

9. 使用多 partition,提高吞吐。

10. 在系統(tǒng)出現(xiàn)異常時(shí),主動(dòng)抓取 stats 和 stats-internal,里面有很多有用數(shù)據(jù)。

11. 如果業(yè)務(wù)中會(huì)出現(xiàn)單 Topic 體量過(guò)大的情況,建議把 `backlogQuotaDefaultLimitGB` 設(shè)置的足夠大(默認(rèn)10G), 避免因?yàn)槟J(rèn)使用 `producer_request_hold` 模式出現(xiàn) block producer 的情況;當(dāng)然可以根據(jù)實(shí)際業(yè)務(wù)選擇合適的 `backlogQuotaDefaultRetentionPolicy`。

12. 根據(jù)實(shí)際業(yè)務(wù)場(chǎng)景主動(dòng)選擇 backlog quota。

13. prometheus 內(nèi)如果發(fā)現(xiàn)讀耗時(shí)為空情況,可能是因?yàn)橹苯幼x取了緩存數(shù)據(jù);Pulsar 在讀取消息推送內(nèi)容時(shí)會(huì)先讀取 write cache, 然后讀取 read cache;如果都沒(méi)有命中, 則會(huì)在 RocksDB 中讀取條目位子后,再?gòu)娜罩疚募凶x取該條目。

14. 寫(xiě)入消息時(shí), Pulsar 會(huì)同步寫(xiě)入 journal 和 write cache;write cache 再異步寫(xiě)入日志文件和 RocksDB;所以有資源的話,建議 journal 盤(pán)使用SSD。

總結(jié)

現(xiàn)在, 個(gè)推針對(duì)優(yōu)先級(jí)中間件的改造方案已經(jīng)在部分現(xiàn)網(wǎng)業(yè)務(wù)中試運(yùn)行,希望通過(guò)這樣的模式可以解決消息推送下發(fā)中的一些問(wèn)題,而對(duì)于 Pulsar 的穩(wěn)定性,我們還在持續(xù)關(guān)注中。

作為一個(gè) 2016 年才開(kāi)源的項(xiàng)目, Pulsar 擁有非常多吸引人的特性,也彌補(bǔ)了其他競(jìng)品的短板,例如跨地域復(fù)制、多租戶、擴(kuò)展性、讀寫(xiě)隔離等。盡管在業(yè)內(nèi)使用尚不廣泛, 但從現(xiàn)有的特性來(lái)說(shuō), Pulsar 表現(xiàn)出了取代 Kafka 的趨勢(shì)。在使用 Pulsar 過(guò)程中,我們也遇到了一些問(wèn)題, 在此特別感謝翟佳和郭斯杰(兩位均為StreamNative的核心工程師、開(kāi)源項(xiàng)目Apache Pulsar的PMC成員)給我們提供的支持和幫助。

參考文獻(xiàn):

[1] [比拼 Kafka, 大數(shù)據(jù)分析新秀 Pulsar 到底好在哪]

[2] [開(kāi)源實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)Pulsar:一套搞定Kafka+Flink+DB

極客網(wǎng)企業(yè)會(huì)員

免責(zé)聲明:本網(wǎng)站內(nèi)容主要來(lái)自原創(chuàng)、合作伙伴供稿和第三方自媒體作者投稿,凡在本網(wǎng)站出現(xiàn)的信息,均僅供參考。本網(wǎng)站將盡力確保所提供信息的準(zhǔn)確性及可靠性,但不保證有關(guān)資料的準(zhǔn)確性及可靠性,讀者在使用前請(qǐng)進(jìn)一步核實(shí),并對(duì)任何自主決定的行為負(fù)責(zé)。本網(wǎng)站對(duì)有關(guān)資料所引致的錯(cuò)誤、不確或遺漏,概不負(fù)任何法律責(zé)任。任何單位或個(gè)人認(rèn)為本網(wǎng)站中的網(wǎng)頁(yè)或鏈接內(nèi)容可能涉嫌侵犯其知識(shí)產(chǎn)權(quán)或存在不實(shí)內(nèi)容時(shí),應(yīng)及時(shí)向本網(wǎng)站提出書(shū)面權(quán)利通知或不實(shí)情況說(shuō)明,并提供身份證明、權(quán)屬證明及詳細(xì)侵權(quán)或不實(shí)情況證明。本網(wǎng)站在收到上述法律文件后,將會(huì)依法盡快聯(lián)系相關(guān)文章源頭核實(shí),溝通刪除相關(guān)內(nèi)容或斷開(kāi)相關(guān)鏈接。

2019-08-19
個(gè)推消息推送基于 Apache Pulsar的優(yōu)先級(jí)隊(duì)列方案
作者:個(gè)推平臺(tái)研發(fā)工程師祥子一.業(yè)務(wù)背景在個(gè)推的消息推送場(chǎng)景中,消息隊(duì)列在整個(gè)系統(tǒng)中占有非常重要的位置。

長(zhǎng)按掃碼 閱讀全文