flink RPC(akka)
flink中的rpc框架使用的akka。在本節並不詳細講述akka,而是就flink中rpc來講述akka的部分內容。本節,我從AkkaRpcActor.handleRpcInvocation方法講起。
看過hadoop、yarn、hive、hbase、presto的rpc框架,感覺flink的通訊框架是最容易讓人繞暈的。雖然之前也看過一點spark中akka的通訊,但現在早已忘得一乾二淨。如今重拾akka通訊,感覺還是挺複雜的。因此,這裡特意拿出一節來講解。
1.這裡首先要講述的是flink中關於心跳的rpc互動。這裡也是akka中第一種遠端通訊方式,也就是說通過tell方式非同步傳輸。
如下圖所示,這裡是我前幾天畫的《flink心跳》思維導圖的一部分,需要完整版加我微信——letusflyinthesky(有償出售,flink吹牛必備哦)。
這裡我們從HeartbeatTarget.requestHeartbeat開始講。真正呼叫的是ResourceManager.registerTaskExecutorInternal方法中 型別為HeartbeatTarget的匿名類,其內部呼叫了taskExecutorGateway.heartbeatFromResourceManager。這裡的taskExecutorGateway是一個代理類,其invocationHandler為AkkaInvocationHandler。因此,這裡首先呼叫的是AkkaInvocationHandler.invoke,由於這裡要呼叫的並非本地方法,因此接著呼叫了方法AkkaInvocationHandler.invokeRpc。在該方法中首先通過方法createRpcInvocationMessage封裝了發現taskmanager端的請求RemoteRpcInvocation,接著獲取了欲呼叫方法的返回值(這裡的判斷是為了後面使用不同的akka通訊方式)。我們這裡的返回值為Void。然後呼叫了AkkaInvocationHandler.tell。這裡的入參是剛剛封裝的RemoteRpcInvocation,該方法內部呼叫了ActorRef.tell。該actor就是taskmanager端的化生,傳送了RemoteRpcInvocation(可序列化)。jobmanager端,也就是resourcemanager端的流程到這裡就結束了,因為我們遠端呼叫的方法是無返回值的。
接著,我們來到taskmanager端,這裡的AkkaRpcActor.onReceive接收到resourcemanager端發來的訊息。根據型別的匹配,我們來到AkkaRpcActor.handleRpcMessage。由於這裡的資訊是RemoteRpcInvocation,實現了介面RpcInvocation,因此,我們來到AkkaRpcActor.handleRpcInvocation方法。這裡首先呼叫方法lookupRpcMethod根據方法名獲取taskmanager端對應的方法,也就是TaskExecutor中對應的方法。接著,設定了其訪問屬性後,便開始反射呼叫。由於我們這裡的方法返回值型別為Void,因此,在呼叫了TaskExecutor.heartbeatFromResourceManager後再無後續操作。
2.接著是akka中的第二種通訊方式——非同步返回。我這裡的使用的是taskmanager向resourcemanager遠端註冊的例子來講解。
這裡使用了akka的非同步返回機制。如果對akka的非同步返回不太熟悉的朋友,我推薦大家看一下 http://sunxiang0918.cn/2016/01/10/Akka-in-JAVA-1/ 。這裡一共有四篇文章,對於akka入門有極大裨益。另外,我會在下篇部落格釋出時,將整理的flink中關於akka的程式碼釋出到我的github上,到時大家可以參考一下。這裡我配合思維導圖方便大家的理解。
從TaskExecutorToResourceManagerConnection.ResourceManagerRegistration.invokeRegistration講起。該方法內部呼叫了resourceManager.registerTaskExecutor。這裡的resourceManager實際型別是FencedAkkaInvocationHandler。FencedAkkaInvocationHandler繼承自AkkaInvocationHandler。這裡的部分呼叫流程與上面的非同步無返回類似,我就從其中不同的地方講起。由於我們這裡的返回值型別為CompletableFuture<RegistrationResponse>,不是Void型別,因此,這裡首先呼叫了FencedAkkaInvocationHandler.ask,接著呼叫了FencedAkkaInvocationHandler.fenceMessage將資訊型別封裝為RemoteFencedMessage,接著呼叫AkkaInvocationHandler.ask。這裡是比較複雜的地方。首先呼叫了Patterns.ask(ActorRef, message),這裡的ActorRef是resourcemanager端的化身,Patterns.ask是akka用於遠端非同步呼叫的一種方式。其返回值為scala.concurrent.Future,也就是scala型別的Future。該型別有方法onComplete,作用是當該Future完成是,不論是丟擲異常或返回值完成此未來時,呼叫該方法入參中的函式。這裡我們通過FutureUtils.toJava將scala中的Future轉換為java中的CompletableFuture。得到CompletableFuture後,taskmanager端接著呼叫CompletableFuture.thenApply方法,內部呼叫了返回值的deserializeValue方法,也就是獲取到遠端的序列化的返回值後,將其反序列化。由於我們這裡rpc呼叫的方法返回值是CompletableFuture型別,因此這裡並不阻塞,直接返回。
然後,我們來到resourcemanager端,這裡的AkkaRpcActor.onReceive方法被呼叫(注意,這裡的實際型別是FencedAkkaRpcActor),由於傳入的型別為RemoteFencedMessage,這裡接著呼叫了FencedAkkaRpcActor.handleRpcMessage。經過幾個判斷後,這裡呼叫了AkkaRpcActor.handleRpcMessage,此時,這裡的入參為RemoteFencedMessage.getPayload,也就是RemoteRpcInvocation。接下來的流程我在上面已經提到,這裡就不贅述了。所不同的是,我們這裡的返回為型別為CompletableFuture,因此,這裡接著會呼叫AkkaRpcActor.sendAsyncResponse。這裡首先呼叫了方法——Patterns.pipe(promise.future(), getContext().dispatcher()).to(sender),這裡的promise是scala中的Promise.DefaultPromise型別,該方法的作用其實就是講java中的CompletableFuture轉換為scala中的型別DefaultPromise,畢竟,java中的CompletableFuture型別無法實現rpc。sendAsyncResponse方法的作用就是,當入參asyncResponse完成後,會呼叫Promise.DefaultPromise的相應方法(success或failure)被呼叫。此時,由於Patterns.pipe(promise.future(), getContext().dispatcher()).to(sender)已經被呼叫,因此,taskmanager端呼叫Patterns.ask方法的返回的future為完成狀態,也就是呼叫了其onComplete。接著,在taskmanager端將返回值反序列化,完成非同步rpc的呼叫。
3.接著是akka的最後通訊方式——阻塞返回。在flink中的對應的方法是AkkaRpcActor.sendSyncResponse(這裡在flink中很少用到,因此我這裡並沒有舉例)。
這裡rpc呼叫方法的返回值為非CompletableFuture型別,前面的呼叫流程與上面講述的非同步返回一樣,所不同的是,由於方法返回值型別為非CompletableFuture,因此,這裡呼叫了CompletableFuture.get,這裡會一直阻塞,直待該CompletableFuture的完成。這裡的CompletableFuture其實就是通過FutureUtils.toJava實現了將scala中的future轉換為java中的CompletableFuture。也就是說,這裡會一直等到遠端方法Promise.DefaultPromise的相應方法(success或failure)被呼叫,這裡的阻塞才會被打斷。
好了,到這裡為止,關於flink中應用akka完成其rpc通訊框架的流程就結束了,感謝大家的關注。另外,本人正在找成都大資料底層開發的工作,有推薦的朋友可以加我的微信交流(letusflyinthesky),非誠勿擾。