說說 Egg.js 中的多程序增強模型(一)
Egg.js 是阿里推出的面向Node的企業級服務框架,這裡只是講一講egg進階中的外掛開發會遇到多程序增強模型 .
背景
Egg.js原理簡介
稍微熟悉Egg.js原理的應該都知道master / agent / worker
這三個程序的職責以及agent.js / app.js
這兩個js檔案,agent程序對應於agent.js,worker程序對應的是app.js,而worker程序是有多個的以叢集方式進行工作的,並且最終部署的應用也是叢集的方式部署在不同的機器上的,因此實際的worker是一個n
x
m
的數量。
服務長鏈
服務端應用最典型的就是資料庫連線(如: MySQL),尤其是微服務化後出現了各種各樣的中介軟體(如:Eureka/Zookeeper/Disconf), 這樣每一個應用都需要維護各種各樣的長連結。
Egg的支援
對於長連結的建立方式,Egg提供了兩種支援分別是:app.addSingleton(name, creator)
和多程序增強模型
。兩種方式分別在什麼時候使用?addSingleton
的方式可以直接參考Egg提供的例子MySQL
,它可以保證一個application
的物件(一個worker)只會有一個mysql
例項,但是多個worker還是會有多個,對於MySQL這種在server端有連結池的是沒有問題的,而且這樣實現也簡單易用,但是如果沒有連結池的中介軟體來講這樣是一種極大的資源浪費 (n
x
m
),因此就會用到了多程序增強模型
,下面具體說說。
多程序增強模型
Egg中的多程序增強模型實際上完全使用的就是Cluser-Client 庫(也是阿里開源),在GitHub上面有它工作原理和使用方式的介紹,只是不知道大家會不會和我一樣看了一遍之後依然不知所云和無從下手的感覺,因此才寫了這篇部落格將原始碼閱讀的理解記錄下來。
Egg文件引用
首先建立RegistryClient
程式碼如下:
const Base = require('sdk-base'); class RegistryClient extends Base { ... }
然後建立一個APIClient
類繼承框架提供的快捷類APIClientBase
, 程式碼如下:
const APIClientBase = require('cluster-client').APIClientBase; const RegistryClient = require('./registry_client'); class APIClient extends APIClientBase { // 返回原始的客戶端類 get DataClient() { return RegistryClient; } subscribe(reg, listener) { this._client.subscribe(reg, listener); } publish(reg) { this._client.publish(reg); } }
這裡需要注意的是:
-
DataClient
方法需要返回前面定義好的RegistryClient
類。 -
_client
屬性是繼承自父類, 直接就可以使用。
在Egg中嵌入上面的程式碼:
// app.js || agent.js const APIClient = require('./APIClient'); // 上面那個模組 module.exports = app => { const config = app.config.apiClient; app.apiClient = new APIClient(Object.assign({}, config, { cluster: app.cluster }); app.beforeStart(async () => { await app.apiClient.ready(); }); };
根據上面的程式碼進行下面的梳理:
-
每一個
agent / application
都會有一個APIClient
例項。 -
所有的
APIClient
例項都會知曉RegistryClient
類名。 -
APIClient
裡面的方法會實際的呼叫一個_client
屬性。
理解:
這就是一個靜態代理模式
,所有想要對RegistryClient
類的呼叫都要經過APIClient
進行一次代理,所以只要保證RegistryClient
的例項只有一個,其它所有的APIClient
都可以通過某種方式將操作(請求)傳達給RegistryClient
就可以實現多程序單例項模式了。
注:上面使用方式是cluster-client的最佳實踐,雖然拋開APIClient這個類也可以,這裡直接跳過了是因為這樣拆解更靈活並易於擴充套件,實際這裡是需要進行兩層的代理
, RegistryClient會代理真正的業務client的呼叫(可以動態代理實現)並維護業務client的連結和事件接收,APIClient是用來mock所有業務client的api,讓業務的使用更貼近真正業務client的呼叫。如(示意):APIClient.getData() --> RegistryClient.<DynamicDispatcher> --> zkClient.getData()
原始碼分析
有了上面的例子和思路,帶著下面兩個問題進行原始碼的分析:
RegistryClient APIClient
主從模式(Leader / Follower)
將多程序分為主(Leader)程序和從(Follower)程序,Leader只有一個並負責維護實際的第三方應用的連結及事件處理,Follower用於訂閱Leader的一些事件及主動推送資料給Leader,也可以主動呼叫Leader執行一些操作,它們之間可以通過程序間通訊的方式進行資訊交換。在Egg中規定了agent程序是Leader,而其他worker程序作為Follower,程式碼如下isLeader: this.type === 'agent'
:
// node_modules/egg/egg.js class EggApplication extends EggCore { constructor(options) { ... ... /** * Wrap the Client with Leader/Follower Pattern * * @description almost the same as Agent.cluster API, the only different is that this method create Follower. * * @see https://github.com/node-modules/cluster-client * @param {Function} clientClass - client class function * @param {Object} [options] *- {Boolean} [autoGenerate] - whether generate delegate rule automatically, default is true *- {Function} [formatKey] - a method to tranform the subscription info into a string,default is JSON.stringify *- {Object} [transcode|JSON.stringify/parse] *- {Function} encode - custom serialize method *- {Function} decode - custom deserialize method *- {Boolean} [isBroadcast] - whether broadcast subscrption result to all followers or just one, default is true *- {Number} [responseTimeout] - response timeout, default is 3 seconds *- {Number} [maxWaitTime|30000] - leader startup max time, default is 30 seconds * @return {ClientWrapper} wrapper */ this.cluster = (clientClass, options) => { options = Object.assign({}, this.config.clusterClient, options, { // cluster need a port that can't conflict on the environment port: this.options.clusterPort, // agent worker is leader, app workers are follower isLeader: this.type === 'agent', logger: this.coreLogger, }); const client = cluster(clientClass, options); this._patchClusterClient(client); return client; }; ... ... } ... ... }
上面:point_up_2:程式碼是在agent 和 application物件上掛了一個名為cluser
的建立方法,方法返回一個ClientWrapper
例項。
Cluster-Client程式碼結構
|--cluster-client |--lib |--protocol --byte_buffer.js --packet.js --request.js --response.js --api_client.js --client.js --connections.js --const.js --default_logger.js --default_transcode.js --follower.js --index.js --leader.js --server.js --symbol.js --utils.js
這裡我們先重點關注api_client.js / index.js / client.js
這三個原始碼。回想到上面Egg文件給我提供的建立apiClient
的例程式碼:point_down: :
new APIClient(Object.assign({}, config, { cluster: app.cluster });
我們就來到了cluster-client/lib/api_client.js
, 這裡將app.cluster
方法傳入,參考原始碼:
1constructor(options) { 2options = options || {}; 3super(options); 4const wrapper = (options.cluster || cluster)( 5this.DataClient, this.clusterOptions 6); 7for (const from in this.delegates) { 8const to = this.delegates[from]; 9wrapper.delegate(from, to); 10} 11this._client = wrapper.create(options); 12utils.delegateEvents(this._client, this); 13if (!options.initMethod) { 14this._client.ready(err => { 15this.ready(err ? err : true); 16}); 17} 18 }
第4行程式碼直接就呼叫了cluster
方法建立了一個ClientWrapper
例項,第11行呼叫了wrapper的create方法,這樣我們就來到了cluster-client/lib/index.js
:
// 去掉不分析的程式碼 ... create (...args) { ... function createRealClient() { return Reflect.construct(clientClass, args); } const client = new ClusterClient(Object.assign({ createRealClient, descriptors: this._descriptors, }, this._options)); ... } ...
create
方法主要是做了一些方法delegate生成和方法校驗(下回分析),這裡呼叫了包裝了一個反射建立真實RegistryClient
例項的方法並傳入ClusterClient
生成了一個例項最終返回給呼叫者其實就是APIClient
中的_client
,那麼這樣就來到了重點的cluster-client/lib/client.js
, 方便檢視這裡直接就貼出[init]
部分程式碼:
async [init]() { const name = this.options.name; const port = this.options.port; let server; if (this.options.isLeader === true) { server = await ClusterServer.create(name, port); if (!server) { throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`); } } else if (this.options.isLeader === false) { // wait for leader active await ClusterServer.waitFor(port, this.options.maxWaitTime); } else { debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port); server = await ClusterServer.create(name, port); } if (server) { this[innerClient] = new Leader(Object.assign({ server }, this.options)); debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port); } else { this[innerClient] = new Follower(this.options); debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port); } ... }
程式碼非常清晰,如果是leader就會建立一個server並監聽<port>, 如果是follower就連結server的<port>埠(可以檢視server.js程式碼)。然後分別new了Leader和Follower兩個例項並賦值給[innerClient]
。當我們再檢視Leader.js
的程式碼時,發現在建構函式裡有this._realClient = this.options.createRealClient();
, 原來真正的client是在這個時間建立的,而檢視Follower.js
的程式碼時發現都是傳送的tcp請求。這樣上面的兩個問題我們就都有了答案。
-
agent程序起來後加載agent.js的時候設定了
cluster
方法,在beforeStart
時通過new APIClient
初始化_client
屬性的同時啟動了一個tcp server並在new Leader
物件時初始化正在的client。Egg的agent程序只有一個因此真正的client例項也只有一個。 -
當呼叫
APIClient
的方法時就會通過_client
屬性呼叫到ClusterClient
,然後再呼叫它內部[innerClient]
, 而[innerClient]
分別是Leader
和Follower
的例項,所以如果是leader就直接呼叫realClient否則就傳送tcp請求。
至此cluster-client
的多程序增強模式的主從原理就分析完成了,在實際的實現過程中具體的呼叫還是有一些規則和約束,如:delegates
的設定以及subscribe / publish / invoke / invokeOneway
分別是如何使用的還需要進一步細看,抽空在開一篇吧,文件不詳細,只能翻原始碼了!!!