高可用分散式儲存 etcd 的實現原理
原文連結: etcd-introduction" rel="nofollow,noindex" target="_blank">https://draveness.me/etcd-introduction
在上一篇文章《 詳解分散式協調服務 ZooKeeper 》中,我們介紹過分散式協調服務 ZooKeeper 的實現原理以及應用,今天想要介紹的 etcd 其實也是在生產環境中經常被使用的協調服務,它與 ZooKeeper 一樣,也能夠為整個叢集提供服務發現、配置以及分散式協調的功能。
這篇文章將會介紹 etcd 的實現原理,其中包括 Raft 協議、儲存兩大模組,在最後我們也會簡單介紹 etcd 一些具體應用場景。
簡介
etcd 的官方將它定位成一個可信賴的分散式鍵值儲存服務,它能夠為整個分散式叢集儲存一些關鍵資料,協助分散式叢集的正常運轉。
我們可以簡單看一下 etcd 和 ZooKeeper 在定義上有什麼不同:
- etcd is a distributed reliable key-value store for the most critical data of a distributed system…
- ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
其中前者是一個用於儲存關鍵資料的鍵值儲存,後者是一個用於管理配置等資訊的中心化服務。
etcd 的使用其實非常簡單,它對外提供了 RPC/">gRPC 介面,我們可以通過 Protobuf 和 gRPC 直接對 etcd 中儲存的資料進行管理,也可以使用官方提供的 etcdctl 操作儲存的資料。
service KV { rpc Range(RangeRequest) returns (RangeResponse) { option (google.api.http) = { post: "/v3beta/kv/range" body: "*" }; } rpc Put(PutRequest) returns (PutResponse) { option (google.api.http) = { post: "/v3beta/kv/put" body: "*" }; } }
文章並不會展開介紹 etcd 的使用方法,這一小節將逐步介紹幾大核心模組的實現原理,包括 etcd 使用 Raft 協議同步各個節點資料的過程以及 etcd 底層儲存資料使用的結構。
Raft
在每一個分散式系統中,etcd 往往都扮演了非常重要的地位,由於很多服務配置發現以及配置的資訊都儲存在 etcd 中,所以整個叢集可用性的上限往往就是 etcd 的可用性,而使用 3 ~ 5 個 etcd 節點構成高可用的叢集往往都是常規操作。
正是因為 etcd 在使用的過程中會啟動多個節點,如何處理幾個節點之間的分散式一致性就是一個比較有挑戰的問題了。
解決多個節點資料一致性的方案其實就是共識演算法,在之前的文章中我們簡單介紹過 ZooKeeper 使用的 Zab 協議 以及常見的 共識演算法 Paxos 和 Raft,etcd 使用的共識演算法就是 Raft,這一節我們將詳細介紹 Raft 以及 etcd 中 Raft 的一些實現細節。
介紹
Raft 從一開始就被設計成一個易於理解和實現的共識演算法,它在容錯和效能上與 Paxos 協議比較類似,區別在於它將分散式一致性的問題分解成了幾個子問題,然後一一進行解決。
每一個 Raft 叢集中都包含多個伺服器,在任意時刻,每一臺伺服器只可能處於 Leader、Follower 以及 Candidate 三種狀態;在處於正常的狀態時,叢集中只會存在一個 Leader,其餘的伺服器都是 Follower。
*上述圖片修改自 In Search of an Understandable Consensus Algorithm 一文 5.1 小結中圖四。
所有的 Follower 節點都是被動的,它們不會主動發出任何的請求,只會響應 Leader 和 Candidate 發出的請求,對於每一個使用者的可變操作,都會被路由給 Leader 節點進行處理,除了 Leader 和 Follower 節點之外,Candidate 節點其實只是叢集執行過程中的一個臨時狀態。
Raft 叢集中的時間也被切分成了不同的幾個任期(Term),每一個任期都會由 Leader 的選舉開始,選舉結束後就會進入正常操作的階段,直到 Leader 節點出現問題才會開始新一輪的選擇。
每一個伺服器都會儲存當前叢集的最新任期,它就像是一個單調遞增的邏輯時鐘,能夠同步各個節點之間的狀態,當前節點持有的任期會隨著每一個請求被傳遞到其他的節點上。
Raft 協議在每一個任期的開始時都會從一個叢集中選出一個節點作為叢集的 Leader 節點,這個節點會負責叢集中的日誌的複製以及管理工作。
我們將 Raft 協議分成三個子問題:節點選舉、日誌複製以及安全性,文章會以 etcd 為例介紹 Raft 協議是如何解決這三個子問題的。
節點選舉
使用 Raft 協議的 etcd 叢集在啟動節點時,會遵循 Raft 協議的規則,所有節點一開始都被初始化為 Follower 狀態,新加入的節點會在 NewNode 中做一些配置的初始化,包括用於接收各種資訊的 Channel:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/node.go#L190-225 func StartNode(c *Config, peers []Peer) Node { r := newRaft(c) r.becomeFollower(1, None) r.raftLog.committed = r.raftLog.lastIndex() for _, peer := range peers { r.addNode(peer.ID) } n := newNode() go n.run(r) return &n }
在做完這些初始化的節點和 Raft 配置的事情之後,就會進入一個由 for 和 select 組成的超大型迴圈,這個迴圈會從 Channel 中獲取待處理的事件:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/node.go#L291-423 func (n *node) run(r *raft) { lead := None for { if lead != r.lead { lead = r.lead } select { case m := <-n.recvc: r.Step(m) case <-n.tickc: r.tick() case <-n.stop: close(n.done) return } } }
作者對整個迴圈內的程式碼進行了簡化,因為當前只需要關心三個 Channel 中的訊息,也就是用於接受其他節點訊息的 recvc、用於觸發定時任務的 tickc 以及用於暫停當前節點的 stop。
除了 stop Channel 中介紹到的訊息之外,recvc 和 tickc 兩個 Channel 中介紹到事件時都會交給當前節點持有 Raft 結構體處理。
定時器與心跳
當節點從任意狀態(包括啟動)呼叫 becomeFollower 時,都會將節點的定時器設定為 tickElection:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643 func (r *raft) tickElection() { r.electionElapsed++ if r.promotable() && r.pastElectionTimeout() { r.electionElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) } }
如果當前節點可以成為 Leader 並且上一次收到 Leader 節點的訊息或者心跳已經超過了等待的時間,當前節點就會發送 MsgHup 訊息嘗試開始新的選舉。
但是如果 Leader 節點正常執行,就能夠同樣通過它的定時器 tickHeartbeat 向所有的 Follower 節點廣播心跳請求,也就是 MsgBeat 型別的 RPC 訊息:
func (r *raft) tickHeartbeat() { r.heartbeatElapsed++ r.electionElapsed++ if r.heartbeatElapsed >= r.heartbeatTimeout { r.heartbeatElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) } }
上述程式碼段 Leader 節點中呼叫的 Step 函式,最終會呼叫 stepLeader 方法,該方法會根據訊息的型別進行不同的處理:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L931-1142 func stepLeader(r *raft, m pb.Message) error { switch m.Type { case pb.MsgBeat: r.bcastHeartbeat() return nil // ... } //... }
bcastHeartbeat 方法最終會向所有的 Follower 節點發送 MsgHeartbeat 型別的訊息,通知它們目前 Leader 的存活狀態,重置所有 Follower 持有的超時計時器。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L518-534 func (r *raft) sendHeartbeat(to uint64, ctx []byte) { commit := min(r.getProgress(to).Match, r.raftLog.committed) m := pb.Message{ To:to, Type:pb.MsgHeartbeat, Commit:commit, Context: ctx, } r.send(m) }
作為叢集中的 Follower,它們會在 stepFollower 方法中處理接收到的全部訊息,包括 Leader 節點發送的心跳 RPC 訊息:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1191-1247 func stepFollower(r *raft, m pb.Message) error { switch m.Type { case pb.MsgHeartbeat: r.electionElapsed = 0 r.lead = m.From r.handleHeartbeat(m) // ... } return nil }
當 Follower 接受到了來自 Leader 的 RPC 訊息 MsgHeartbeat 時,會將當前節點的選舉超時時間重置並通過 handleHeartbeat 向 Leader 節點發出響應 —— 通知 Leader 當前節點能夠正常執行。
而 Candidate 節點對於 MsgHeartBeat 訊息的處理會稍有不同,它會先執行 becomeFollower 設定當前節點和 Raft 協議的配置:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1146-1189 func stepCandidate(r *raft, m pb.Message) error { // ... switch m.Type { case pb.MsgHeartbeat: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleHeartbeat(m) } // ... return nil }
Follower 與 Candidate 會根據節點型別的不同做出不同的響應,兩者收到心跳請求時都會重置節點的選舉超時時間,不過後者會將節點的狀態直接轉變成 Follower:
當 Leader 節點收到心跳的響應時就會將對應節點的狀態設定為 Active,如果 Follower 節點在一段時間內沒有收到來自 Leader 節點的訊息就會嘗試發起競選。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643 func (r *raft) tickElection() { r.electionElapsed++ if r.promotable() && r.pastElectionTimeout() { r.electionElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) } }
到了這裡,心跳機制就起到了作用開始傳送 MsgHup 嘗試重置整個叢集中的 Leader 節點,接下來我們就會開始分析 Raft 協議中的競選流程了。
競選流程
如果叢集中的某一個 Follower 節點長時間內沒有收到來自 Leader 的心跳請求,當前節點就會通過 MsgHup 訊息進入預選舉或者選舉的流程。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L785-927 func (r *raft) Step(m pb.Message) error { // ... switch m.Type { case pb.MsgHup: if r.state != StateLeader { if r.preVote { r.campaign(campaignPreElection) } else { r.campaign(campaignElection) } } else { r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) } } // ... return nil }
如果收到 MsgHup 訊息的節點不是 Leader 狀態,就會根據當前叢集的配置選擇進入 PreElection 或者 Election 階段,PreElection 階段並不會真正增加當前節點的 Term,它的主要作用是得到當前叢集能否成功選舉出一個 Leader 的答案,如果當前叢集中只有兩個節點而且沒有預選舉階段,那麼這兩個節點的 Term 會無休止的增加,預選舉階段就是為了解決這一問題而出現的。
在這裡不會討論預選舉的過程,而是將目光主要放在選舉階段,具體瞭解一下使用 Raft 協議的 etcd 叢集是如何從眾多節點中選出 Leader 節點的。
我們可以繼續來分析 campaign 方法的具體實現,下面就是刪去預選舉相關邏輯後的程式碼:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L730-766 func (r *raft) campaign(t CampaignType) { r.becomeCandidate() if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) { r.becomeLeader() return } for id := range r.prs { if id == r.id { continue } r.send(pb.Message{Term: r.Term, To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) } }
當前節點會立刻呼叫 becomeCandidate 將當前節點的 Raft 狀態變成候選人;在這之後,它會將票投給自己,如果當前叢集只有一個節點,該節點就會直接成為叢集中的 Leader 節點。
如果叢集中存在了多個節點,就會向叢集中的其他節點發出 MsgVote 訊息,請求其他節點投票,在 Step 函式中包含不同狀態的節點接收到訊息時的響應:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L785-927 func (r *raft) Step(m pb.Message) error { // ... switch m.Type { case pb.MsgVote, pb.MsgPreVote: canVote := r.Vote == m.From || (r.Vote == None && r.lead == None) if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.send(pb.Message{To: m.From, Term: m.Term, Type: pb.MsgVoteResp}) r.electionElapsed = 0 r.Vote = m.From } else { r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgVoteResp, Reject: true}) } } // ... return nil }
如果當前節點投的票就是訊息的來源或者當前節點沒有投票也沒有 Leader,那麼就會向來源的節點投票,否則就會通知該節點當前節點拒絕投票。
在 stepCandidate 方法中,候選人節點會處理來自其他節點的投票響應訊息,也就是 MsgVoteResp:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1146-1189 func stepCandidate(r *raft, m pb.Message) error { switch m.Type { // ... case pb.MsgVoteResp: gr := r.poll(m.From, m.Type, !m.Reject) switch r.quorum() { case gr: r.becomeLeader() r.bcastAppend() // ... } } return nil }
每當收到一個 MsgVoteResp 型別的訊息時,就會設定當前節點持有的 votes 陣列,更新其中儲存的節點投票狀態並返回投『同意』票的人數,如果獲得的票數大於法定人數 quorum,當前節點就會成為叢集的 Leader 並向其他的節點發送當前節點當選的訊息,通知其餘節點更新 Raft 結構體中的 Term 等資訊。
節點狀態
對於每一個節點來說,它們根據不同的節點狀態會對網路層發來的訊息做出不同的響應,我們會分別介紹下面的四種狀態在 Raft 中對於配置和訊息究竟是如何處理的。
對於每一個 Raft 的節點狀態來說,它們分別有三個比較重要的區別,其中一個是在改變狀態時呼叫 becomeLeader、becomeCandidate、becomeFollower 和 becomePreCandidate 方法改變 Raft 狀態有比較大的不同,第二是處理訊息時呼叫 stepLeader、stepCandidate 和 stepFollower 時有比較大的不同,最後是幾種不同狀態的節點具有功能不同的定時任務。
對於方法的詳細處理,我們在這一節中不詳細介紹和分析,如果一個節點的狀態是 Follower,那麼當前節點切換到 Follower 一定會通過 becomeFollower 函式,在這個函式中會重置節點持有任期,並且設定處理訊息的函式為 stepFollower:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L671-678 func (r *raft) becomeFollower(term uint64, lead uint64) { r.step = stepFollower r.reset(term) r.tick = r.tickElection r.lead = lead r.state = StateFollower } // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643 func (r *raft) tickElection() { r.electionElapsed++ if r.promotable() && r.pastElectionTimeout() { r.electionElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) } }
除此之外,它還會設定一個用於在 Leader 節點宕機時觸發選舉的定時器 tickElection。
Candidate 狀態的節點與 Follower 的配置差不了太多,只是在訊息處理函式 step、任期以及狀態上的設定有一些比較小的區別:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L680-691 func (r *raft) becomeCandidate() { r.step = stepCandidate r.reset(r.Term + 1) r.tick = r.tickElection r.Vote = r.id r.state = StateCandidate }
最後的 Leader 就與這兩者有其他的區別了,它不僅設定了處理訊息的函式 step 而且設定了與其他狀態完全不同的 tick 函式:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L708-728 func (r *raft) becomeLeader() { r.step = stepLeader r.reset(r.Term) r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader r.pendingConfIndex = r.raftLog.lastIndex() r.appendEntry(pb.Entry{Data: nil}) }
這裡的 tick 函式 tickHeartbeat 每隔一段時間會通過 Step 方法向叢集中的其他節點發送 MsgBeat 訊息:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L646-669 func (r *raft) tickHeartbeat() { r.heartbeatElapsed++ r.electionElapsed++ if r.electionElapsed >= r.electionTimeout { r.electionElapsed = 0 if r.checkQuorum { r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}) } } if r.heartbeatElapsed >= r.heartbeatTimeout { r.heartbeatElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) } }
上述程式碼中的 MsgBeat 訊息會在 Step 中被轉換成 MsgHeartbeat 最終傳送給其他的節點,Leader 節點超時之後的選舉流程我們在前兩節中也已經介紹過了,在這裡就不再重複了。
儲存
etcd 目前支援 V2 和 V3 兩個大版本,這兩個版本在實現上有比較大的不同,一方面是對外提供介面的方式,另一方面就是底層的儲存引擎,V2 版本的例項是一個純記憶體的實現,所有的資料都沒有儲存在磁碟上,而 V3 版本的例項就支援了資料的持久化。
在這一節中,我們會介紹 V3 版本的 etcd 究竟是通過什麼樣的方式儲存使用者資料的。
後端
在 V3 版本的設計中,etcd 通過 backend 後端這一設計,很好地封裝了儲存引擎的實現細節,為上層提供一個更一致的介面,對於 etcd 的其他模組來說,它們可以將更多注意力放在介面中的約定上,不過在這裡,我們更關注的是 etcd 對 Backend 介面的實現。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L51-69 type Backend interface { ReadTx() ReadTx BatchTx() BatchTx Snapshot() Snapshot Hash(ignores map[IgnoreKey]struct{}) (uint32, error) Size() int64 SizeInUse() int64 Defrag() error ForceCommit() Close() error }
etcd 底層預設使用的是開源的嵌入式鍵值儲存資料庫 bolt,但是這個專案目前的狀態已經是歸檔不再維護了,如果想要使用這個專案可以使用 CoreOS 的 bbolt 版本。
這一小節中,我們會簡單介紹 etcd 是如何使用 BoltDB 作為底層儲存的,首先可以先來看一下 pacakge 內部的 backend 結構體,這是一個實現了 Backend 介面的結構:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L80-104 type backend struct { size int64 sizeInUse int64 commits int64 mu sync.RWMutex db *bolt.DB batchInterval time.Duration batchLimitint batchTx*batchTxBuffered readTx *readTx stopc chan struct{} donec chan struct{} lg *zap.Logger }
從結構體的成員 db 我們就可以看出,它使用了 BoltDB 作為底層儲存,另外的兩個 readTx 和 batchTx 分別實現了 ReadTx 和 BatchTx 介面:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L30-36 type ReadTx interface { Lock() Unlock() UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error } // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L28-38 type BatchTx interface { ReadTx UnsafeCreateBucket(name []byte) UnsafePut(bucketName []byte, key []byte, value []byte) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) UnsafeDelete(bucketName []byte, key []byte) Commit() CommitAndStop() }
從這兩個介面的定義,我們不難發現它們能夠對外提供資料庫的讀寫操作,而 Backend 就能對這兩者提供的方法進行封裝,為上層遮蔽儲存的具體實現:
每當我們使用 newBackend 建立一個新的 backend 結構時,都會建立一個 readTx 和 batchTx 結構體,這兩者一個負責處理只讀請求,一個負責處理讀寫請求:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L137-176 func newBackend(bcfg BackendConfig) *backend { bopts := &bolt.Options{} bopts.InitialMmapSize = bcfg.mmapSize() db, _ := bolt.Open(bcfg.Path, 0600, bopts) b := &backend{ db: db, batchInterval: bcfg.BatchInterval, batchLimit:bcfg.BatchLimit, readTx: &readTx{ buf: txReadBuffer{ txBuffer: txBuffer{make(map[string]*bucketBuffer)}, }, buckets: make(map[string]*bolt.Bucket), }, stopc: make(chan struct{}), donec: make(chan struct{}), } b.batchTx = newBatchTxBuffered(b) go b.run() return b }
當我們在 newBackend 中進行了初始化 BoltDB、事務等工作後,就會開一個 goroutine 非同步的對所有批量讀寫事務進行定時提交:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L289-305 func (b *backend) run() { defer close(b.donec) t := time.NewTimer(b.batchInterval) defer t.Stop() for { select { case <-t.C: case <-b.stopc: b.batchTx.CommitAndStop() return } if b.batchTx.safePending() != 0 { b.batchTx.Commit() } t.Reset(b.batchInterval) } }
對於上層來說,backend 其實只是對底層儲存的一個抽象,很多時候並不會直接跟它打交道,往往都是使用它持有的 ReadTx 和 BatchTx 與資料庫進行互動。
只讀事務
目前大多數的資料庫對於只讀型別的事務並沒有那麼多的限制,尤其是在使用了 MVCC 之後,所有的只讀請求幾乎不會被寫請求鎖住,這大大提升了讀的效率,由於在 BoltDB 的同一個 goroutine 中開啟兩個相互依賴的只讀事務和讀寫事務會發生死鎖,為了避免這種情況我們還是引入了 sync.RWLock 保證死鎖不會出現:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L38-47 type readTx struct { musync.RWMutex buf txReadBuffer txmusync.RWMutex tx*bolt.Tx buckets map[string]*bolt.Bucket }
你可以看到在整個結構體中,除了用於保護 tx 的 txmu 讀寫鎖之外,還存在另外一個 mu 讀寫鎖,它的作用是保證 buf 中的資料不會出現問題,buf 和結構體中的 buckets 都是用於加速讀效率的快取。
對於一個只讀事務來說,它對上層提供了兩個獲取儲存引擎中資料的介面,分別是 UnsafeRange 和 UnsafeForEach,在這裡會重點介紹前面方法的實現細節:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L52-90 func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { if endKey == nil { limit = 1 } keys, vals := rt.buf.Range(bucketName, key, endKey, limit) if int64(len(keys)) == limit { return keys, vals } bn := string(bucketName) bucket, ok := rt.buckets[bn] if !ok { bucket = rt.tx.Bucket(bucketName) rt.buckets[bn] = bucket } if bucket == nil { return keys, vals } c := bucket.Cursor() k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys))) return append(k2, keys...), append(v2, vals...) }
上述程式碼中省略了加鎖保護讀快取以及 Bucket 中儲存資料的合法性,也省去了一些引數的檢查,不過方法的整體介面還是沒有太多變化,UnsafeRange 會先從自己持有的快取 txReadBuffer 中讀取資料,如果資料不能夠滿足呼叫者的需求,就會從 buckets 快取中查詢對應的 BoltDB bucket 並從 BoltDB 資料庫中讀取。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L121-141 func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) { var isMatch func(b []byte) bool if len(endKey) > 0 { isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 } } else { isMatch = func(b []byte) bool { return bytes.Equal(b, key) } limit = 1 } for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() { vs = append(vs, cv) keys = append(keys, ck) if limit == int64(len(keys)) { break } } return keys, vs }
這個包內部的函式 unsafeRange 實際上通過 BoltDB 中的遊標來遍歷滿足查詢條件的鍵值對。
到這裡為止,整個只讀事務提供的介面就基本介紹完了,在 etcd 中無論我們想要後去單個 Key 還是一個範圍內的 Key 最終都是通過 Range 來實現的,這其實也是隻讀事務的最主要功能。
讀寫事務
只讀事務只提供了讀資料的能力,包括 UnsafeRange 和 UnsafeForeach,而讀寫事務 BatchTx 提供的就是讀和寫資料的能力了:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L40-46 type batchTx struct { sync.Mutex tx*bolt.Tx backend *backend pending int }
讀寫事務同時提供了不帶快取的 batchTx 實現以及帶快取的 batchTxBuffered 實現,後者其實『繼承了』前者的結構體,並額外加入了快取 txWriteBuffer 加速讀請求:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L243-246 type batchTxBuffered struct { batchTx buf txWriteBuffer }
後者在實現介面規定的方法時,會直接呼叫 batchTx 的同名方法,並將操作造成的副作用的寫入的快取中,在這裡我們並不會展開介紹這一版本的實現,還是以分析 batchTx 的方法為主。
當我們向 etcd 中寫入資料時,最終都會呼叫 batchTx 的 unsafePut 方法將資料寫入到 BoltDB 中:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L65-67 func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { t.unsafePut(bucketName, key, value, false) } // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L74-103 func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) { bucket := t.tx.Bucket(bucketName) if err := bucket.Put(key, value); err != nil { plog.Fatalf("cannot put key into bucket (%v)", err) } t.pending++ }
這兩個方法的實現非常清晰,作者覺得他們都並不值得展開詳細介紹,只是呼叫了 BoltDB 提供的 API 操作一下 bucket 中的資料,而另一個刪除方法的實現與這個也差不多:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L144-169 func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { bucket := t.tx.Bucket(bucketName) err := bucket.Delete(key) if err != nil { plog.Fatalf("cannot delete key from bucket (%v)", err) } t.pending++ }
它們都是通過 Bolt.Tx 找到對應的 Bucket,然後做出相應的增刪操作,但是這寫請求在這兩個方法執行後其實並沒有提交,我們還需要手動或者等待 etcd 自動將請求提交:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L184-188 func (t *batchTx) Commit() { t.Lock() t.commit(false) t.Unlock() } // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L210-241 func (t *batchTx) commit(stop bool) { if t.tx != nil { if t.pending == 0 && !stop { return } start := time.Now() err := t.tx.Commit() rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds()) spillSec.Observe(t.tx.Stats().SpillTime.Seconds()) writeSec.Observe(t.tx.Stats().WriteTime.Seconds()) commitSec.Observe(time.Since(start).Seconds()) atomic.AddInt64(&t.backend.commits, 1) t.pending = 0 } if !stop { t.tx = t.backend.begin(true) } }
在每次呼叫 Commit 對讀寫事務進行提交時,都會先檢查是否有等待中的事務,然後會將資料上報至 Prometheus 中,其他的服務就可以將 Prometheus 作為資料來源對 etcd 的執行狀況進行監控了。
索引
經常使用 etcd 的開發者可能會了解到,它本身對於每一個鍵值對都有一個 revision 的概念,鍵值對的每一次變化都會被 BoltDB 單獨記錄下來,所以想要在儲存引擎中獲取某一個 Key 對應的值,要先獲取 revision,再通過它才能找到對應的值,在裡我們想要介紹的其實是 etcd 如何管理和儲存一個 Key 的多個 revision 記錄。
在 etcd 服務中有一個用於儲存所有的鍵值對 revision 資訊的 btree,我們可以通過 index 的 Get 介面獲取一個 Key 對應 Revision 的值:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L68-76 func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) { keyi := &keyIndex{key: key} if keyi = ti.keyIndex(keyi); keyi == nil { return revision{}, revision{}, 0, ErrRevisionNotFound } return keyi.get(ti.lg, atRev) }
上述方法通過 keyIndex 方法查詢 Key 對應的 keyIndex 結構體,這裡使用的記憶體結構體 btree 是 Google 實現的一個版本:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L84-89 func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex { if item := ti.tree.Get(keyi); item != nil { return item.(*keyIndex) } return nil }
可以看到這裡的實現非常簡單,只是從 treeIndex 持有的成員 btree 中查詢 keyIndex,將結果強制轉換成 keyIndex 型別後返回;獲取 Key 對應 revision 的方式也非常簡單:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L149-171 func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) { g := ki.findGeneration(atRev) if g.isEmpty() { return revision{}, revision{}, 0, ErrRevisionNotFound } n := g.walk(func(rev revision) bool { return rev.main > atRev }) if n != -1 { return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil } return revision{}, revision{}, 0, ErrRevisionNotFound }
KeyIndex
在我們具體介紹方法實現的細節之前,首先我們需要理解 keyIndex 包含的欄位以及管理同一個 Key 不同版本的方式:
每一個 keyIndex 結構體中都包含當前鍵的值以及最後一次修改對應的 revision 資訊,其中還儲存了一個 Key 的多個 generation,每一個 generation 都會記錄當前 Key『從生到死』的全部過程,每當一個 Key 被刪除時都會呼叫 timestone 方法向當前的 generation 中追加一個新的墓碑版本:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L127-145 func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error { if ki.generations[len(ki.generations)-1].isEmpty() { return ErrRevisionNotFound } ki.put(lg, main, sub) ki.generations = append(ki.generations, generation{}) return nil }
這個 tombstone 版本標識這當前的 Key 已經被刪除了,但是在每次刪除一個 Key 之後,就會在當前的 keyIndex 中建立一個新的 generation 結構用於儲存新的版本資訊,其中 ver 記錄當前 generation 包含的修改次數,created 記錄建立 generation 時的 revision 版本,最後的 revs 用於儲存所有的版本資訊。
讀操作
etcd 中所有的查詢請求,無論是查詢一個還是多個、是數量還是鍵值對,最終都會呼叫 rangeKeys 方法:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L112-165 func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) { rev := ro.Rev revpairs := tr.s.kvindex.Revisions(key, end, rev) if len(revpairs) == 0 { return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil } kvs := make([]mvccpb.KeyValue, int(ro.Limit)) revBytes := newRevBytes() for i, revpair := range revpairs[:len(kvs)] { revToBytes(revpair, revBytes) _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0) kvs[i].Unmarshal(vs[0]) } return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil }
為了獲取一個範圍內的所有鍵值對,我們首先需要通過 Revisions 函式從 btree 中獲取範圍內所有的 keyIndex:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L106-120 func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) { if end == nil { rev, _, _, err := ti.Get(key, atRev) if err != nil { return nil } return []revision{rev} } ti.visit(key, end, func(ki *keyIndex) { if rev, _, _, err := ki.get(ti.lg, atRev); err == nil { revs = append(revs, rev) } }) return revs }
如果只需要獲取一個 Key 對應的版本,就是直接使用 treeIndex 的方法,但是當上述方法會從 btree 索引中獲取一個連續多個 revision 值時,就會呼叫 keyIndex.get 來遍歷整顆樹並選取合適的版本:
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) { g := ki.findGeneration(atRev) if g.isEmpty() { return revision{}, revision{}, 0, ErrRevisionNotFound } n := g.walk(func(rev revision) bool { return rev.main > atRev }) if n != -1 { return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil } return revision{}, revision{}, 0, ErrRevisionNotFound }
因為每一個 Key 的 keyIndex 中其實都儲存著多個 generation,我們需要根據傳入的引數返回合適的 generation 並從其中返回主版本大於 atRev 的 revision 結構。
對於上層的鍵值儲存來說,它會利用這裡返回的 revision 從真正儲存資料的 BoltDB 中查詢當前 Key 對應 revision 的結果。
寫操作
當我們向 etcd 中插入資料時,會使用傳入的 key 構建一個 keyIndex 結構體並從樹中獲取相關版本等資訊:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L53-66 func (ti *treeIndex) Put(key []byte, rev revision) { keyi := &keyIndex{key: key} item := ti.tree.Get(keyi) if item == nil { keyi.put(ti.lg, rev.main, rev.sub) ti.tree.ReplaceOrInsert(keyi) return } okeyi := item.(*keyIndex) okeyi.put(ti.lg, rev.main, rev.sub) }
treeIndex.Put 在獲取記憶體中的 keyIndex 結構之後會通過 keyIndex.put 其中加入新的 revision:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L77-104 func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) { rev := revision{main: main, sub: sub} if len(ki.generations) == 0 { ki.generations = append(ki.generations, generation{}) } g := &ki.generations[len(ki.generations)-1] if len(g.revs) == 0 { g.created = rev } g.revs = append(g.revs, rev) g.ver++ ki.modified = rev }
每一個新 revision 結構體寫入 keyIndex 時,都會改變當前 generation 的 created 和 ver 等引數,從這個方法中我們就可以瞭解到 generation 中的各個成員都是如何被寫入的。
寫入的操作除了增加之外,刪除某一個 Key 的函式也會經常被呼叫:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L252-309 func (tw *storeTxnWrite) delete(key []byte) { ibytes := newRevBytes() idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes) kv := mvccpb.KeyValue{Key: key} d, _ := kv.Marshal() tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) tw.s.kvindex.Tombstone(key, idxRev) tw.changes = append(tw.changes, kv) }
正如我們在文章前面所介紹的,刪除操作會向結構體中的 generation 追加一個新的 tombstone 標記,用於標識當前的 Key 已經被刪除;除此之外,上述方法還會將每一個更新操作的 revision 存到單獨的 keyBucketName 中。
索引的恢復
因為在 etcd 中,所有的 keyIndex 都是在記憶體的 btree 中儲存的,所以在啟動服務時需要從 BoltDB 中將所有的資料都載入到記憶體中,在這裡就會初始化一個新的 btree 索引,然後呼叫 restore 方法開始恢復索引:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L321-433 func (s *store) restore() error { min, max := newRevBytes(), newRevBytes() revToBytes(revision{main: 1}, min) revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) tx := s.b.BatchTx() rkvc, revc := restoreIntoIndex(s.lg, s.kvindex) for { keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys)) if len(keys) == 0 { break } restoreChunk(s.lg, rkvc, keys, vals, keyToLease) newMin := bytesToRev(keys[len(keys)-1][:revBytesLen]) newMin.sub++ revToBytes(newMin, min) } close(rkvc) s.currentRev = <-revc return nil }
在恢復索引的過程中,有一個用於遍歷不同鍵值的『生產者』迴圈,其中由 UnsafeRange 和 restoreChunk 兩個方法構成,這兩個方法會從 BoltDB 中遍歷資料,然後將鍵值對傳到 rkvc 中,交給 restoreIntoIndex 方法中建立的 goroutine 處理:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L486-506 func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) { for i, key := range keys { rkv := r evKeyValue{key: key} _ := rkv.kv.Unmarshal(vals[i]) rkv.kstr = string(rkv.kv.Key) if isTombstone(key) { delete(keyToLease, rkv.kstr) } else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease { keyToLease[rkv.kstr] = lid } else { delete(keyToLease, rkv.kstr) } kvc <- rkv } }
先被呼叫的 restoreIntoIndex 方法會建立一個用於接受鍵值對的 Channel,在這之後會在一個 goroutine 中處理從 Channel 接收到的資料,並將這些資料恢復到記憶體裡的 btree 中:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L441-484 func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) { rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1) go func() { currentRev := int64(1) defer func() { revc <- currentRev }() for rkv := range rkvc { ki = &keyIndex{key: rkv.kv.Key} ki := idx.KeyIndex(ki) rev := bytesToRev(rkv.key) currentRev = rev.main if ok { if isTombstone(rkv.key) { ki.tombstone(lg, rev.main, rev.sub) continue } ki.put(lg, rev.main, rev.sub) } else if !isTombstone(rkv.key) { ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version) idx.Insert(ki) } } }() return rkvc, revc }
恢復記憶體索引的相關程式碼在實現上非常值得學習,兩個不同的函式通過 Channel 進行通訊並使用 goroutine 處理任務,能夠很好地將訊息的『生產者』和『消費者』進行分離。
Channel 作為整個恢復索引邏輯的一個訊息中心,它將遍歷 BoltDB 中的資料和恢復索引兩部分程式碼進行了分離。
儲存
etcd 的 mvcc 模組對外直接提供了兩種不同的訪問方式,一種是鍵值儲存 kvstore,另一種是 watchableStore 它們都實現了包內公開的 KV 介面:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kv.go#L100-125 type KV interface { ReadView WriteView Read() TxnRead Write() TxnWrite Hash() (hash uint32, revision int64, err error) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) Compact(rev int64) (<-chan struct{}, error) Commit() Restore(b backend.Backend) error Close() error }
kvstore
對於 kvstore 來說,其實沒有太多值得展開介紹的地方,它利用底層的 BoltDB 等基礎設施為上層提供最常見的增傷改查,它組合了下層的 readTx、batchTx 等結構體,將一些執行緒不安全的操作變成執行緒安全的。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L32-40 func (s *store) Read() TxnRead { s.mu.RLock() tx := s.b.ReadTx() s.revMu.RLock() tx.Lock() firstRev, rev := s.compactMainRev, s.currentRev s.revMu.RUnlock() return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev}) }
它也負責對記憶體中 btree 索引的維護以及壓縮一些無用或者不常用的資料,幾個對外的介面 Read、Write 就是對 readTx、batchTx 等結構體的組合並將它們的介面暴露給其他的模組。
watchableStore
另外一個比較有意思的儲存就是 watchableStore 了,它是 mvcc 模組為外界提供 Watch 功能的介面,它負責了註冊、管理以及觸發 Watcher 的功能,我們先來看一下這個結構體的各個欄位:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L45-65 type watchableStore struct { *store mu sync.RWMutex unsynced watcherGroup synced watcherGroup stopc chan struct{} wgsync.WaitGroup }
每一個 watchableStore 其實都組合了來自 store 結構體的欄位和方法,除此之外,還有兩個 watcherGroup 型別的欄位,其中 unsynced 用於儲存未同步完成的例項,synced 用於儲存已經同步完成的例項。
在初始化一個新的 watchableStore 時,我們會建立一個用於同步watcherGroup 的 Goroutine,在 syncWatchersLoop 這個迴圈中會每隔 100ms 呼叫一次 syncWatchers 方法,將所有未通知的事件通知給所有的監聽者,這可以說是整個模組的核心:
func (s *watchableStore) syncWatchers() int { curRev := s.store.currentRev compactionRev := s.store.compactMainRev wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) minBytes, maxBytes := newRevBytes(), newRevBytes() revToBytes(revision{main: minRev}, minBytes) revToBytes(revision{main: curRev + 1}, maxBytes) tx := s.store.b.ReadTx() revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) evs := kvsToEvents(nil, wg, revs, vs) wb := newWatcherBatch(wg, evs) for w := range wg.watchers { w.minRev = curRev + 1 eb, ok := wb[w] if !ok { s.synced.add(w) s.unsynced.delete(w) continue } w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) s.synced.add(w) s.unsynced.delete(w) } return s.unsynced.size() }
簡化後的 syncWatchers 方法中總共做了三件事情,首先是根據當前的版本從未同步的 watcherGroup 中選出一些待處理的任務,然後從 BoltDB 中後去當前版本範圍內的資料變更並將它們轉換成事件,事件和 watcherGroup 在打包之後會通過 send 方法傳送到每一個 watcher 對應的 Channel 中。
上述圖片中展示了 mvcc 模組對於向外界提供的監聽某個 Key 和範圍的介面,外部的其他模組會通過 watchStream.watch 函式與模組內部進行互動,每一次呼叫 watch 方法最終都會向 watchableStore 持有的 watcherGroup 中新增新的 watcher 結構。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watcher.go#L108-135 func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) { if id == AutoWatchID { for ws.watchers[ws.nextID] != nil { ws.nextID++ } id = ws.nextID ws.nextID++ } w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...) ws.cancels[id] = c ws.watchers[id] = w return id, nil } // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L111-142 func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) { wa := &watcher{ key:key, end:end, minRev: startRev, id:id, ch:ch, fcs:fcs, } synced := startRev > s.store.currentRev || startRev == 0 if synced { s.synced.add(wa) } else { s.unsynced.add(wa) } return wa, func() { s.cancelWatcher(wa) } }
當 etcd 服務啟動時,會在服務端執行一個用於處理監聽事件的 watchServer gRPC 服務,客戶端的 Watch 請求最終都會被轉發到這個服務的 Watch 函式中:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L136-206 func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { sws := serverWatchStream{ // ... gRPCStream:stream, watchStream: ws.watchable.NewWatchStream(), ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen), } sws.wg.Add(1) go func() { sws.sendLoop() sws.wg.Done() }() go func() { sws.recvLoop() }() sws.wg.Wait() return err }
當客戶端想要通過 Watch 結果監聽某一個 Key 或者一個範圍的變動,在每一次客戶端呼叫服務端上述方式都會建立兩個 Goroutine,這兩個協程一個會負責向監聽者傳送資料變動的事件,另一個協程會負責處理客戶端發來的事件。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L220-334 func (sws *serverWatchStream) recvLoop() error { for { req, err := sws.gRPCStream.Recv() if err == io.EOF { return nil } if err != nil { return err } switch uv := req.RequestUnion.(type) { case *pb.WatchRequest_CreateRequest: creq := uv.CreateRequest filters := FiltersFromRequest(creq) wsrev := sws.watchStream.Rev() rev := creq.StartRevision id, _ := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...) wr := &pb.WatchResponse{ Header:sws.newResponseHeader(wsrev), WatchId:int64(id), Created:true, Canceled: err != nil, } select { case sws.ctrlStream <- wr: case <-sws.closec: return nil } case *pb.WatchRequest_CancelRequest: // ... case *pb.WatchRequest_ProgressRequest: // ... default: continue } } }
在用於處理客戶端的 recvLoop 方法中呼叫了 mvcc 模組暴露出的 watchStream.Watch 方法,該方法會返回一個可以用於取消監聽事件的 watchID;當 gRPC 流已經結束後者出現錯誤時,當前的迴圈就會返回,兩個 Goroutine 也都會結束。
如果出現了更新或者刪除事件,就會被髮送到 watchStream 持有的 Channel 中,而 sendLoop 會通過 select 來監聽多個 Channel 中的資料並將接收到的資料封裝成 pb.WatchResponse 結構並通過 gRPC 流傳送給客戶端:
func (sws *serverWatchStream) sendLoop() { for { select { case wresp, ok := <-sws.watchStream.Chan(): evs := wresp.Events events := make([]*mvccpb.Event, len(evs)) for i := range evs { events[i] = &evs[i]} canceled := wresp.CompactRevision != 0 wr := &pb.WatchResponse{ Header:sws.newResponseHeader(wresp.Revision), WatchId:int64(wresp.WatchID), Events:events, CompactRevision: wresp.CompactRevision, Canceled:canceled, } sws.gRPCStream.Send(wr) case c, ok := <-sws.ctrlStream: // ... case <-progressTicker.C: // ... case <-sws.closec: return } } }
對於每一個 Watch 請求來說,watchServer 會根據請求建立兩個用於處理當前請求的 Goroutine,這兩個協程會與更底層的 mvcc 模組協作提供監聽和回撥功能:
到這裡,我們對於 Watch 功能的介紹就差不多結束了,從對外提供的介面到底層的使用的資料結構以及具體實現,其他與 Watch 功能相關的話題可以直接閱讀 etcd 的原始碼瞭解更加細節的實現。
應用
在上面已經介紹了核心的 Raft 共識演算法以及使用的底層儲存之後,這一節更想談一談 etcd 的一些應用場景,與之前談到的 分散式協調服務 ZooKeeper 一樣,etcd 在大多數的叢集中還是處於比較關鍵的位置,工程師往往都會使用 etcd 儲存叢集中的重要資料和元資料,多個節點之間的強一致性以及叢集部署的方式賦予了 etcd 叢集高可用性。
我們依然可以使用 etcd 實現微服務架構中的服務發現、釋出訂閱、分散式鎖以及分散式協調等功能,因為雖然它被定義成了一個可靠的分散式鍵值儲存,但是它起到的依然是一個分散式協調服務的作用,這也使我們在需要不同的協調服務中進行權衡和選擇。
為什麼要在分散式協調服務中選擇 etcd 其實是一個比較關鍵的問題,很多工程師選擇 etcd 主要是因為它使用 Go 語言開發、部署簡單、社群也比較活躍,但是缺點就在於它相比 ZooKeeper 還是一個比較年輕的專案,需要一些時間來成長和穩定。
總結
etcd 的實現原理非常有趣,我們能夠在它的原始碼中學習很多 Go 程式設計的最佳實踐和設計,這也值得我們去研究它的原始碼。
目前很多專案和公司都在生產環境中大規模使用 etcd,這對於社群來說是意見非常有利的事情,如果微服務的大部分技術棧是 Go,作者也更加推薦各位讀者在選擇分散式協調服務時選擇 etcd 作為系統的基礎設施。