使用Kazoo操作ZooKeeper服務治理
單機服務的可靠性及可擴充套件性有限,某臺服務宕機可能會影響整個系統的正常使用;分散式服務能夠有效地解決這一問題,但同時分散式服務也會帶來一些新的問題,如:服務發現(新增或者刪除了服務如何確保能讓客戶端知道),容災(某些服務出現故障如何讓客戶端只訪問正常的服務);ZooKeeper的提出主要是為了解決分散式服務的治理問題,它在分散式環境中協調和管理服務。
Zookeeper協調管理服務的過程如下圖:
服務端:每臺伺服器都要向註冊中心Zookeeper進行註冊登記,並且保持與Zookeeper的連線,如果伺服器與Zookeeper斷開了連線,Zookeeper將刪除該伺服器的地址。
客戶端:需要服務的時候先向Zookeeper訂閱伺服器的地址資訊,Zookeeper返回給客戶端已註冊的伺服器資訊列表,客戶端從伺服器資訊列表中選擇伺服器進行服務呼叫,如果Zookeeper記錄的伺服器資訊發生了變更,伺服器會通知客戶端變更事件,客戶端可以獲取最新的伺服器資訊。
ZooKeeper檔案系統的資料結構是個樹狀結構,它的每個節點(znode)由一個名稱標識,並用路徑/分割:
ZooKeeper的節點型別有:
1. 持久節點(ZooKeeper預設的節點型別,建立該節點的客戶端斷開連線後,持久節點仍然存在)
2. 順序節點(將10位的序列號附加到原始名稱來設定節點的路徑,如:/server0000000001)
3. 臨時節點(當客戶端與ZooKeeper斷開連線時,臨時節點會自動刪除)
RPC服務註冊到ZooKeeper
服務端:
1 import threading 2 import json 3 import socket 4 import sys 5 from kazoo.client import KazooClient 6 from divide_rpc import ServerStub 7 from divide_rpc import InvalidOperation 8 9 10 class ThreadServer(object): 11def __init__(self, host, port, handlers): 12self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 13self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 14self.host = host 15self.port = port 16self.sock.bind((host, port)) 17self.handlers = handlers 18 19def serve(self): 20""" 21開始服務 22""" 23self.sock.listen(128) 24self.register_zk() 25print("開始監聽") 26while True: 27conn, addr = self.sock.accept() 28print("建立連結%s" % str(addr)) 29t = threading.Thread(target=self.handle, args=(conn,)) 30t.start() 31 32def handle(self, client): 33stub = ServerStub(client, self.handlers) 34try: 35while True: 36stub.process() 37except EOFError: 38print("客戶端關閉連線") 39 40client.close() 41 42def register_zk(self): 43""" 44註冊到zookeeper 45""" 46self.zk = KazooClient(hosts='127.0.0.1:2181') 47self.zk.start() 48self.zk.ensure_path('/rpc')# 建立根節點 49value = json.dumps({'host': self.host, 'port': self.port}) 50# 建立服務子節點 51self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True) 52 53 54 class Handlers: 55@staticmethod 56def divide(num1, num2=1): 57""" 58除法 59:param num1: 60:param num2: 61:return: 62""" 63if num2 == 0: 64raise InvalidOperation() 65val = num1 / num2 66return val 67 68 69 if __name__ == '__main__': 70if len(sys.argv) < 3: 71print("usage:python server.py [host] [port]") 72exit(1) 73host = sys.argv[1] 74port = sys.argv[2] 75server = ThreadServer(host, int(port), Handlers) 76server.serve()
服務端通過kazoo連線zookeeper,依次建立根節點和服務的子節點,當啟動多執行緒伺服器的時候,會根據ip和埠建立不同的節點,依次啟動兩個server(8001、8002),檢視zookeeper的節點資訊:
1 >>> from kazoo.client import KazooClient 2 >>> zk = KazooClient(hosts='127.0.0.1:2181') 3 >>> zk.start() 4 >>> children = zk.get_children("/rpc") 5 >>> print(children) 6 ['server0000000001', 'server0000000000']
客戶端:
1 import random 2 import time 3 import json 4 import socket 5 from divide_rpc import ( 6ClientStub, InvalidOperation 7 ) 8 from kazoo.client import KazooClient 9 10 11 class DistributedChannel(object): 12def __init__(self): 13self._zk = KazooClient(hosts='127.0.0.1:2181') 14self._zk.start() 15self._get_servers() 16 17def _get_servers(self, event=None): 18""" 19從zookeeper獲取伺服器地址資訊列表 20""" 21servers = self._zk.get_children('/rpc', watch=self._get_servers) 22print(servers) 23self._servers = [] 24for server in servers: 25data = self._zk.get('/rpc/' + server)[0] 26if data: 27addr = json.loads(data.decode()) 28self._servers.append(addr) 29 30def _get_server(self): 31""" 32隨機選出一個可用的伺服器 33""" 34return random.choice(self._servers) 35 36def get_connection(self): 37""" 38提供一個可用的tcp連線 39""" 40while True: 41server = self._get_server() 42print(server) 43try: 44sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 45sock.connect((server['host'], server['port'])) 46except ConnectionRefusedError: 47time.sleep(1) 48continue 49else: 50break 51return sock 52 53 54 channel = DistributedChannel() 55 56 for i in range(50): 57try: 58stub = ClientStub(channel) 59val = stub.divide(i) 60except InvalidOperation as e: 61print(e.message) 62else: 63print(val) 64time.sleep(1)
客戶端連線zookeeper,通過get_children來獲取伺服器資訊,並watch監聽伺服器的變化情況,啟動客戶端會發現它會呼叫8001埠的server和8002埠的server:
此時服務端新增加一個結點,8003,客戶端變化情況:
可以看出zookeeper總共有三個節點了,前面呼叫的server都是8001和8002,當8003加入後,zookeeper會發現並呼叫它
此時服務端斷開一個server,8001,客戶端變化情況:
斷開server前客戶端會呼叫8001、8002、8003這三個服務,當斷開server 8001以後,zookeeper只會呼叫8002和8003這兩個server了