grpc和consul結合實現分散式rpc呼叫
GRPC
主要介紹了grpc在使用示例和原理,以及如何與consul結合
gRPC 是什麼?
gRPC 也是基於以下理念:定義一個服務,指定其能夠被遠端呼叫的方法(包含引數和返回型別)。在服務端實現這個介面,並執行一個 gRPC 伺服器來處理客戶端呼叫。在客戶端擁有一個存根能夠像服務端一樣的方法。
在 gRPC 裡客戶端應用可以像呼叫本地物件一樣直接呼叫另一臺不同的機器上服務端應用的方法,使得我們能夠更容易地建立分散式應用和服務。
開始前確保已經安裝grpcio-tools和grpcio這兩個包
定義一個GRPC有如下三個步驟:
- 定義一個訊息型別
- 編譯該proto檔案
- 編寫服務端程式碼
- 編寫客戶端程式碼
我們以實現一個echo的grpc為例。
定義一個訊息型別
首先定義通訊雙方(即客戶端和服務端)互動的訊息格式(protobuf訊息的格式),然後定義該echo服務
如下:
syntax = "proto3";// 宣告使用 proto3 語法 //定義客戶端請求的protobuf格式,如下所示,包含一個字串欄位q message Req { string q = 1; } //定義服務端相應的protobuf格式,如下所示,包含一個字串欄位a message Resp { string a = 1; } //定義echo服務,如下所示,該服務包含一個名稱為"echo"的rpc service Echoer{ rpc echo (Req) returns (Resp) {} }
使用以下命令編譯:
python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. ./Echoer.proto
生成兩個py檔案
- Echoer_pb2.py 此檔案包含生成的 request(Req) 和 response(Resp) 類。
- Echoer_pb2_grpc.py 此檔案包含生成的 客戶端(EchoerStub)和服務端(EchoerServicer)的類
建立服務端程式碼
建立和執行 Echoer 服務可以分為兩個部分:
- 實現我們服務定義的生成的服務介面:做我們的服務的實際的“工作”的函式。
- 執行一個 gRPC 伺服器,監聽來自客戶端的請求並傳輸服務的響應。
在當前目錄,建立檔案 Echoer_server.py,實現一個新的函式:
from concurrent import futures import time import grpc import Echoer_pb2 import Echoer_pb2_grpc _ONE_DAY_IN_SECONDS = 60 * 60 * 24 class Echoer(Echoer_pb2_grpc.EchoerServicer): # 工作函式 def SayHello(self, request, context): return Echoer_pb2.Resp(a="echo") def serve(): # gRPC 伺服器 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server) server.add_insecure_port('[::]:50051') server.start()# start() 不會阻塞,如果執行時你的程式碼沒有其它的事情可做,你可能需要迴圈等待。 try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve()
建立客戶端程式碼
在當前目錄,開啟檔案 Echoer_client.py,實現一個新的函式:
from __future__ import print_function import grpc import Echoer_pb2 import Echoer_pb2_grpc def run(): channel = grpc.insecure_channel('localhost:50051') # 建立通道 stub = Echoer_pb2_grpc.EchoerStub(channel) # 通過通道獲取憑據,即Stub response = stub.echo(Echoer_pb2.Req(q='echo')) # 呼叫rpc,獲取響應 print("Echoer client received: " + response.a) if __name__ == '__main__': run()
執行程式碼
首先執行服務端程式碼
python Echoer_server.py
複製程式碼
然後執行客戶端程式碼
python Echoer_client.py # output Echoer client received: echo
進階
為了通訊安全起見,GRPC提供了TSlSSL的支援。
首先利用openssl建立一個自簽名證書
$ openssl genrsa -out server.key 2048 Generating RSA private key, 2048 bit long modulus (2 primes) ............................................................+++++ ................................................................................................................................+++++ e is 65537 (0x010001) $ openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 You are about to be asked to enter information that will be incorporated into your certificate request. What you are about to enter is what is called a Distinguished Name or a DN. There are quite a few fields but you can leave some blank For some fields there will be a default value, If you enter '.', the field will be left blank. ----- Country Name (2 letter code) [AU]: State or Province Name (full name) [Some-State]: Locality Name (eg, city) []: Organization Name (eg, company) [Internet Widgits Pty Ltd]: Organizational Unit Name (eg, section) []: Common Name (e.g. server FQDN or YOUR name) []:Echoer Email Address []:
生成了server.key和server.crt兩個檔案,服務端兩個檔案都需要,客戶端只需要crt檔案
修改服務端程式碼
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server) # 讀取 key and certificate with open(os.path.join(os.path.split(__file__)[0], 'server.key')) as f: private_key = f.read().encode() with open(os.path.join(os.path.split(__file__)[0], 'server.crt')) as f: certificate_chain = f.read().encode() # 建立 server credentials server_creds = grpc.ssl_server_credentials(((private_key, certificate_chain,),)) # 呼叫add_secure_port方法,而不是add_insesure_port方法 server.add_secure_port('localhost:50051', server_creds)
修改客戶端程式碼
# 讀取證書 with open('server.crt') as f: trusted_certs = f.read().encode() # 建立 credentials credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs) # 呼叫secure_channel方法,而不是insecure_channel方法 channel = grpc.secure_channel('localhost:50051', credentials)
啟動服務端後,啟動客戶端,會出現以下錯誤:
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "Connect Failed" debug_error_string = "{"created":"@1547552759.642000000","description":"Failed to create subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":2721,"referenced_errors":[{"created":"@1547552759.642000000","description":"Pick Cancelled","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":241,"referenced_errors":[{"created":"@1547552759.642000000","description":"Connect Failed","file":"src/core/ext/filters/client_channel/subchannel.cc","file_line":689,"grpc_status":14,"referenced_errors":[{"created":"@1547552759.642000000","description":"Peer name localhost is not in peer certificate","file":"src/core/lib/security/security_connector/security_connector.cc","file_line":880}]}]}]}" >
!!! 警告:
這是因為TSLSSL模式下,客戶端是通過服務名稱:port來獲取服務的憑據,而不是ip:port, 所以對客戶端做如下修改:
# 修改前 channel = grpc.secure_channel('localhost:50051', credentials) # 修改後 channel = grpc.secure_channel('Echoer:50051', credentials)
!!! 警告:
其次,在TSLSSL模式下,客戶端對服務名稱:port解析時候需要dns支援,目前不知道如何解決,只能夠採取以下措施解決,通過修改windows的host檔案,利用host將服務名稱解析為IP地址,
開啟windows的host檔案,地址:C:\Windows\System32\drivers\etc\hosts
備份後修改如下,新增:
# 服務的IP地址 服務名稱 127.0.0.1 Echoer
儲存即可
修改後,再次執行,即可執行成功
注意事項:CA證書和私鑰key都是配套的,不配套的CA證書和key是無法校驗成功的
結合consul
注意事項:確保consul已經正確啟動,檢視http://ip :port:8500/, 可檢視consul的狀態,確保已經安裝python-consul這個庫,否則無法操作consul
首先想象我們以上的grpc示例程式之所以成功的有限制條件,
- 我們知道服務端已經正常啟動
- 我們知道了服務端的ip和埠
但在實際過程中,一般是不可能確切知道服務的ip和埠的,所以consul就起了箇中間橋樑的作用,具體如下:
服務註冊
服務註冊,顧名思義,服務在啟動之前,必須現在consul中註冊。
服務端:當服務端啟動之後,consul會利用服務註冊時獲得的ip和port同服務建立聯絡,其中最重要的就是health check即心跳檢測。consul通過心跳檢測來判定該服務是否正常。
客戶端:客戶端通過consul來查詢所需服務的ip和port,若對應服務已經註冊且心跳檢測正常,則會返回給客戶端對應的ip和port資訊,然後客戶端就可以利用這個來連線服務端了
服務註冊示例程式碼如下:
def register(self, server_name, ip, port, consul_host=CONSUL_HOST): """ server_name: 服務名稱 ip: 服務IP地址 port: 服務監聽的埠 consul_host: 所連線的consul伺服器的IP地址 """ c = consul.Consul(host=consul_host) # 獲取與consul的連線 print(f"開始註冊服務{server_name}") check = consul.Check.tcp(ip, port, "10s") # 設定心跳檢測的超時時間和對應的ip和port埠 c.agent.service.register(server_name, f"{server_name}-{ip}-{port}", address=ip, port=port, check=check) # 註冊
既然有服務註冊,當然會有服務登出,示例程式碼如下:
def unregister(self, server_name, ip, port, consul_host=CONSUL_HOST): c = consul.Consul(host=consul_host) print(f"開始退出服務{server_name}") c.agent.service.deregister(f"{server_name}-{ip}-{port}")
服務查詢
客戶端則需要在consul中查詢對應服務的IP和port,但由於在TSL/SSL模式下,所需的只是服務名稱和port,故而只需要查詢port埠即可。
客戶端服務查詢採用的是DNS的查詢方式,必須確保安裝dnspython庫,用於建立DNS查詢
服務查詢示例程式碼如下:
# 建立一個consul dns查詢的 resolver consul_resolver = resolver.Resolver() consul_resolver.port = 8600 consul_resolver.nameservers = [consul_host] def get_host_port(self, server_name): try: dns_answer_srv = consul_resolver.query(f"{server_name}.service.consul", "SRV") # 查詢對應服務的port, except DNSException as e: return None, None return server_name, dns_answer_srv[0].port # 返回服務名和埠
grpc流模式
grpc總共提供了四種資料互動模式:
- simpe 簡單模式 RPC:即上述的所有的grpc
- server-side streaming 服務端流式 RPC
- client-side streaming 客戶端流式 RPC
- Bidirectional streaming 雙向資料流模式的 gRPC
由於grpc對於訊息有大小限制,diff資料過大會導致無法接收資料,我們在使用過程中,使用了流模式來解決了此類問題,
在此模式下,客戶端傳入的引數由具體的protobuf變為了protobuf的迭代器,客戶端接收的響應也變為了迭代器,獲取完整的響應則需要迭代獲取。
服務端響應也變為了一個迭代器。
修改服務定義檔案:
# 修改前 service Echoer{ rpc echo (Req) returns (Resp) {} } # 修改後 service Echoer{ rpc echo (stream Req) returns (stream Resp) {} }
重新編譯
修改服務端
將工作函式修改為如下所示, 即工作函式變成了一個迭代器:
def echo(self, request_iterator, context): for i in range(10): yield Echoer_pb2.Resp(a="echo")
修改客戶端
將echo的傳入引數修改為迭代器:
def qq(): for i in range(10): yield Echoer_pb2.Req(q="echo") response = stub.echo(qq()) for resp in response: print("Echoer client received: " + response.a)
重新執行,接收結果如下:
$ python Echoer_client.py Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo