分散式系列五: RMI通訊
RPC(Remote Procedure Call)協議
RPC協議是一種通過網路從遠端計算機上請求服務, 而不需要了解底層網路技術的協議, 在OSI模型中處在應用層和網路層.
作為一個規範, 使用RPC協議的框架有很多, Dubbo,Hessian等均使用這個協議, RMI也使用該協議實現.
RMI(Remote Method Invocation) 遠端方法呼叫
RMI使用Java遠端訊息交換協議JRMP(Java Remote Messaging Protocol)進行通訊,JRMP是純java的.
-
定義介面, 使其extends
Remote
介面, 方法需要丟擲異常RemoteException
, Remote是一個標記介面
public interface IRmiTest extends Remote { String hello() throws RemoteException; }
-
實現介面, 使其extends
UnicastRemoteObject
, 需要有構造方法, 並丟擲異常RemoteException
public class RmiTest extends UnicastRemoteObject implements IRmiTest { public RmiTest() throws RemoteException { } @Override public String hello() { return "Hello ...."; } }
- 定義服務端, 註冊和繫結
public class TestServer { public static void main(String[] args) throws RemoteException, AlreadyBoundException, MalformedURLException { IRmiTest rmiTest = new RmiTest(); LocateRegistry.createRegistry(8888); Naming.bind("rmi://localhost:8888/hello",rmiTest); System.out.println("server started"); } }
- 定義客戶端, lookup方法的引數url與服務端bind的必須一致. 介面需要定義為與服務端一致.
public class TestClient { public static void main(String[] args) throws RemoteException,MalformedURLException, NotBoundException { IRmiTest rmiTest = (IRmiTest) Naming.lookup("rmi://localhost:8888/hello"); System.out.println(rmiTest.hello()); } }
RMI實現機制
RMI遮蔽了底層複雜的網路呼叫, 使得遠端物件的方法呼叫變得透明, 就像呼叫本地方法一樣方便.
下面深入探究下jdk中rmi的實現原理, 看看底層是如何實現遠端呼叫的.
首先, 需要了解下比較重要的兩個角色stub和skeleton, 這兩個角色封裝了與網路相關的程式碼. 原始的互動式這樣的,客戶端--網路--伺服器--具體服務. 有了這兩個角色之後的模型變為: 客戶端--stub--網路--skeleton--伺服器--服務.可以參考的圖ofollow,noindex" target="_blank">維基百科
下面來看原始碼...
一.例項化RegistryImpl,初始化
LocateRegistry.createRegistry(8888);
這句程式碼啟動了一個註冊器(其中有個Map物件來儲存名稱和服務的對映,這個後面再細看)
public static Registry createRegistry(int port) throws RemoteException { return new RegistryImpl(port); }
這個方法例項化了一個RegistryImpl
的例項,RegistryImpl
實現了Registry
.
public RegistryImpl(final int var1) throws RemoteException { if(var1 == 1099 && System.getSecurityManager() != null) { try { AccessController.doPrivileged(new PrivilegedExceptionAction() { public Void run() throws RemoteException { LiveRef var1x = new LiveRef(RegistryImpl.id, var1); RegistryImpl.this.setup(new UnicastServerRef(var1x)); return null; } }, (AccessControlContext)null, new Permission[]{new SocketPermission("localhost:" + var1, "listen,accept")}); } catch (PrivilegedActionException var3) { throw (RemoteException)var3.getException(); } } else { LiveRef var2 = new LiveRef(id, var1); this.setup(new UnicastServerRef(var2)); } }
兩個分支最終都呼叫了setup()
方法, 主要關注該方法.if分支中var1=1099是指預設埠並且存在安全管理器的時候不做校驗, 這是為了效能考慮.
private void setup(UnicastServerRef var1) throws RemoteException { this.ref = var1; // UnicastServerRef繼承了RemoteRef,this.ref的型別就是RemoteRef var1.exportObject(this, (Object)null, true); }
setup方法的引數是包裝後的UnicastServerRef
物件,UnicastServerRef
繼承了RemoteRef
因此可以賦值給ref變數. 該方法將呼叫委託給UnicastServerRef
的方法exportObject()
如果是拿文章開頭的程式碼進行除錯, 會發現這個方法會走兩次, 除了RegistryImpl
, 還有一次是RmiTest
也會走這個方法.不同的是RegistryImpl
會走下面程式碼中的if(var5 instanceof RemoteStub)
分支語句, 這個語句最終將生成一個Skeleton例項並設定給當前例項的域變數skel, 不過自jdk1.2之後skeleton就沒什麼用了.
public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException { Class var4 = var1.getClass(); Remote var5; try { var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse); } catch (IllegalArgumentException var7) { throw new ExportException("remote object implements illegal remote interface", var7); } if(var5 instanceof RemoteStub) { // 生成Skeleton例項並設定給當前例項的域變數skel this.setSkeleton(var1); } Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3); this.ref.exportObject(var6);//ref是例項化UnicastServerRef的時候傳入的 this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4); return var5; }
上面方法首先根據Remote
的引數var1建立了一個代理物件var5, var1是RegistryImpl
類的例項. 然後例項化一個Target
的例項, 從引數可以看到,Target物件包含了幾乎之前程式碼的所有物件.然後將這個物件作為引數,呼叫LiveRef
例項ref的exportObject()
方法.
二. 網路連線和物件傳輸
public void exportObject(Target var1) throws RemoteException { this.ep.exportObject(var1); }
接上一步,RemoteRef
的方法最終委託給TCPEndpoint
的同名方法(委託模式), 到此程式碼將控制權傳遞給傳輸層.
public void exportObject(Target var1) throws RemoteException { synchronized(this) { this.listen(); ++this.exportCount; } boolean var2 = false; boolean var12 = false; try { var12 = true; super.exportObject(var1); var2 = true; var12 = false; } finally { if (var12) { if (!var2) { synchronized(this) { this.decrementExportCount(); } } } } if (!var2) { synchronized(this) { this.decrementExportCount(); } } }
這個方法實現了網路通訊, 首先linsten()
啟動了一個ServerSocket
的執行緒,並開始監聽埠. 然後呼叫父類的方法將Target
物件暴露出去, 此時服務端的初始化就完成了.
三. 註冊服務
Naming.bind("rmi://localhost:8888/hello",rmiTest);
完成名稱和服務物件的繫結.
public static void bind(String name, Remote obj) throws AlreadyBoundException, java.net.MalformedURLException, RemoteException { ParsedNamingURL parsed = parseURL(name); Registry registry = getRegistry(parsed); if (obj == null) throw new NullPointerException("cannot bind to null"); registry.bind(parsed.name, obj); }
上面程式碼Naming
類, 呼叫的是註冊器Registry
的bind()
方法
public void bind(String var1, Remote var2) throws RemoteException, AlreadyBoundException, AccessException { Hashtable var3 = this.bindings; synchronized(this.bindings) { Remote var4 = (Remote)this.bindings.get(var1); if (var4 != null) { throw new AlreadyBoundException(var1); } else { this.bindings.put(var1, var2); } } }
註冊使用的容器是一個HashTable
, 最終服務的名稱和服務會被註冊到這個map容器中.
到此為止, 服務端的初始化完成. 首先例項化了一個實現Register
註冊器的例項, 通過層層組裝, 最終生成一個Target
物件, 其中包含了組裝過程中生成的全部狀態, 最後呼叫RemoteRef
的方法將物件轉交給傳輸層物件TCPEndpoint
的例項, 最終由這個物件啟動Socket開啟通訊連線. 註冊服務是通過Naming
的方法委託呼叫Register
註冊器的方法實現, 並將結果最終註冊到Register
域的map物件中.
四. 客戶端遠端呼叫
IRmiTest rmiTest = (IRmiTest) Naming.lookup("rmi://localhost:8888/hello");
客戶端通過Naming
的方法獲取服務的例項
public static Remote lookup(String name) throws NotBoundException, java.net.MalformedURLException, RemoteException{ ParsedNamingURL parsed = parseURL(name); Registry registry = getRegistry(parsed); if (parsed.name == null) return registry; return registry.lookup(parsed.name); }
與服務端註冊時候使用Naming.bind()
方法一樣, 這裡lookup()
最終也會委託給Registry
的例項. 這個例項的實現不是用的服務端的Register_Impl
, 而是使用RegistryImpl_Stub
, 下面程式碼是lookup()
的實現, 可以看出這裡封裝了網路io的一些邏輯.
public Remote lookup(String var1) throws AccessException, NotBoundException, RemoteException { try { RemoteCall var2 = this.ref.newCall(this, operations, 2, 4905912898345647071L); try { ObjectOutput var3 = var2.getOutputStream(); var3.writeObject(var1); } catch (IOException var17) { throw new MarshalException("error marshalling arguments", var17); } this.ref.invoke(var2); Remote var22; try { ObjectInput var4 = var2.getInputStream(); var22 = (Remote)var4.readObject(); } catch (IOException var14) { throw new UnmarshalException("error unmarshalling return", var14); } catch (ClassNotFoundException var15) { throw new UnmarshalException("error unmarshalling return", var15); } finally { this.ref.done(var2); } return var22; } catch (RuntimeException var18) { throw var18; } catch (RemoteException var19) { throw var19; } catch (NotBoundException var20) { throw var20; } catch (Exception var21) { throw new UnexpectedException("undeclared checked exception", var21); } }
至此, 服務端和客戶端的連線完成, 可以開始通訊了.
RMI自JDK1.1就已經提供了, 它提供了Java語言自己的RPC呼叫方式, 雖然有些老舊, 但依然經典. 目前有很多跨語言的技術或框架, 如後來的WebService, 再到目前的netty,shrift等基本已經取代了這種原始的呼叫方式, 他們是非阻塞的,且還能跨語言呼叫. 但熟悉RMI的實現方式對了解分散式系統的通訊的實現原理有很大幫助.