druid 的基礎架構與應用
快,關注這個公眾號,一起漲知識~
本文介紹了druid的基礎架構以及工作過程,通過一個應用案例加深瞭解。
上篇文章回顧:玩轉 Linux cron
durid簡介
druid是一種高效能、列式儲存、分散式資料儲存的時序資料分析引擎。能支援“PB”級資料的秒級查詢。類似的產品有kylin/clickhouse。druid典型的應用就是OLAP場景下的cube組合查詢分析。如資料鑽取(Drill-down)、上卷(Roll-up)、切片(Slice)、切塊(Dice)以及旋轉(Pivot)。後面的應用示例章節再詳細闡述。
durid基礎架構
先來了解一下durid主要節點:
1、broker node(代理節點)
Broker節點扮演著歷史節點和實時節點的查詢路由的角色。主要負責接收外部查詢,轉發查詢至各個segment資料所在的節點,並聚合結果返回。
2、historical node(歷史節點)
historical主要負責歷史資料儲存和查詢,接收協調節點資料載入與刪除指令,從deepstoage中下載segment,完成資料載入或者刪除後在zk中進行通告。歷史節點遵循shared-nothing的架構,因此節點間沒有單點問題。節點間是相互獨立的並且提供的服務也是簡單的,它們只需要知道如何載入、刪除和處理不可變的segment。historical節點也可以進行分組,組合成不同的historical tier。這會在叢集規模較大的時候體現出優勢。如做資料的冷熱分離,按不同業務的資料分離(一定程度的資源隔離)。當然,historical 節點是整個叢集查詢效能的核心所在,因為historical會承擔絕大部分的segment查詢。
3、coordinator node(協調節點)
主要負責資料的管理和在歷史節點上的分佈。協調節點告訴歷史節點載入新資料、解除安裝過期資料、複製資料、和為了負載均衡移動資料。可以配置load資料及drop資料規則。
4、overlord node(index service 可以理解為任務管理節點)
功能描述:負責接收任務,管理任務。接收外部http請求(新建任務、查詢任務狀態、kill任務等),分配管理任務(當有新的任務請求,overload node會將任務分配給middleManager node去執行)。
5、middleManager node(可以理解為overlord節點的工作節點)
功能描述:可以啟動n(可配置)個peon,接收overlord分配的task,再交給自己peon去執行。
查詢過程
見上圖藍色箭頭,Broker節點接收到查詢(Q1),再將查詢傳送給歷史節點與實時節點(Q2,Q3),在上圖的模式中,實時節點是MM節點上啟動的task。該task會負責資料的攝入以及提供實時資料的查詢。
資料攝入過程
見上圖紅色箭頭,D1是client生產資料最終寫入kafka(這個過程可能在client與kafka的中間,還包含了多個環節,如資料傳輸與資料清洗),D2和D3過程是部署tranquility-kafka服務,消費kafka資料寫入對應的task,tranquility-kakfa啟動的時候會跟overlord節點通訊,由overlord節點分配任務給middleManager執行。D4是task 負責的segment段正常結束,然後將segment資料寫入deepstorage過程。(實時task執行時間是segmentGranularity+windowPeriod+intermediatePersistPeriod)。D5則是historical節點從deepstorage下載segment並在zk中宣告負責該segment段查詢的過程。
目前druid資料攝入過程還有一種更推薦的方式就是kafka index service(簡稱kis),有興趣的同學可以參考官方文件,kis對kafka的版本有強要求。
druid整體架構雖然略為複雜,但是整體穩定性非常不錯,幾乎很少出現叢集故障。拋開叢集硬體故障和資料本身問題,SLA基本能到4個9。coordinator,overlord兩個節點是主從模式,保證每個角色起兩個例項即可。broker節點無狀態,可以起多個例項,前面掛個域名即可(為了保證快取命中,最好配置ip hash)。historical節點無狀態,有一定冗餘即可。middleManager用作資料攝入節點,若task沒有配置副本,則節點宕機會引發丟資料的風險。當然,kis可以避免該問題。
durid資料聚合、儲存核心思想
druid 資料儲存分為三部分timestamp、dimensions、metrics。其中,timestamp、metrics部分是採用lz4直接壓縮。
但是dimensions部分需要支援過濾查詢以及分組查詢。所以dimensions部分的每個維度都採用了以下三種資料結構做轉碼、儲存:
-
A dictionary that maps values (which are always treated as strings) to integer IDs,
-
For each distinct value in the column,a bitmap that indicates which rows contain that value,and
-
A list of the column’s values,encoded using the dictionary in 1
舉個例子,源資料如下:
name列來說
1. Dictionary that encodes column values
字典表的key都是唯一的,所以Map的key是unique的column value,Map的value從0開始不斷增加。 示例資料的name列只有兩個不同的值。所以張三編號為0,李四編號為1:
{ "張三": 0 "李四": 1 }
2. Column data
要儲存的是每一行中這一列的值,值是ID而不是原始的值。因為有了上面的Map字典,所以有下面的對應關係:
[0,
1,
1,
0]
3. Bitmaps - one for each unique value of the column
BitMap的key是第一步Map的key(原始值), value則是真假的一個標識(是|否?等於|不等於?),取值只有0、1,如下:
value="張三": [1,0,0,1]
value=“李四": [0,1,1,0]
所以由上可知最壞的情況可能是隨著資料量的增加,bitmap的個數也成線性增長,為資料量大小*列的個數。那麼在什麼情況下會導致這種線性增長?這裡我們引入了一個基數(cardinality)的概念。基數=unique(dim1,dim2.....),如若dim取值均為各種爆炸性id或者隨機數,則druid的預聚合將完全失去意義。所以在druid的應用場景中,基數約小,聚合效率越高。
講了dimensions怎麼儲存,那麼metrics又是怎麼聚合(roll-up)呢?這就要引入druid資料schema定義了。下一章結合應用一塊看一個示例。
應用示例與實踐經驗
假設有這樣一份資料,典型的商品銷售資料。
我們構造成druid中的資料schema如下:
{ "dataSources" : [ { "spec" : { "dataSchema" : { "dataSource" : "test_datasource", "granularitySpec" : { "segmentGranularity" : "hour", "queryGranularity" : "minute", "type" : "uniform" }, "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "column" : "time", "format" : "auto" }, "dimensionsSpec" : { "dimensions" : [ "productName", "city", "channel", “action"] } } }, "metricsSpec" : [ { "name" : "count", "type" : "count" },{ "type" : "doubleSum", "fieldName" : "price", "name" : “sale" } ] }, "tuningConfig" : { "type" : "realtime", "windowPeriod" : "PT10M", "intermediatePersistPeriod" : "PT10M", "maxRowsInMemory" : "100000" } }, "properties" : { "topicPattern" : "test_datasource", "task.partitions" : "2", "task.replicants" : "1" } } ], "properties" : { ... } }
上述schema定義的詳細解釋這裡就不多介紹了,以上的例子僅僅是針對實時資料,當然還有離線方式hadoop index,有興趣的同學請直接參考官網:http://druid.io。
前面重點說了dimensions,我們再來看下metrics。在上面的例子中我們只定義count和針對price的doubleSum,那麼這些指標就已經固定了後期的分析需求。我們看到上面table中的一二行標紅部分,所有dim取值完全相同,queryGranularity為一分鐘。那麼在這2018-06-11 12:23:00這個點,這兩行資料就被聚合成一行,count=2,sale=0。以此類推。
然後我們再來看看具體的分析需求,一個鑽取的例子。我們首先檢視商品A昨天的點選量,select sum(count) from table where productName=‘A’ and action=‘click',再想看看地區=北京,渠道=web呢?是不是再加幾個where就搞定了?select sum(count) from table where productName=‘A’ and city=‘北京’ and channel=‘web' and action=‘click’; 然後就是切片和切塊,也很簡單,就是幾個group by。這些在druid中都能非常輕鬆的支援。
具體使用上的經驗總結:
1. reindex思想。一般我們實時資料查詢粒度配置的會比較小,秒級或者分鐘級。那麼對於一天前,三天前,一個月前的資料呢?這時候一般關注的粒度將不再那麼細,所以我們一般會採取redinx的策略進行再聚合
2. 針對歷史資料,可能對於某些維度將不在關心,這時候我們也可以在reindex時,將無用的維度剔除掉,可能大大減少整體資料的基數。
3. 一般資料壓縮比例。這裡提供一個大概的參考值。資料總基數在10W以下,每天資料量約百億左右,druid中聚合後的索引資料與原始資料大小之比可以到1:100,甚至1:1000。
4. druid適用於常規的olap場景,能非常輕鬆的支撐每天百億甚至千億級別的資料寫入。
5. 爆炸性維度資料,以及頻繁update資料的需求,不適用於druid的場景。
總結
本文主要對druid做了入門級的基礎介紹,可以給大家做olap引擎技術選型時做一個參考。以及對druid的初學者做一個大致介紹。druid是一款非常優秀的olap引擎,從效能、穩定性上來說,都是非常不錯的。