使用反射實現簡易的 RPC 框架
經過兩個月的迷茫和爬坑,我小吳又回來了,本文總結下 RPC 的簡單實現。程式碼: ofollow,noindex">GitHub
背景
微服務架構下資料互動一般是對內 RPC,對外 REST,拿筆者所在的社交 App 後端業務舉例:使用者註冊時客戶端會帶上輸入的手機號請求 API 層,API 將手機號傳遞給簡訊微服務,簡訊微服務再呼叫阿里大魚的簡訊介面,下發驗證碼。
其實簡訊傳送的業務完全可以放到 API 層直接做,session 和 profile 的業務同理。但這麼做有 3 個缺點:
-
部署效率低:如果加上 websocket(保持與客戶端長連線)、goexif(使用者頭像解碼)… 等各種第三方依賴,API 專案下的
vendor/
將會變得臃腫,上輒幾百 MB,每次編譯、部署和測試過程都需要大量時間等待。 -
開發成本高:當業務繁雜模組較多時,每個模組新增新功能或 fix bug 都要重新完整發布 API 專案,重新測試,測試不通過還得重新發布。
-
系統可用性差:所有模組功能都編譯到一個可執行檔案中,若某一模組程式碼出現問題,將可能導致整個 API 專案掛掉,所有服務不可用。比如在使用者位置模組中有經緯度轉城市的功能,需要呼叫高德地圖的 API,使用 gopool 庫批量併發的去請求轉換,忘記呼叫
batch.QueueComplete()
結果導致 pool 中 goroutine 的數量只增不減,可能拖垮整個 API 專案。
將業務按功能模組拆分到各個微服務,具有提高專案協作效率、降低模組耦合度、提高系統可用性等優點,但是開發門檻比較高,比如 RPC 框架的使用、後期的服務監控等工作。
本文實現一個極簡的 RPC 框架,完成 Client 遠端呼叫 Server 的核心功能,姑且不考慮超時重連、心跳保活等網路層機制。
本地呼叫
在程式中,常常將程式碼段封裝成函式執行。如:
package main import "fmt" type User struct { Name string Ageint } func main() { u, err := queryUser(6) if err != nil { fmt.Println(err) return } fmt.Printf("name: %s, age: %d\n", u.Name, u.Age) } // 模擬資料庫查詢 func queryUser(uid int) (User, error) { userDB := make(map[int]User) userDB[0] = User{"Dennis", 70} userDB[1] = User{"Ken", 75} userDB[2] = User{"Rob", 62} if u, ok := userDB[uid]; ok { return u, nil } return User{}, fmt.Errorf("id %d not in user db", uid) }
函式 queryUser()
在原生代碼庫中直接呼叫,就能查詢到想要的使用者資訊。
RPC 呼叫
現將模擬的使用者資料作為單獨的服務執行,客戶端通過網路實現呼叫。大致流程圖如下:
注:client 和 server 可以是兩臺不同 IP 的主機,也可以是本機上兩個埠不同的程式。
如上圖,實現呼叫的前提是 server 能解析請求資料,client 能解析響應資料,即兩端要約定好資料包的格式。
網路傳輸資料格式
成熟的 RPC 框架會有自定義 TLV 協議(固定長度訊息頭 + 變長訊息體)等。在 simple_rpc 中儘量簡化,包的格式如下:
讀取網路位元組流時,需要知道要讀取多少位元組作為的資料部分,故在頭部中使用 4 位元組長的 header 部分來標識 data 的長度。讀寫如下:
package simple_rpc import ( "encoding/binary" "io" "net" ) type Session struct { conn net.Conn } // 向連線中寫資料 func (s *Session) Write(data []byte) error { buf := make([]byte, 4+len(data))// 4 位元組頭部 + 資料長度 binary.BigEndian.PutUint32(buf[:4], uint32(len(data))) // 寫入頭部 copy(buf[4:], data)// 寫入資料 _, err := s.conn.Write(buf) if err != nil { return err } return nil } // 從連線中讀資料 func (s *Session) Read() ([]byte, error) { header := make([]byte, 4) _, err := io.ReadFull(s.conn, header) if err != nil { return nil, err } dataLen := binary.BigEndian.Uint32(header) data := make([]byte, dataLen) _, err = io.ReadFull(s.conn, data) if err != nil { return nil, err } return data, nil }
注:binary 包只認固定長度的型別,故 header 使用 uint32 而非 int
func TestSession_ReadWrite(t *testing.T) { addr := "0.0.0.0:2333" cont := "yep" wg := sync.WaitGroup{} wg.Add(2) go func() { defer wg.Done() l, err := net.Listen("tcp", addr) if err != nil { t.Fatal(err) } conn, _ := l.Accept() s := Session{conn: conn} err = s.Write([]byte(cont)) if err != nil { t.Fatal(err) } }() go func() { defer wg.Done() conn, err := net.Dial("tcp", addr) if err != nil { t.Fatal(err) } s := Session{conn: conn} data, err := s.Read() if err != nil { t.Fatal(err) } if string(data) != cont { t.FailNow() } }() wg.Wait() }
測試讀寫正常:
反射與 RPC
server 端接收到的資料需要包括:呼叫的函式名、引數列表。一般我們會約定第二個返回值是 error 型別,表示 RPC 呼叫結結果(gRPC 標準)
Call 執行呼叫
RPC Server 需解決 2 個問題:
Value.Call()
package main import ( "fmt" "reflect" ) func main() { funcs := make(map[string]reflect.Value) // server 端維護 funcName => func 的 map funcs["incr"] = reflect.ValueOf(incr) args := []reflect.Value{reflect.ValueOf(1)} // 構建引數(client 傳遞上來) vals := funcs["incr"].Call(args)// 呼叫執行 var res []interface{} for _, val := range vals { res = append(res, val.Interface()) // 處理返回值 } fmt.Println(res)// [2, <nil>] } func incr(n int) (int, error) { return n + 1, nil }
看到這裡,RPC Server 端的核心工作如下:
- 維護函式名到函式反射值的 map
- client 端傳遞函式名、引數列表後,解析為反射值,呼叫執行
- 函式的返回值打包通過網路返回給客戶端
MakeFunc 生成呼叫
RPC Client 需解決問題:函式的具體實現在 Server 端,Client 只有該函式的原型。使用 MakeFunc()
完成原型到函式的呼叫。
package main import ( "fmt" "reflect" ) func main() { swap := func(args []reflect.Value) []reflect.Value { return []reflect.Value{args[1], args[0]} } var intSwap func(int, int) (int, int) fn := reflect.ValueOf(&intSwap).Elem() // 獲取 intSwap 未初始化的函式原型 v := reflect.MakeFunc(fn.Type(), swap) // MakeFunc 使用傳入的函式原型建立一個繫結 swap 的新函式 fn.Set(v)// 為函式 intSwap 賦值 fmt.Println(intSwap(1, 2)) // 2 1 }
RPC 資料
我們定義 RPC 互動的資料格式,即要儲存到上邊網路位元組流中 data
部分的資料:
type RPCData struct { Name string Args []interface{} }
定義其對應的編碼解碼函式:
func encode(data RPCData) ([]byte, error) { var buf bytes.Buffer bufEnc := gob.NewEncoder(&buf) if err := bufEnc.Encode(data); err != nil { return nil, err } return buf.Bytes(), nil } func decode(b []byte) (RPCData, error) { buf := bytes.NewBuffer(b) bufDec := gob.NewDecoder(buf) var data RPCData if err := bufDec.Decode(&data); err != nil { return data, err } return data, nil }
Server 端
結構
server 端需要維護連線與 RPC 函式名到 RPC 函式本身的對映,結構如下:
type Server struct { addrstring funcs map[string]reflect.Value }
註冊函式
將函式名與函式的真正實現對應起來:
func (s *Server) Register(rpcName string, f interface{}) { if _, ok := s.funcs[rpcName]; ok { return } fVal := reflect.ValueOf(f) s.funcs[rpcName] = fVal }
執行呼叫
為了看清楚服務端的工作流程,暫且忽略錯誤處理:
// 等待 func (s *Server) Run() { l, _ := net.Listen("tcp", s.addr) for { conn, _ := l.Accept() srvSession := NewSession(conn) // 讀取 RPC 呼叫資料 b, _ := srvSession.Read() // 解碼 RPC 呼叫資料 rpcData, _ := decode(b) f, ok := s.funcs[rpcData.Name] if !ok { fmt.Printf("func %s not exists", rpcData.Name) return } // 建構函式的引數 inArgs := make([]reflect.Value, 0, len(rpcData.Args)) for _, arg := range rpcData.Args { inArgs = append(inArgs, reflect.ValueOf(arg)) } // 執行呼叫 out := f.Call(inArgs) outArgs := make([]interface{}, 0, len(out)) for _, o := range out { outArgs = append(outArgs, o.Interface()) } // 包裝資料返回給客戶端 respRPCData := RPCData{rpcData.Name, outArgs} respBytes, _ := encode(respRPCData) srvSession.Write(respBytes) } }
Client 端
直接呼叫即可:
// fPtr 指向函式原型 func (c *Client) callRPC(rpcName string, fPtr interface{}) { fn := reflect.ValueOf(fPtr).Elem() // 完成與 Server 的互動 f := func(args []reflect.Value) []reflect.Value { // 處理輸入引數 inArgs := make([]interface{}, 0, len(args)) for _, arg := range args { inArgs = append(inArgs, arg.Interface()) } // 編碼 RPC 資料並請求 cliSession := NewSession(c.conn) reqRPC := RPCData{Name: rpcName, Args: inArgs} b, _ := encode(reqRPC) cliSession.Write(b) // 解碼響應資料,得到返回引數 respBytes, _ := cliSession.Read() respRPC, _ := decode(respBytes) outArgs := make([]reflect.Value, 0, len(respRPC.Args)) for i, arg := range respRPC.Args { // 必須進行 nil 轉換 if arg == nil { outArgs = append(outArgs, reflect.Zero(fn.Type().Out(i))) continue } outArgs = append(outArgs, reflect.ValueOf(arg)) } return outArgs } v := reflect.MakeFunc(fn.Type(), f) fn.Set(v) }
MakeFunc
是 Client 從函式原型到網路呼叫的關鍵。
測試
func TestRPC(t *testing.T) { gob.Register(User{}) addr := "0.0.0.0:2333" srv := NewServer(addr) srv.Register("queryUser", queryUser) go srv.Run() conn, err := net.Dial("tcp", addr) if err != nil { t.Error(err) } cli := NewClient(conn) var query func(int) (User, error) cli.callRPC("queryUser", &query) // RPC 呼叫 u, err := query(1) fmt.Println(err, u) } type User struct { Name string Ageint } func queryUser(uid int) (User, error) { userDB := make(map[int]User) userDB[0] = User{"Dennis", 70} userDB[1] = User{"Ken", 75} userDB[2] = User{"Rob", 62} if u, ok := userDB[uid]; ok { return u, nil } return User{}, fmt.Errorf("id %d not in user db", uid) }
RPC 呼叫成功,測試通過:
總結
如測試檔案中所示, queryUser()
沒有在 server.go 中實現,所以本文的 demo 並不是完全意義上的 RPC 框架,不過闡釋清楚了 RPC 的核心點:反射呼叫。
上邊的 demo 使用裸 net.Conn
進行阻塞式的讀寫。投入生產環境的 RPC 框架往往有著健壯的底層網路機制,比如使用非阻塞式 IO 讀寫、實現 Client 與 Server 端保持超時重連、心跳檢測等等複雜的機制。