ASP.Net Core 中使用Zookeeper搭建分散式環境中的配置中心繫列一:使用Zookeeper.Net元件演示基本...
前言:馬上要過年了,祝大家新年快樂!在過年回家前分享一篇關於Zookeeper的文章,我們都知道現在微服務盛行,大資料、分散式系統中經常會使用到Zookeeper,它是微服務、分散式系統中必不可少的分散式協調框架。它的作用體現在分散式系統中解決了配置中心的問題,以及解決了在分散式環境中不同程序之間爭奪資源的問題,也就是分散式鎖的功能以及分散式訊息佇列功能等等。所以在微服務的環境中Zookeeper是現在很多公司首選的分散式協調框架,包括我之前的公司也在使用Zookeeper。說了這麼多,沒別的就是想說一下Zookeeper的重要性,廢話不多說,進入正題。本篇部落格只是演示在.Net Core 環境中如何使用Zookeeper元件進行基本的增刪改查和一些注意的要點,如果對Zookeeper還不是太瞭解的話,建議認認真真、仔仔細細地閱讀該文章:http://www.cnblogs.com/sunddenly/p/4033574.html 否則可能下面演示的你會看不懂。
一、Zookeeper基本概念快速介紹
概念:
Zookeeper是一個開源的分散式協調框架,它具有高效能 、高可用的特點,同時具有嚴格的順序訪問控制能力(主要是寫操作的嚴格順序性),基於對ZAB(Zookeeper原子訊息廣播協議)的實現,它能夠很好的保證分散式環境下的資料一致性。也正是基於這樣的特徵,使得Zookeeper稱為解決分散式資料一致性問題的利器, Zookeeper由兩部分組成:Zookeeper服務端和客戶端。
特點:
- 全域性一致性:每個server儲存一份相同的資料副本,client無論連結哪個server,展示的資料都是一致的,這是最重要的特徵。
- 可靠性:如果訊息其中一臺伺服器接受,那麼將被所有的伺服器接受。
- 順序性:包括全域性有序性和偏序兩種:全域性有序是指如果在一臺伺服器上訊息a在訊息b前釋出,則在所有server上訊息a都將在訊息b前被髮布;偏序是指如果一個訊息b在訊息a後被同一個傳送者釋出,a必將排在b前面。
- 資料更新原子性:一次資料更新要麼成功,要麼失敗,不存在中間狀態。
- 實時性:Zookeeper保證客戶端將在一個時間間隔範圍內獲得伺服器的更新資訊,或者伺服器失敗的資訊。
資料結構:
圖片來源:(https://www.cnblogs.com/xums/p/7074008.html)
- Zookeeper的資料結構模型採用類似於檔案系統的樹結構。樹上的每個節點稱為ZNode,而每個節點都可能有一個或者多個子節點。ZNode的節點路徑標識方式是由一系列斜槓"/"進行分割的路徑表示,必須是絕對路徑。既可以向ZNode節點寫入、修改和讀取資料,也可以建立、刪除ZNode節點或ZNode節點下的子節點。
- 值的注意的是,Zookeeper的設計目標不是傳統的資料庫儲存或大資料物件儲存,而是協同資料的儲存,因此在實現的時候,ZNode儲存的資料大小不應該超過1MB。另外,每一個節點都有一個ACL(訪問控制列表),據此控制該節點的訪問許可權。
- ZNode資料節點是有生命週期的,其生命週期的長短取決於資料節點的節點型別。節點型別共有四種:持久節點、持久順序節點、臨時節點、臨時順序節點
好了,基本的概念就聊到這裡,先有一個印象,如果需要詳細的學習,建議認認真真閱讀這篇部落格:http://www.cnblogs.com/sunddenly/p/4033574.html,下面就開始演示基本的api操作。
二、ASP.Net Core 中使用ZooKeeper
首先,新增下面的依賴包:
新建一個.Net Core的控制檯應用:
Zookeeper的服務端使用的是張輝清老師新書《中小研發團隊架構實踐》裡面的服務,我這裡不再安裝Zookeeper服務端,只是介紹一下Zookeeper的目錄結構
- Zookeeper目錄介紹
(1)bin:主要的一些執行命令
(2)conf:存放配置檔案,其中我們需要修改zk.cfg
(3)contrib:附加的一些功能
(4)dist-maven:mvn編譯後的目錄
(5)docs:文件
(6)lib:需要依賴的jar包
配置檔案zk.cfg檔案內容介紹(單機版)
(1)trickTime:用於計算的時間單元,比如session超時:N*trickTime
(2)initLimit:用於叢集,允許從節點連結並同步到master節點的初始化連結時間,以trickTime的倍數來表示
(3)syncLimit:用於叢集,master主節點與從節點之間傳送訊息,請求和應答時間長度(心跳機制)
(4)dataDir:必須配置
(5)dataLogDir:日誌目錄,如果不配置會和dataDir公用
(6)clientPort:連結伺服器的埠,預設是2181
好了就介紹到這裡,下面讓我會演示關於Zookeeper API的各種操作。
- 如何連線Zookeeper的服務端
(1)程式碼如下:
using org.apache.zookeeper; using org.apache.zookeeper.data; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event; namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK { get; set; } // 配置項 public string QueryPath { get; set; }= "/Configuration"; //節點狀態資訊 public Stat Stat { get; set; } // 配置資料 public byte[] ConfigData { get; set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this)); Console.WriteLine("客戶端開始連線zookeeper伺服器..."); Console.WriteLine($"連線狀態:{ZK.getState()}"); Thread.Sleep(1000);//注意:為什麼要加上這行程式碼,如果不加會出現什麼問題 Console.WriteLine($"連線狀態:{ZK.getState()}"); } // 讀取節點的配置資料 public async Task<string> ReadConfigDataAsync() { if (this.ZK == null) { return string.Empty; } var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) { return string.Empty; } this.Stat = stat; var dataResult = await ZK.getDataAsync(QueryPath, true); return Encoding.UTF8.GetString(dataResult.Data); } public class ConfigServiceWatcher : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher(ZookeeperClient cs) { _cs = cs; } public overrideasync Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper連結成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } } }
解釋:
首先,我們來看看建立Zookeeper物件時,應該注意的問題:
Zookeeper的建構函式引數解釋如下:
客戶端和zk服務端連結是一個非同步的過程,當連線成功後後,客戶端會收的一個watch通知,就是呼叫回撥函式:ConfigServiceWatcher.process(WatchedEvent @event)注意這個類ConfigServiceWatcher必須要繼承Watcher,重寫 process(WatchedEvent @event),所以就打印出了 。關於Zookeeper的watcher後面會詳細介紹,不明白的不要緊,後面會通過程式碼給大家演示。
(1)connectString:連線伺服器的ip字串,比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"可以是一個ip,也可以是多個ip,一個ip代表單機,多個ip代表叢集,也可以在ip後加路徑。
(2)sessionTimeout:超時時間,心跳收不到了,那就超時
(3)watcher:通知事件,如果有對應的事件觸發,則會收到一個通知;如果不需要,那就設定為null,在上面的演示中,我們設定了一個watcher。
(4)canBeReadOnly:可讀,當這個物理機節點斷開後,還是可以讀到資料的,只是不能寫,此時資料被讀取到的可能是舊資料,此處建議設定為false,不推薦使用。
(5)sessionId:會話的id
(6)sessionPasswd:會話密碼 當會話丟失後,可以依據 sessionId 和 sessionPasswd 重新獲取會話。
好了,基本的引數已經介紹完畢,那麼,來解釋一下為什麼在建立Zookeeper物件時新增下面這句程式碼:
其實上面我已經解釋了,由於客戶端和zk服務端連結是一個非同步的過程,需要一定的時間間隔,所以,如果不新增效果這樣:
(2)zookeeper 恢復之前的會話連線演示
using org.apache.zookeeper; using org.apache.zookeeper.data; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event; namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK { get; set; } // 配置項 public string QueryPath { get; set; }= "/Configuration"; //節點狀態資訊 public Stat Stat { get; set; } // 配置資料 public byte[] ConfigData { get; set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this)); } public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd); } // 讀取節點的配置資料 public async Task<string> ReadConfigDataAsync() { if (this.ZK == null) { return string.Empty; } var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) { return string.Empty; } this.Stat = stat; var dataResult = await ZK.getDataAsync(QueryPath, true); return Encoding.UTF8.GetString(dataResult.Data); } public class ConfigServiceWatcher : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher(ZookeeperClient cs) { _cs = cs; } public overrideasync Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper連結成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } public class ConfigServiceWatcher2 : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher2(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper連結成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } } }
- ZNode建立刪除修改查詢
程式碼:
using org.apache.zookeeper; using org.apache.zookeeper.data; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event; using static org.apache.zookeeper.ZooDefs; namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK { get; set; } // 配置項 public string QueryPath { get; set; }= "/Configuration"; //節點狀態資訊 public Stat Stat { get; set; } // 配置資料 public byte[] ConfigData { get; set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this)); } public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd); } // 讀取節點的配置資料 public async Task<string> ReadConfigDataAsync() { if (this.ZK == null) { return string.Empty; } var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) { return string.Empty; } this.Stat = stat; var dataResult = await ZK.getDataAsync(QueryPath, true); return Encoding.UTF8.GetString(dataResult.Data); } public class ConfigServiceWatcher : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher(ZookeeperClient cs) { _cs = cs; } public overrideasync Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper連結成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } public class ConfigServiceWatcher2 : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher2(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper連結成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } // 關閉ZooKeeper連線 // 釋放資源 public async Task Close() { if (this.ZK != null) { await ZK.closeAsync(); } this.ZK = null; } } }
using org.apache.zookeeper; using System; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.ZooDefs; namespace ZookeeperNetCore { class Program { public const int timeout = 5000; static async Task Main(string[] args) { var conf = new ZookeeperClient("", timeout); try { conf.QueryPath = "/UserName"; Console.WriteLine("客戶端開始連線zookeeper伺服器..."); Console.WriteLine($"連線狀態:{conf.ZK.getState()}"); Thread.Sleep(1000);//注意:為什麼要加上這行程式碼,如果不加會出現什麼問題 Console.WriteLine($"連線狀態:{conf.ZK.getState()}"); if (await conf.ZK.existsAsync(conf.QueryPath, false) == null) { conf.ConfigData = Encoding.Default.GetBytes("guozheng"); await conf.ZK.createAsync(conf.QueryPath, conf.ConfigData, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } string configData = await conf.ReadConfigDataAsync(); Console.WriteLine("節點【{0}】目前的值為【{1}】。", conf.QueryPath, configData); Console.ReadLine(); Random random = new Random((int)DateTime.Now.Ticks & 0x0000FFFF); conf.ConfigData = Encoding.UTF8.GetBytes(string.Format("Mike_{0}", random.Next(100))); await conf.ZK.setDataAsync(conf.QueryPath, conf.ConfigData, -1); Console.WriteLine("節點【{0}】的值已被修改為【{1}】。", conf.QueryPath, Encoding.UTF8.GetString(conf.ConfigData)); Console.ReadLine(); if (await conf.ZK.existsAsync(conf.QueryPath, false) != null) { await conf.ZK.deleteAsync(conf.QueryPath, -1); Console.WriteLine("已刪除此【{0}】節點。{1}", conf.QueryPath, Environment.NewLine); } } catch (Exception ex) { if (conf.ZK == null) { Console.WriteLine("已關閉ZooKeeper的連線。"); Console.ReadLine(); return; } Console.WriteLine("丟擲異常:{0}【{1}】。", Environment.NewLine, ex.ToString()); } finally { await conf.Close(); Console.WriteLine("已關閉ZooKeeper的連線。"); Console.ReadLine(); } ////開始會話重連 //Console.WriteLine("開始會話重連..."); //var conf2 = new ZookeeperClient("", timeout, sessionId, sessionPassword); //Console.WriteLine(conf2.ZK.getSessionId()); //Console.WriteLine( Encoding.UTF8.GetString(conf2.ZK.getSessionPasswd())); //Console.WriteLine($"重新連線狀態zkSession:{conf2.ZK.getState()}"); //Thread.Sleep(1000);//注意:為什麼要加上這行程式碼,如果不加會出現什麼問題 //Console.WriteLine($"重新連線狀態zkSession:{conf2.ZK.getState()}"); Console.ReadKey(); } } }
解釋:
關於非同步建立節點的方法,是不支援子節點的遞迴建立,引數介紹:
(1)path:建立的路徑
(2)data:儲存的資料的byte[]
(3)acl:控制權限策略 Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa CREATOR_ALL_ACL --> auth:user:password:cdrwa
(4)createMode: 節點型別, 是一個列舉 PERSISTENT:持久節點 PERSISTENT_SEQUENTIAL:持久順序節點 EPHEMERAL:臨時節點 EPHEMERAL_SEQUENTIAL:臨時順序節點
關於上面引數引出來的知識點,需要幾章來講解,本篇文章先不介紹,後面會介紹。好了,關於.Net Core中使用Zookeeper的介紹就到這裡,關於上面演示的結果,我先丟擲一個問題,大家可以思考一下:為什麼“Zookeeper連結成功:True”會輸出多次?也就是我們下節要討論的Zookeeper的watcher機制。時間到了,收拾行李,準備一下回家啦,先寫到這裡,祝大家新年快樂!希望對你有幫助,過完年來見!
三、總結
可能有些地方解釋的不是太清楚,大家多多見諒,有些的不對的地方,希望能指正出來。
說明:演示程式碼裡面使用的Zookeeper服務過一段時間能用,不能用的話,在評論區留言,後面用阿里雲自己搭建一個。
程式碼地址:
https://github.com/guozheng007/ZookeeperNetCoreDemo
參考資料:
(1)張輝清:《中小研發團隊架構實踐》
(2) 風間影月:《ZooKeeper分散式專題與Dubbo微服務入門》
(3)sunddenly:http://www.cnblogs.com/sunddenly/p/4033574.html
作者:郭崢
出處:http://www.cnblogs.com/runningsmallguo/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連結。