餓了麼:分散式時序資料庫 - LinDB
餓了麼對時序資料庫的需求主要來自各監控系統,主要用於儲存監控指標。原來使用graphite,後來慢慢有對指標有多維的需求(主要體現在對一個指標加多個Tag, 來組成Series,然後對Tag進行Filter和Group進行計算),這時graphite基本很難滿足需求。 業界現在用的比較多的主要有如下幾類TSDB: 免費獲取學習Java高架構、分散式架構、高可擴充套件、高效能、高併發、效能優化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分散式專案實戰學習架構師視訊免費獲取 架構群:863621962 InfluxDB:很多公司都在用,包括餓了麼有部分監控系統也是用InfluxDB。優點,支援多維和多欄位,儲存也根據TSDB的特點做了優化。但開源的部分不支援,很多公司自己做叢集化, 但大多基於指標名來,這樣會有單指的熱點問題。現在餓了麼也是類似的做法,但熱點問題很嚴重,大的指標已經用了最好的伺服器,但是查詢效能還是不夠理想, 如果做成按Series Sharding那成本還是有一點高; Graphite:根據指標寫入及查詢,計算函式很多,但很難支援多維,包括機房或多叢集的查詢,原來餓了麼把業務層的監控指標儲存在Graphite中,並工作的很好, 但是多活之後基本已經很難滿足一些需求了,由於其儲存結構的特點,很佔IO,根據目前線上的資料寫放大差不多幾十倍以上; OpenTSDB: 基於HBase,優點儲存層不用自己考慮,做好查詢聚合就可以,也會存在HBase的熱點問題等,在以前公司也弄基於HBase實現的TSDB,來解決OpenTSDB的一些問題, 如熱點,部分查詢聚合下放到HBase等,目的是優化其查詢效能,但依賴HBase/HDFS還是很點重; HiTSDB: 阿里提供的TSDB,儲存也是用HBase,在資料結構及Index上面做了很多優化,具體沒有研究,有興趣的同學可以在阿里雲上試一下; Druid: Druid其實是一個OLAP系統,但也可以用來儲存時間序列資料,但看到它的架構圖時已經放棄了; ES: 也有公司直接用ES來儲存,沒有實際測試,但總覺得ES不是一個真正的TSDB; atlas: Netflix出品,全記憶體TSDB,最近幾小時資料全在記憶體中,歷史資料需要外部儲存,具體沒有詳細研究; beringei:facebook出品,全記憶體TSDB,跟atlas一樣最近的資料在記憶體,目前應該還在孵化期; 3. 最終我們還是決定自己實現一套分散式時序資料庫,具體需要解決如下問題: 輕量,目前只依賴於Zookeeper; 基於Series進行Sharding,解決熱點,可以真正水平擴充套件; 實時寫入,實時查詢,由於大多用於監控系統,所以查詢效能要好; 由於餓了麼目前是多活,監控系統也是多活,所以要支援單機房寫入,多機房聚合查詢等; 自動的Rollup功能,如使用者可以寫10s的精度,系統自動Rollup到分鐘,小時,天級別,以支援大時間範圍的查詢,如報表等; 支援類SQL的查詢方式; 支援多副本,以提高整個系統的可靠性,正常只要還有一個副本存活就可以正常提供服務,副本數指定; 整體設計 採用計算和儲存分離的架構,分為計算層LinProxy和儲存層LinStorage。 說明: LinProxy主要做一些SQL的解析,及一些中間結合的再聚合計算,如果不是跨叢集,LinProxy可以不需要,對於單叢集的每個節點都內嵌了一個LinProxy來提供查詢服務; LinDB Client主要用於資料的寫入,也有一些查詢的API; LinStorage的每個節點組成一個叢集,節點之進行復制,並有副本的Leader節點提供讀寫服務,這點設計主要是參考Kafka的設計,可以把LinDB理解成類Kafka的資料寫入複製+底層時間序列的儲存層; LinMaster主要負責database、shard、replica的分配,所以LinStorage儲存的排程,及MetaData(目前儲存Zookeeper中)的管理; 由於LinStorage Node都是對等的,所以我們基於Zookeeper在叢集的節點的選一個節點成為Master,每個Node把自身的狀態以心跳的方式上報到Master上,Master根據這些狀態進行排程, 如果Master掛了,自動再選一個Master出來,這個過程基本對整個服務是無損的,所以使用者基本無感知。 寫入 整個寫過程分為如下2部分組成: WAL複製,這部分設計上參考了Kafka,使用者的寫入只要寫入WAL成功,就認為成功(由於主要用於監控系統,所以對資料的一致性沒有做太多的保證),這樣就可以提供系統的寫入吞吐; 本地寫入,這個過程是把WAL的資料解析寫入到自己的儲存結構中,只有寫入本地儲存的資料才可以查到; 整個過程不像一些系統在每次寫的過程中完成,我們是把這個過程分2步,並非同步化了; WAL複製 目前LinDB的replica複製協議採用多通道複製協議,主要基於WAL在多節點之間的複製,WAL在每個節點上的寫入,有獨立的寫操作完成, 所以對於Client寫入對應Leader的WAL成功就認為本次寫操作是成功的,Leader所在的節點負責把相應的WAL複製到對應的follower, 同理寫WAL成功認為複製成功,如下所示: 多通道複製協議 寫入Leader副本成功就算成功以提高了寫入速率,也帶來了以下問題: 資料一致性的問題 資料的丟失問題 以上圖Server1為Leader,3個Replication來複制1-WAL為舉例來說: 當前Server1是該shard的Leader接受Client的寫入,Server2和Server3都是Follower接受Server1的複製請求,此時1-wal通道作為當前的資料寫入通道, Server2和Server3此時可能落後於Server1。 說明: 整個過程需要注意以下幾個Index; Client寫入時的Append Index,表示當前Client寫入到哪裡; 對應每個Follower都會有一個Replica Index,表示對應Follower消費Leader上面同步到哪裡; Follower的Ack Index,表示Follower已經成功複製到本地的WAL; 對於Follower的複製請求,其實相當於一個特殊Client的寫入,所以也有一個對應的Append Index; 只有被Ack過的Index,才標示為已經處理完成,對於Leader來說,小於最小的Ack Index的WAL資料是可以被刪除; 在這個過程中,如果Server2或者Server3中有一臺出問題,這時對應的Consume Index不會移動,只有等到相應服務恢復之後,繼續處理; 在整個過程中可能出現如下情況的可能; Leader Replica Index > Follower Append Index,這時需要根據Follower Append Index重置Leader Replica Index,可能存在2種情況,具體情況在複製順序性中描述; Leader Replica Index < Follower Append Index,也同樣存在2種情況,具體情況在複製順序性中描述; 假如此時Server1掛了,從Server2和Server3中選出新的Leader,如此時選為Server2為Leader。 Server2就會開啟2-wal複製通道,向server1和server3複製,由於當前server1掛了,所以暫時只往Server3複製,此時資料的寫入通道為2-wal。 Server1啟動恢復後,Server2會開啟向Server1的2-wal複製通道,同時server1會將1-wal中剩餘的還未向Server2和Server3複製的資料複製給他們。 對於異常情況,WAL中的資料不能正常由於ACK之後刪除,導致WAL佔用過多磁碟,所以對WAL需要有一個SIZE和TTL的清理過程,一旦因為WAL因為SIZE和TTL清理之後,會導致幾個Index錯亂,具體錯亂情況如上所述。 多通道複製協議帶來的問題: 每個通道都有對應的index序列,儲存每個通道的last index。而單通道複製只需要儲存1個last index即可。這個代價其實還好。 本地寫入 背景 做到Shard級別的寫入隔離,即每個Shard都會有獨立的執行緒來負責寫入,不會因為某個資料庫或者某個Shard寫入量具增而導致別的資料庫的寫入, 但可能會因為單機承載的Shard數過多,導致執行緒數過多,如果遇到這種情況,應該通過擴機器來解決,或者在新建資料庫的時候,合理分配Shard數。 由於是單執行緒的寫操作,所以在很多情況下,不需要考慮多執行緒寫帶來的鎖競爭問題。 資料儲存結構 說明,以單個數據庫在單節點上的資料結構如例: 一個數據庫在單節點上會存在多個Shard,所有Shard共享一個索引資料; 所有的資料根據資料庫的Interval來計算按時間片來儲存具體的資料包括資料檔案和索引檔案。 這樣的設計主要為了方便處理TTL,資料如果過期,直接刪除相應的目錄就可以; 每個shard下面會存在segment,segment根據interval來儲存相應時間片的資料; 為什麼每個segment下面又按interval儲存很多個data family?這個主要由於LinDB主要解決的問題是儲存海量的監控資料,一般的監控資料基本是最新時間寫入, 基本不會寫歷史資料,而整個LinDB的資料儲存類似LSM方式,所以為了減少資料檔案之間的合併操作,導致寫放大,所以最終衡量下來,再對segment時間片進行分片。 下面以interval為10s為例說明: segment按天來儲存; 每個segment按小時來分data family,每個小時一個family,每個family中的檔案再按列儲存具體的資料。 寫入流程 說明: 系統會為每一個Shard啟一個寫執行緒,該執行緒負責這個Shard的所有寫操作。 首先把measurement, tags, fields對應的資料寫入資料庫的索引檔案,並生成相應的measurement id, time series id及field id,主要完成string->int的轉換。 這樣的好處是所有的資料儲存都以資料型別來儲存,從而可以減少整個儲存大小,因為對於每個資料點,measurement/tags/field這樣元資料佔用,如cpu{host=1.1.1.1} load=1 1514214168614, 其實轉換成id之後,cpu => 1(measurement id), host=1.1.1.1 => 1(time series id), load => 1(field id),所以最終的資料儲存為1 1 1514214168614=>1,這個考慮OpenTSDB的設計。 如果寫索引失敗,認為本次寫入失敗,失敗分為2種,一種是資料寫入格式有問題,這類失敗直接標示失敗,另外一種由於內部問題,這時寫入失敗需要重試。 使用根據索引得到的ID,再結合寫入時間和資料庫Interval計算得到需要寫入到哪個segment下的哪個family,寫family的過程,直接寫記憶體以達到高吞吐量的要求, 記憶體資料到達記憶體限制之後,會觸發Flush操作。 整個寫過程先寫記憶體,再由Flusher執行緒把記憶體中的資料dump到相應的檔案中,這樣就做到了對一批資料順序寫入,同時對於最近的資料根據Field Type進行Rollup操作,從而進一步減少磁碟IO操作。 查詢引擎 LinDB查詢需要解決如下問題: 解決多個機房之間的查詢; 高效的流式查詢計算; 說明: 由於需要支援多機房或者多叢集的查詢,所以引入了LinProxy,LinProxy主要負責面向使用者的查詢請求; SQL Plan負責具體SQL的解析,生成最終的執行計劃及需要計算的中間結果的函式; 通過Zookeeper中的Metadata,把請求路由給具體的LinDB叢集中對應的服務; 每個LinConnect負責與一個LinDB叢集之間的通訊,每個LinConnect內部儲存了一份對應叢集的Metadata,該Metadata資訊在每個Metadata變更的時候有Server端推送給LinConnect, 這樣LinConnect基本做到近實時的更新Metadata; Aggregator Stream主要負責把各個LinConnect的中間結果進行最終的合併計算操作; 整個LinProxy處理過程都是非同步化,這樣可以利用執行緒在IO等待的時候可以做計算; 每個Node接收LinConnect過來的請求,在內部查詢計算成中間結果返回給LinConnect,詳細的過程後面要介紹; Node查詢 說明: 如果所示,Client過來的一個查詢請求,會產生很多小的查詢任務,每個任務所承擔的職責很單一,只做它所自己的任務,然後把結果給下一個任務, 所以需要所有的查詢計算任務都是異常無阻塞處理,IO/CPU任務分離; 整個服務端查詢使用Actor模式來簡化整個Pipeline的處理; 任何一個任務執行完成,如果沒有結果產生,則不會生產下游的任務,所有下游的任務都是根據上游任務是否有結果來決定; 最終把底層結果,通過Reduce Aggregate聚合成最終的結果; 儲存結構 倒排索引 倒排索引,分兩部分,目前索引相關的資料還是儲存在RocksDB中。 根據Time Series的Measurement+Tags生成對應的唯一ID(類似luence裡面的doc id)。 根據Tags倒排索引,指向一個ID列表。TSID列表以BitMap的方式儲存,以方便查詢的時候通過BitMap操作來過濾出想要的資料。BitMap使用RoaringBitMap。 每一類資料都儲存在獨立的RocksDB Family中。 記憶體結構 為了提高寫入效能,把當前一段時間的資料寫入到記憶體中,記憶體到達一定限制或者時間後把記憶體中的資料Dump到檔案中。 記憶體儲存分為當前可寫和不可寫,當前可寫用於接入正常的資料寫入,不可寫用入Dump到檔案中,如果Dump成功,則清空不可寫部分。 如果可寫部分也到在寫入限制,但不可寫部分還沒有完成Dump,這時寫入會被Block住,直到有可用的記憶體供資料寫入,目的是為了不會因為佔用過多記憶體而導致OOM。 MemoryTable內部通過一個Map來儲存Measurement ID->Measurement Store關係,即每個Measurement都儲存在一個獨立的Store中。 在Measurement Store記憶體儲對應Measurement下面每個TSID的資料,每個TSID對應的資料用一個Memory Block來儲存,每個Memory Block按TSID的順序儲存在Array List中,把TSID儲存在一個BitMap中,通過TSID在Bitmap中位置來定位Memory Block在Array List中的具體位置,這裡說明一下為什麼不直接使用Map來儲存,因為整個系統是用Java實現的,Java中的Map結構,不適合儲存小物件的資料,存在記憶體放多倍的儲存。 由於每個TSID都會對應一個時間線,每個時間線可能會存在多個數據點的情況,如count時只有一個count值,timer時會有count/sum/min/max等多個值。每個資料型別以Chunk的方式儲存。Chunk內部又以堆內和堆外2部分記憶體來儲存,最近一段時間的資料放在堆內,歷史資料壓縮之後放在堆外,在記憶體中儘量多放一些最近的資料,因為LinDB的目的主要是儲存一些監控類的資料,而監控類的資料主要關心最近一段時間的資料。 檔案儲存結構 檔案儲存跟記憶體儲存類似,同一個Measurement的資料以Block的方式儲存在一起,查詢時通過Measurement ID定位到該Measurement的資料儲存在哪個Block中。 Measurement Block後儲存一個Offset Block,即儲存每個Measurement Block所在的Offset,每個Offset以4 bytes儲存。 Offset Block儲存一個Measurement Index Block,按順序儲存每個Measurement ID,以Bitmap的方式儲存。 檔案的尾儲存一個Footer Block,主要儲存Version(2 bytes) + Measurement Index Offset(4 bytes) + Measurement Index Length(4 bytes)。 Data資料塊都是數值,所以使用xor壓縮,參考facebook的gorilla論文; Measurement Block: 每個Measurement Block類似Measurement的方式儲存,只是把Measurement ID換成Measurement內的TSID。 TS Entry儲存該TSID對應每一列的資料,一列資料對應儲存一段時間的資料點。 查詢邏輯: DataFile在第一次載入的時候會把Measurement Index放在記憶體中,查詢輸入Measurement ID通過Measurement Index中的第幾個位置,然後通過這個位置N,在Offset Block查詢具體的Measurement Block的Offset,由於每個Offset都是4 bytes,所以offset position = (N-1) * 4,再讀取4 bytes得到真正的Offset。 同樣的道理可以通過TSID,找到具體的TS Entry,再根據條件過濾具體的列資料,最終得到需要讀取的資料。 總結 LinDB從2年前正式慢慢服務於公司的監控系統,從1.0到2.0,已經穩定執行2年多,除了一次rocksdb的問題,幾乎沒出過什麼問題,到現在的3.0效能的大幅提升,我們基本都是站在業界一些成熟方案的基礎上,慢慢演進而來。 也有人問,LinDB為什麼這麼快,其實我們是參考了很多TSDB的作法,然後取其好的設計,再結果時序的特徵再做一些優化。 時序一般都是最新寫入,但也是一種隨機寫,我們會先成記憶體中把隨寫變成循序寫,最終到寫檔案都是順序寫,所有資料都是有序,這樣查詢的時候也是順序讀,這一點很關鍵; 把寫入的measurement/tags/fields都轉化成Int,再生成倒排索引,最終生成一個TSID(類似Luence的doc id),這樣就大大減少了最終的資料量,畢竟指標這樣字串是佔絕對的大頭, 這點很想OpenTSDB,雖然InfluxDB已經把一段時間的按Block來儲存,但還是在block的頭放這些資料,這些都是成本,特別是在compact的時候; 不像別的TSDB會把timestamp直接存下來,一般timestamp到毫秒基別佔8個節點,雖然根據時間有序的優勢再用delta-encoded,壓縮也是很好,但我們想做到極致,我們是用一個bit來表示時間, 具體的做法就是根據上面的描述,把時間的高位和儲存Interval,把高位的時間放在目錄上,再結合高位算一個delta,把delta以1bit的格式儲存,來表示有沒有資料,因為監控資料絕大部分都是連續的資料, 所以這樣做也是合理的,因此在時間這個資料上的儲存也大大減少空間; 我們發現對一個指標的多個Field的資料,每個Field的資料相鄰的一些點基本是很相近的,LinDB 2.0儲存直接是用RocksDB,多個Field放在一起儲存,再把相鄰的點進行壓縮,這樣其實壓縮率不會很高, 而且每取查詢取Field的時候都要把所有的資料都讀出來,這也是LinDB 3.0我們考慮自己實現列式儲存,相同列存在一塊,以提高壓縮率,查詢的時候只讀需要的資料。 整個壓縮我們也沒有用gzip/snappy/zlib,因為這些不大適合用於數值型別,我們是直接參考了facebook的gorilla論文的xor的方式來的,這個現在已經被很多TSDB採用; 基於上面這些基本的順序讀已經不成問題,基於TSID查詢的更不是問題,因為整個設計都是基於TSID->data來設計的,所以還要解決一個根據倒排查出一組TSID對資料的隨機讀,如上我們是把TSID放在Bitmap, 然後通過Bitmap計算出Offset,直接找到資料,通過儲存時的優化,做到TSID查詢精準查詢,然不是通過二分查詢; 還有一點就是LinDB在新建資料庫時指定完Interval之後,系統會自己Rollup,不像InfluxDB要寫很多Continue Query,LinDB所有的這一切都是自動化的; 查詢計算並行流式處理; 所以用一句話來總結的話就是一個高效的索引外加一堆數值,然後怎麼玩好這堆數值。 自身監控 LinDB也自帶了自身的一些監控功能 Overview Dashboard 未來的展望 豐富查詢函式; 優化記憶體使用率; 自身監控的提升; 如果有可能,計劃開源; 對比測試 下面是與InfluxDB和LinDB2.0的一些查詢效能對比。 由於InfluxDB叢集化要商業版,所以都是單機預設配置下,無Cache的測試。 伺服器配置阿里雲機器:8 Core 16G Memory 大維度 Tags: host(40000),disk(4),partition(20),模擬伺服器磁碟的監控,總的Series數為320W,每個Series寫一個數據點 小維度的1天內的聚合測試 Tags: host(400),disk(2),partition(10),模擬伺服器磁碟的監控,總的Series數為8K,每個Series寫一天的資料 每個維度每2s寫入1個點,每個維度一天內總共43200個點,所有維度總共43200 * 8000個點,共3 4560 0000即3億多資料 小維度的7天內的聚合測試 Tags: host(400),disk(2),partition(10),模擬伺服器磁碟的監控,總的Series數為8K,每個Series寫7天的資料 每個維度每5s寫入1個點,每個維度一天內總共17280個點,所有天數所有維度總共17280 8000 7 個點,即9 6768 0000,9億多個點 這個測試要說明一下,得利於LinDB自動的Rollup,如果InfluxDB開Continue Query的話相信應該也還好。