利用Zookeeper實現資料釋出訂閱
所謂的資料釋出/訂閱,意思是釋出者將資料釋出到Zookeeper上的一個或一系列節點上,通過watcher機制,客戶端可以監聽(訂閱)這些資料節點,當這些節點發生變化時,Zookeeper及時地通知客戶端,從而達到動態獲取資料的目的。
一種常見的場景就是配置中心。隨著應用越來越多,功能越來越複雜,機器也越來越多,對於一些公共的程式配置,譬如各種功能的開關、資料庫的配置、伺服器的地址等,如果每個應用每個機器仍然單獨維護,當要修改配置時就得一個一個地修改,這樣顯然非常不方便。
這些公共的配置資訊通常具備以下3個特性:
- 資料量通常比較小
- 資料內容在執行時發生動態變化
- 叢集中各機器共享、配置一致
可以將這些配置抽取出來,交給配置中心統一管理起來。配置中心的架構一般是這樣:
開源配置中心
開源的配置中心有很多,各有特點,這裡只列出幾個進行簡單地介紹。
Ctrip Apollo
github地址: https://github.com/ctripcorp/apollo
介紹:Apollo(阿波羅)是攜程框架部門研發的分散式配置中心,能夠集中化管理應用不同環境、不同叢集的配置,配置修改後能夠實時推送到應用端,並且具備規範的許可權、流程治理等特性,適用於微服務配置管理場景。
Disconf
github地址: https://github.com/knightliao/disconf
介紹:專注於各種「分散式系統配置管理」的「通用元件」和「通用平臺」, 提供統一的「配置管理服務」。主要目標是部署極其簡單、部署動態化、統一管理、一個jar包,到處執行。
Spring Cloud Config
github地址: https://github.com/spring-cloud/spring-cloud-config
介紹:Spring Cloud Config是一個基於http協議的遠端配置實現方式,通過統一的配置管理伺服器進行配置管理,客戶端通過https協議主動的拉取服務的的配置資訊,完成配置獲取。
Nacos
github地址: https://github.com/alibaba/nacos
介紹:Nacos是阿里最近才開源的一個更易於構建雲原生應用的動態服務發現、配置管理和服務管理平臺。Nacos 致力於幫助您發現、配置和管理微服務。Nacos提供了一組簡單易用的特性集,幫助您快速實現動態服務發現、服務配置、服務元資料及流量管理。
利用Zookeeper實現一個配置中心
開源的配置中心當然都很優秀,但是現在我們還是先利用Zookeeper來實現一個屬於自己的配置中心。
我們的配置中心儲存的配置資訊十分簡單,就是JDBC連線MySQL需要用的連線資訊。這些連線資訊將轉化為JSON字串,儲存在Zookeeper上的一個節點中;應用程式(通過執行緒模擬的)從Zookeeper中讀取這些配置資訊,然後查詢資料庫;當修改資料庫連線資訊時( 切換資料庫 ),應用程式能及時的拉取新的連線資訊,使用新的連線查詢資料庫。
定義一個 MysqlConfig 類,方便使用 FastJSON 將配置資訊在JSON字串與物件之間做轉換。
@AllArgsConstructor @Data public class MysqlConfig { private String url; private String driver; private String username; private String password; }
最開始,將Zookeeper上節點的配置資訊初始化為 test 資料庫的連線資訊,然後啟動 N 個執行緒(模擬應用程式),讀取連線資訊並查詢資料,同時設定監聽節點;等待 10 秒鐘之後,將配置切換為 test2 資料庫的連線資訊,這時應用程式將受到配置變更的通知,然後獲取資訊連線資訊,重新查詢資料庫。
// 工具類 public class ZKUtils { private static final String zkServerIps = "master:2181,hadoop2:2181"; public static synchronized CuratorFramework getClient() { CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServerIps) .sessionTimeoutMs(6000).connectionTimeoutMs(3000) //.namespace("LeaderLatchTest") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); return client; } } // 配置中心示例,模擬資料庫切換 public class ConfigCenterTest { // test 資料庫的 test1 表 private static final MysqlConfig mysqlConfig_1 = new MysqlConfig("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false", "com.mysql.jdbc.Driver", "root", "123456"); // test2 資料庫的 test1 表 private static final MysqlConfig mysqlConfig_2 = new MysqlConfig("jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false", "com.mysql.jdbc.Driver", "root", "123456"); // 儲存MySQL配置資訊的節點路徑 private static final String configPath = "/testZK/jdbc/mysql"; private static final Integer clientNums = 3; private static CountDownLatch countDownLatch = new CountDownLatch(clientNums); public static void main(String[] args) throws Exception { // 最開始時設定MySQL配置資訊為 mysqlConfig_1 setMysqlConfig(mysqlConfig_1); // 啟動 clientNums 個執行緒,模擬分散式系統中的節點, // 從Zookeeper中獲取MySQL的配置資訊,查詢資料 for (int i = 0; i < clientNums; i++) { String clientName = "client#" + i; new Thread(new Runnable() { @Override public void run() { CuratorFramework client = ZKUtils.getClient(); client.start(); try { Stat stat = new Stat(); // 如果要監聽多個子節點則應該使用 PathChildrenCache final NodeCache cacheNode = new NodeCache(client, configPath, false); cacheNode.start(true);// true 表示啟動時立即從Zookeeper上獲取節點 byte[] nodeData = cacheNode.getCurrentData().getData(); MysqlConfig mysqlConfig = JSON.parseObject(new String(nodeData), MysqlConfig.class); queryMysql(clientName, mysqlConfig);// 查詢資料 cacheNode.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { byte[] newData = cacheNode.getCurrentData().getData(); MysqlConfig newMysqlConfig = JSON.parseObject(new String(newData), MysqlConfig.class); queryMysql(clientName, newMysqlConfig);// 查詢資料 } }); Thread.sleep(20 * 1000); } catch (Exception e) { e.printStackTrace(); } finally { client.close(); countDownLatch.countDown(); } } }).start(); } Thread.sleep(10 * 1000); System.out.println("\n---------10秒鐘後將MySQL配置資訊修改為 mysqlConfig_2---------\n"); setMysqlConfig(mysqlConfig_2); countDownLatch.await(); } /** * 初始化,最開始的時候的MySQL配置為 mysqlConfig_1 */ public static void setMysqlConfig(MysqlConfig config) throws Exception { CuratorFramework client = ZKUtils.getClient(); client.start(); String mysqlConfigStr = JSON.toJSONString(config); Stat s = client.checkExists().forPath(configPath); if (s != null) { Stat resultStat = client.setData().forPath(configPath, mysqlConfigStr.getBytes()); System.out.println(String.format("節點 %s 已存在,更新資料為:%s", configPath, mysqlConfigStr)); } else { client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(configPath, mysqlConfigStr.getBytes()); System.out.println(String.format("建立節點:%s,初始化資料為:%s", configPath, mysqlConfigStr)); } System.out.println(); client.close(); } /** * 通過配置資訊,查詢MySQL資料庫 */ public static synchronized void queryMysql(String clientName, MysqlConfig mysqlConfig) throws ClassNotFoundException, SQLException { System.out.println(clientName + " 查詢MySQL資料,使用的MySQL配置資訊:" + mysqlConfig); Class.forName(mysqlConfig.getDriver()); Connection connection = DriverManager.getConnection(mysqlConfig.getUrl(), mysqlConfig.getUsername(), mysqlConfig.getPassword()); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery("select * from test1"); while (resultSet.next()) { System.out.println(String.format("id=%s, name=%s, age=%s", resultSet.getString(1), resultSet.getString(2), resultSet.getString(3))); } System.out.println(); resultSet.close(); statement.close(); connection.close(); } }
控制檯列印日誌
client#2 查詢MySQL資料,使用的MySQL配置資訊:MysqlConfig(url=jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false, driver=com.mysql.jdbc.Driver, username=root, password=123456) id=2, name=賴鍵鋒, age=25 id=3, name=小旋鋒, age=22000 id=4, name=test, age=100 client#1 查詢MySQL資料,使用的MySQL配置資訊:MysqlConfig(url=jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false, driver=com.mysql.jdbc.Driver, username=root, password=123456) id=2, name=賴鍵鋒, age=25 id=3, name=小旋鋒, age=22000 id=4, name=test, age=100 client#0 查詢MySQL資料,使用的MySQL配置資訊:MysqlConfig(url=jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false, driver=com.mysql.jdbc.Driver, username=root, password=123456) id=2, name=賴鍵鋒, age=25 id=3, name=小旋鋒, age=22000 id=4, name=test, age=100 ---------10秒鐘後將MySQL配置資訊修改為 mysqlConfig_2--------- 節點 /testZK/jdbc/mysql 已存在,更新資料為:{"driver":"com.mysql.jdbc.Driver","password":"123456","url":"jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false","username":"root"} client#1 查詢MySQL資料,使用的MySQL配置資訊:MysqlConfig(url=jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false, driver=com.mysql.jdbc.Driver, username=root, password=123456) id=2, name=賴鍵鋒, age=23 id=3, name=小旋鋒, age=22 id=4, name=whirly, age=24 client#2 查詢MySQL資料,使用的MySQL配置資訊:MysqlConfig(url=jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false, driver=com.mysql.jdbc.Driver, username=root, password=123456) id=2, name=賴鍵鋒, age=23 id=3, name=小旋鋒, age=22 id=4, name=whirly, age=24 client#0 查詢MySQL資料,使用的MySQL配置資訊:MysqlConfig(url=jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false, driver=com.mysql.jdbc.Driver, username=root, password=123456) id=2, name=賴鍵鋒, age=23 id=3, name=小旋鋒, age=22 id=4, name=whirly, age=24
上面採用的示例是通過 NodeCache 來監聽單個節點,如果要監聽多個子節點則須使用 PathChildrenCache,使用示例可以參考《 Zookeeper 分散式協調服務介紹 》
相關文章
後序
程式碼下載: http://t.cn/E5ncvDR
我的部落格:laijianfeng.org
參考:
《從Paxos到Zookeeper分散式一致性原理與實踐》
轉載請註明來源,歡迎對文章中的引用來源進行考證,歡迎指出任何有錯誤或不夠清晰的表達。可以在下面評論區評論,也可以郵件至 [email protected]