阿里實時計算Blink核心技術
實時計算in阿里巴巴
實時計算在阿里巴巴內部應用廣泛。隨著新經濟體的出現與發展,技術的革新和使用者需求的提升,人們越來越需要實時計算的能力,它的最大好處就是能夠基於實時變化資料更新大資料處理的狀態和結果。接下來,舉兩個例子來闡釋實時計算在阿里內部應用的場景:
雙11大屏
每年雙11阿里都會聚合有價值的資料展現給媒體,GMV大屏是其中之一。整個GMV大屏是非常典型的實時計算,每條交易資料經過聚合展現在大屏之上。從DataBase寫入一條資料開始,到資料實時處理寫入HBase,最後展現在大屏之上,整個過程的鏈路十分長。整個應用存在著許多挑戰:
- 大屏展現需要秒級延遲,這需要實時計算延遲在亞秒級別
- 雙11大量資料需要在一個Job中聚合完成
- Exactly-Once 保持資料計算的精確性
- 系統高可用,不存在卡頓和不可用的情況
這個應用場景的SLA非常高,要求秒級延遲和資料的精確性,但它的計算並不複雜,接下來介紹更為複雜的應用。
實時機器學習
機器學習一般有兩個重要的元件:Feature 和Model。傳統的機器學習使用批計算對Feature的採集和Model的訓練,這樣更新頻率太低,無法適應資料在不斷變化的應用的需求。例如在雙11時,商品的價格、活動的規則與平時完全不同,依據之前的資料進行訓練得不到最優的效果。因此,只有實時收集Feature並訓練Model,才能擬合出較為滿意的結果。為此,我們開發了實時機器學習平臺。
此實時機器學習平臺主要包括兩個部分:實時Feature計算和實時Model計算。這套系統同樣擁有很多挑戰,具體如下:
- 機器學習需要採集各種各樣Metrics,存在許多DataSource
- 維度多,如使用者維度、商品維度。維度的疊加甚至是笛卡兒積導致最後的Metrics是海量的,State非常巨大
- 機器學習計算複雜,耗用大量CPU
- 某些資料不能存在State中,需要外部儲存,存在大量外部IO
實時A/B Testing
演算法工程師在調優Model時會涉及多種Model,不同的Model有不同的計算模式和方法,產生不同的計算結果。因此,往往會有不同的Query訂閱實時資料,產生結果後根據使用者回饋迭代Model,最終得到最優模型。A/B Tesing的挑戰在於演算法工程師往往計算很多Metrics,所有的Metrics都通過實時計算進行統計會浪費大量資源。
針對這個挑戰,我們設計了A/B Testing的框架開發平臺。它用來同步演算法工程師感興趣的Metrics進行聚合,收集起來併發送到Druid引擎。這樣,演算法工程師根據不同Job的要求清洗資料到Druid,最後在Druid之上對不同的Metrics進行統計分析,從而找到最優的演算法Model。
總結
綜上,實時計算在阿里巴巴內部存在如下挑戰:
- 業務龐大,場景多,大量的機器學習需求,這些因素一起導致了計算邏輯十分複雜
- 資料量大,作業多,因此整個實時計算的機器規模十分巨大
- 要保障低延遲和資料精確性,同時要滿足高吞吐量的需求
流計算的介紹
顯然批量計算模型是無法滿足當前大資料實時計算需求的,只有流式計算模型才是實時計算的天然計算模型,因此我先介紹下流式計算的基本思想,尤其是區別於傳統批量計算的一些概念。批量計算是對於有限固定的資料集合進行處理,流式計算是對無限資料流的處理,即計算無法確定資料何時會結束。從另一個角度看,批量計算其實也可以認為是流式計算的一種特例,因此批量計算可以看做是一個數據流中的片段,即有明確開始和結束標記的資料流,如下圖所示:
完善的流式計算不僅應該提供實時計算能力,還應該支援計算過程中的狀態管理,狀態主要是指計算過程中需要的資料或者變數,例如:統計計算中的aggregation(sum/min/max…),機器學習中的feature和model,狀態管理包括這些資料的儲存、備份、恢復,版本管理,提供讀寫訪問API,並保證一致性,如下圖所示:
此外,完善的流計算還需要考慮資料的時序問題,因為現實場景中,資料的產生順序和接收順序未必一致,因此需要給資料附帶時間戳屬性,即:event time,計算邏輯可以按照資料的event time來處理,這樣可以解決資料的亂序問題,配合watermark機制,可以較好的解決time window計算,如下圖所示:
流和批統一的計算引擎
完整的生態系統
狀態管理和一致性
Chandy-Lamport演算法是Flink支援狀態管理和強一致性的核心理論基礎,演算法基礎思想如下圖所示:
Chandy-Lamport演算法的核心思想就是定期在流式計算任務中插入Barrier,然後觸發整個流做一次Checkpoint,即將任務的State做一次Snapshot持久化儲存。在下次任務重啟的時候,可以基於上次成功的Checkpoint進行恢復,過程如下圖所示:
Flink的選定及優化
為了應對上述挑戰,我們調研了許多計算框架,最終選定Flink,原因如下:
- Flink很好地引入和設計了State,基於State複雜的邏輯計算如join能得到很好的描述
- Flink引入了Chandy-Lamport 演算法,在此演算法的支撐下可以完美實現Exactly-Once,並能在低延遲下實現高吞吐量。
然而,Flink在State、Chandy-Lamport 演算法等方面還有很多缺陷,為此阿里開闢了名為Blink的專案。
Blink是開源Flink與阿里巴巴Improvement的結合,主要分兩大塊:
BlinkRuntime
包括儲存、排程和計算,不同公司在使用Flink時,儲存、排程以及底層優化等方面會有諸多不同,阿里巴巴的blink內部也對Runtime做了諸多個性化的優化,這一層不好與Apache Flink社群統一,我們稱之為Blink Runtime。
Flink SQL
原生的Flink只有比較底層的DataStream API,使用者在使用時需要設計實現大量的程式碼,此外DataStream本身也有設計上的缺陷。為了方便使用者使用,阿里巴巴團隊設計了流計算的Flink SQL並推回了社群。取名Flink SQL而不是Blink SQL,主要原因Blink和Flink在SQL這個使用者API上面是完全和社群統一的,另外Apache Flink的大部分功能都是阿里巴巴貢獻的,所以說Flink SQL就是Blink SQL,沒有特別大的區別。
Blink介紹
在2015年,當時我們還是阿里巴巴搜尋事業部的資料技術團隊,負責阿里巴巴所有商品搜尋後臺的資料處理,包括淘寶,天貓,B2B等全球商品,面對海量商品的資料處理,我們需要在維護兩套資料處理流程,一套是每天晚上的全量流程,同時還要一套白天的實時增量流程,為了降低開發和維護成本,我們開始探索一套流和批統一的計算引擎。
當時我們重點分析對比了Spark和Flink兩套技術,最後雖然覺得Spark相對成熟穩定,但Spark是從Batch出發,模擬Streaming,而Flink正好相反是從Streaming出發,認為Batch是Streaming的Special Case,因此我們感覺Flink的設計思想更先進,更適合未來的計算髮展方向,更適合我們的需求,因此我們決定選擇Flink技術方向。
雖然Flink具備流計算的各種優勢,但Flink在成熟度和活躍度上的不足,使得我們無法在阿里巴巴業務場景中直接使用,因此我們啟動了Blink專案,目標就是擴充套件、優化、完善Flink,使其能夠應用在阿里巴巴大規模實時計算場景,並將我們在阿里巴巴對Flink的改進都回饋給開源社群。
最近一年中Blink已經將多項架構、功能和效能改進貢獻給Flink社群,例如:
- Flink架構升級,外掛化原生支援不同調度系統,並實現了原生執行在Hadoop YARN上
- Failover穩定性改進,優化了Task/TaskManager以及JobManager各種元件Fail的場景處理
- 提出並實現了增量式Checkpoint的架構,使得Flink的Checkpoint/Recovery速度大幅提升,成本明顯下降
- 提出並實現了Async Operator,通過非同步方式,讓I/O密集型計算節點的效能大幅提升
- 提出了大量Table API的全新設計,以及流和批在SQL層面的統一概念和方案
BlinkRuntime核心優化解密
部署和模型的優化
優化包含以下幾點:
- 解決大規模部署問題。Flink中一個Cluster只有一個JobMaster來管理所有的Job。隨著Job的不斷增加,單一的Master無法承接更多的Job,產生了瓶頸。因此,我們重構了架構,使每一個Job擁有自己的Master。
- 早期的Flink中TaskManager管理很多Task,某一個Task的問題會導致TaskManager崩潰,進而影響其他Job。我們使每一個Job擁有自己的TaskManager,增強了Job的隔離。
- 引入ResourceManager。ResourceManager可以和JobMaster通訊,實時動態地調整資源,達到最優的叢集部署。
- 我們不僅將這些優化應用在YarnCluster上,還應用到Mesos和Standalone的部署上。
有了這些工作,Flink就可以應用到大規模的叢集部署。
Incremental Checkpoint
實時計算需要不停的在checkpoint的時候來保留計算狀態。早期的Flink的checkpoint的設計存在缺陷,在每個checkpoint發生的時候,它會讀取所有舊的狀態資料,和新的資料合併後按照全量的方式寫入磁碟。隨著State的不斷增大,在每次做checkpoint的時候所需要的資料讀取和寫入的量級是十分巨大。 這就導致Job的checkpoint的間隔需要設定的很大,不能小於1分鐘。越大的checkpoint的間隔, failover的時候回退的計算就越大,造成的資料延遲也就越嚴重。
為了減少checkpoint間隔,我們提出了Incremental Checkpoint的設計。概括的說就是在checkpoint的時候只儲存增量的state變化的資料。由於歷史上每個checkpoint的資料都已經儲存,後面的checkpoint只需要將不同的資料放入儲存,這樣每次做checkpoint需要更新的資料量就非常小,使得checkpoint可以在若干秒級內完成,這就大大減小了failover時可能引起的延遲。
非同步IO
很多時候我們不得不將資料放在外部儲存中,這樣在計算過程中就需要通過網路IO讀取資料。傳統的方式使用 Sync-IO的讀取方式,在發出資料請求之後,只有等待到結果返回之後才能開始下一個資料請求,這種做法造成了CPU資源的浪費,因為CPU在大多數情況下都在等待網路IO的請求返回。Sync-IO使得CPU的資源利用率無法提高到極致,也就大大影響了單位CPU下的計算吞吐。為此提升計算吞吐,我們設計了Async-IO的資料讀取框架,它允許非同步地多執行緒地讀取資料。
每次資料請求發出後不需要等待資料返回就繼續傳送下一個資料請求。當資料請求從外部儲存返回後,計算系統會呼叫callback方法處理資料。如果資料計算不需要保序,資料返回之後就會快速經過計算髮出。如果使用者需要資料的計算保序時,我們使用buffer暫時儲存先到的資料,等前部資料全部到達後再批量地傳送。在使用了Async-IO之後,根據設定的buffer大小不同計算吞吐可以提升幾十倍甚至幾百倍,這就極大地提升了單位CPU利用率和整體的計算效能。
值得一提的是,以上所述的所有Blink Runtime優化已經全部貢獻給了Apache Flink社群。
Flink SQL核心功能解密
阿里完成Apache Flink SQL 80%研發工作
目前,Apache Flink SQL 80%的功能是阿里巴巴實時計算團隊貢獻的,包括兩百個提交和近十萬行程式碼。使用Flink SQL的原因是因為我們發現了底層API給使用者的遷移、上線帶來的極大不便。那麼,我們又為什麼選擇SQL?主要原因如下:
- SQL是十分通用的描述性語言,SQL適合用來讓使用者十分方便的描述Job的需求。
- SQL擁有比較好的優化框架,使得使用者只需要專注於業務邏輯得設計而不用關心狀態管理,效能優化等等複雜得設計,這樣就大大降低了使用門檻。
- SQL易懂,適合不同領域的人使用。使用SQL的使用者往往都不需要特別多的計算機程式設計基礎,從產品設計到產品開發各種人員都可以快速掌握SQL的使用方法。
- SQL的API十分穩定,在做機構升級,甚至更換計算引擎時都不用修改使用者的Job而繼續使用。
- 有些應用場景需要流式更新,批式驗證。使用SQL可以統一批計算和流計算的查詢query。真正實現一個Query,同樣的結果。
流處理 VS 批處理
要想設計和批處理統一的流計算SQL,就要了解流處理和批處理的區別。兩者的核心區別在於流處理的資料是無窮的而批處理的資料是有限的。這個本質區別又引入另外三個更具體的區別:
- 流處理會不斷產生結果而不會結束,批處理往往只返回一個最終結果並且結束。比方說,如果要統計雙11的交易金額,使用批處理計算就要在雙11當天的所有交易結束後,再開始計算所有買家花費的總金額並得到一個最終數值。而流處理需要追蹤實時的交易金額,實時的計算並更新結果。
- 流計算需要做checkpoint並保留狀態,這樣在failover的時候能夠快速續跑。而批計算由於它的輸入資料往往是被持久化儲存過的,因此往往不需要保留狀態。
- 流資料會不斷更新,例如某一買家的花費總金額在不斷變化,而批處理的資料是一天花費的總金額,是固定的,不會變化的。流資料處理是對最終結果的一個提前觀測,往往需要把提前計算的結果撤回(Retraction)做更改而批計算則不會。
Query Configuration
上面提到的這些區別都不涉及使用者的業務邏輯,也就是說這些區別不會反應在SQL的不同。我們認為這些區別只是一個job的屬性不同。為了描述流計算所特有的一些屬性,例如什麼時候產生流計算結果和怎麼保留狀態,我們設計容許使用者配置的Query Configuration,它主要包括兩個部分:
- Latency SLA。定義了從資料產生到展現的延遲,如雙11大屏是秒級別。使用者根據自己的需要配置不同SLA,我們的SQL系統會根據SLA的要求做最好的優化,使得在滿足使用者需求的同時達到系統性能的最優。
- State Retention/TTL。流計算是永不停止的,但是流資料中的State往往不需要保留很久,保留過久勢必對儲存是個浪費,也極大的影響了效能。所以我們容許使用者設定合理的TTL(過期時間)來獲得更好的計算效能。
我們通過Query Configuration描述了流和批所不同的一些屬性。接下來我們需要繼續考慮如何設計流式的SQL?
動態表(Dynamic-Table)
問題關鍵在於SQL在批處理中對錶操作而流資料中並沒有表。因此,我們建立了資料會隨著時間變化的動態表。動態表是流的另一種表現形式,它們之間具有對偶性,即它們可以互相轉換而不破壞資料的一致性。以下是一個例子:
如圖,左邊是輸入流,我們為每一條資料產生Dynamic-Table,再將Table的變化用Changelog傳送出去。這樣兩次變化後,輸入流和輸出流中的資料始終保持一致,這就證明了引入Dynamic-Table並沒有丟失語義和資料。
有了動態表的概念,我們就可以應用傳統SQL作用於流上。值得一提的是,Dynamic-Table是虛擬的存在著,它並不需要實際的儲存來落地。我們再來看一個例子:
如圖,當有輸入流的時候我們進行連續查詢。我們將Stream理解為一個Dynamic-Table,動態查詢是基於Dynamic-Table產生一個新的Dynamic-Table,如果需要新產生的Dynamic-Table還可以繼續產生流。這裡,因為加入了連續查詢的聚合計算,左右兩邊的流已經發生了變換。總之動態表的引入提供了我們在流上做連續SQL查詢的能力。
Stream SQL是沒有必要存在的
通過上面的討論,我們發現有了Dynamic-Table之後我們不需要再創造任何新的流式SQL的語義。因此我們得出這樣的結論:流式SQL是沒必要存在的。ANSI SQL完全可以描述Stream SQL的語義,保持ANSI SQL的標準語義是我們構建Flink SQL的一個基本原則。
ANSI SQL功能實現
基於上面的理論基礎,我們繼而實現了流計算所需要的若干ANSI SQL功能,包括:DML、DDL、UDF/UDTF/UDAF、連線Join、撤回(Retraction)、Window聚合等等, 除了這些功能之外,我們還做了大量的查詢優化,從而保障了Flink SQL即能滿足使用者的各種查詢的需求,同時兼具優異的查詢效能。接下來,簡要介紹其中幾項:
1)JOIN
流和動態表具有對偶性,一條SQL看似是Table的join,事實上是流的join。
例如Inner Join的實現原理如下:資料會從輸入的兩邊任意一條流而來,一邊資料先來會被存在State中並按照Joining key查詢另外一邊的State,如果存在就會輸出結果,不存在則不輸出,直到對面資料來了之後才產生結果。
總之,兩個流具有兩個state,一邊的資料到達後存下來等待另外一邊資料,全部到達後inner join產生結果。 除了兩條流的join之外,我們還引入了流和外部表的join。我們的機器學習平臺會把大量的資料儲存在HBase中,查詢HBase中的資料的操作實際上是在連線一個外部表。連線外部表往往存在兩個模式:
- Look up方式。流資料到達時即時地查詢外部表,從而得到結果。
- Snapshot方式。流資料到達時即時地傳送snapshot的版本資訊給外部儲存service從而查詢資料,外部表儲存根據版本資訊返回結果。
值得一提的是,我們設計的這個流和外部表關聯的這個功能沒有引入任何新的語法,是完全按照SQL-2011的標準實現的。同樣的查詢在批計算上也適用。
2)Retraction
撤回是流計算的重要概念,舉一個例子作解釋:計算詞頻
詞頻的計算是指對所有英文單詞統計頻率,並最終按照頻率統計不同頻率下不同單詞的個數。例如,如果一個統計的初始狀態只有Hello World Bark三個單詞,且每個單詞只出現一次,那麼詞頻的最終結果就是出現頻率為1的單詞有3個(出現頻率為其他次數的完全沒有),因此結果表只有一行“1——3”。當單詞不斷更新,再增加一個Hello時,因為Hello的出現頻率變為2次,我們在詞頻的結果表中插入“2——1”這麼一行新的資料。
顯然,出現兩次的單詞是一個,那麼“2——1”這個結果是對的,但是出現頻率為1次的單詞數已經錯了,應該是2個,而不是3個。出現這種問題的本質原因是因為流計算輸出的結果是對計算的一個提前觀測,隨著資料的不斷更新,計算結果必然會發生改變,這就要求我們對之前發生的結果做撤回(retraction)再把更新的結果發出去,不然資料結果就不錯誤。對於上面的例子,當Hello的頻率從1變到2的時候,我們不僅需要在結果表中插入“2——1”這麼一行,還需要對“1——3”這一行做撤回更新操作。
值得一提的是什麼時候需要撤回,什麼時候不需要,完全由SQL的Query Optimizer來判斷,這個使用者是完全不需要感知的,使用者只需要通過SQL描述他的業務計算邏輯就好了。如圖所示,第一個場景不需要撤回而第二個需要,這完全是由優化框架決定而非使用者 。這一點,大大體現了使用SQL,並利用SQL中所擁有的天然優化框架的好處。
3)Window聚合
Window聚合是Flink SQL的一個重要能力。圖中的這個例子我們對每一個小時的資料做聚合統計。除了這種Tumble window我們還支援了Sliding Window和Session Window。將來還會支援使用者自定義的window。
4)查詢優化Query Optimization
除了新增新的功能,我們還做了大量的查詢優化。例如micro-batching。如果沒有micro-batching,處理每一條資料就會伴隨著幾次IO讀寫。有了micro-batching之後我們可以用幾次IO處理來處理上千條資料。除此之外,我們還做了大量的的filter/join/aggregate pushdown以及TopN的優化,下面再舉例解釋TopN的優化:
如上圖,我們想取銷售量前三的city,對使用者的Query有兩種底層的實現:
- 一種方式是當沒一條資料來的時候,對儲存的所有city進行排序,再擷取前三個city。這種設計每條資料跟新都會重新排列所有city,勢必會造成大量計算資源浪費。
- 我們的Query Optimizer會自動識別到查詢語句,對這種計算做優化,真正執行過程中只需要不停的更新排前三的city就可以了,這樣大大優化了計算的複雜度,提升了效能。
阿里巴巴實時計算應用
阿里雲流計算開發平臺
一個是阿里雲流計算平臺(streamCompute),該平臺允許使用者編寫SQL,並在平臺內部除錯debug。除錯正確後,使用者可以通過這個平臺直接將作業釋出在阿里雲集群上部署,部署完成後後檢測運維上線的。因此這個平臺整合了所有實時計算的需求,集開發、Debug、上線部署、運維於一體,大大加速了使用者開發和上線的效率。值得一提的是,2017年雙11期間阿里集團絕大多數的實時計算Job均通過這個平臺釋出。我們今年9月開始,通過阿里雲,包括公共雲、專有云也將這個平臺開放給外部企業,讓他們能夠使用到阿里巴巴實時計算的能力。
阿里實時機器學習平臺Porsche
線上機器學習平臺利用了Blink強大的實時計算能力,能夠實時的對海量使用者和商品行為資料進行流式特徵提取以及訓練學習,並將實時更新的特徵和模型實時同步給線上的搜尋和推薦引擎,實現個性化搜尋和推薦,資料流程如下圖所示:
為了方便演算法同學開發機器學習任務,我們基於Flink SQL以及Hbase,設計實現了一個面向演算法人員、支援視覺化自助開發運維的線上機器學習平臺——Porsche。如上圖所示,使用者在Porsche平臺的IDE,通過視覺化的方式將元件拖入畫布中,配置好元件屬性,定義好完整的計算DAG。這個DAG會被翻譯成SQL,最終提交給Blink執行。另外,值得一提的是,Porsche平臺還支援Tensorflow,今年雙11也是大放異彩,本平臺免去了演算法同學學習使用SQL的成本,暫時只對內開放。
商品數索引構建流程
淘寶的搜尋引擎是使用者在淘寶購物的最主要入口,淘寶的商品資料處理和索引構建流程大致如下圖所示,淘寶的商品庫都儲存的阿里巴巴的MySQL叢集中,搜尋的資料處理流程需要從MySQL將資料同步到搜尋引擎後臺的HBase儲存中(類似:Google都將網頁抓取到BigTable中),然後進行各種業務邏輯處理和索引構建,最終將索引推送到線上搜尋引擎中提供搜尋服務。
由於淘寶的商品搜尋引擎需要在每天白天不斷進行實時商品更新,同時晚上還需要一套額外的全量商品更新流程,因此基於Blink的統一計算模型,流式計算和批量計算可以使用一套使用者邏輯程式碼完成。
Blink的未來
目前Blink已經在阿里巴巴內部達成共識,成為阿里巴巴統一的實時計算引擎,接下來我們將繼續加大在Blink技術發展上的投入,並與開源社群更加密切的合作,突進流式計算的發展。應用場景上,一方面會繼續擴大計算規模,並提推出內部統一實時計算服務,統一支援阿里內部的所有實時計算業務;另一方面也將會通過阿里雲的公有云和專有云渠道向外界輸出我們的實時計算能力,讓更多行業和使用者也都能享受到阿里巴巴實時計算的技術成果。
總之,Blink的實時計算之路剛剛開啟,未來必將有更大的挑戰和機遇,也非常歡迎各位對實時計算有興趣的技術愛好者以及高校學子們投身到這件開創新一代計算平臺的事情上來。
參考連結: