JAVA I/O(六)多路複用IO
在前邊介紹Socket和ServerSocket連線互動的過程中,讀寫都是阻塞的。套接字寫資料時,資料先寫入作業系統的快取中,形成TCP或UDP的負載,作為套接字傳輸到目標端,當快取大小不足時,執行緒會阻塞。套接字讀資料時,如果作業系統快取沒有接收到資訊,則讀執行緒阻塞。執行緒阻塞情況下,就不能處理其他事情。JDK1.4引入了通道和選擇器的概念,以支援非同步或多路複用的IO。
Unix系統中的select()方法可以實現非同步IO,可以給該Selector註冊多個描述符(可讀或可寫),然後對這些描述符進行監控。在Java中,描述符即為套接字Socket。
如 JAVA I/O(二)檔案NIO 中對選擇器的介紹,在非阻塞模式下,用select()方法檢測發生變化的通道,每個通道都關聯一個Socket,用一個執行緒實現多個客戶端的請求,從而實現多路複用。
1. 簡單例項
伺服器端
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; public class MultiJabberServer1 { public static final int PORT = 8080; public static void main(String[] args) throws IOException{ String encoding = System.getProperty("file.encoding"); Charset cs = Charset.forName(encoding); ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel ch = null;//Socket對應的channel //1.建立ServerSocketChannel ServerSocketChannel ssc = ServerSocketChannel.open(); //2.建立選擇器Selector Selector sel = Selector.open(); try { //3.設定ServerSocketChannel通道為非阻塞 ssc.configureBlocking(false); //4.ServerSocketChannel關聯Socket,用於監聽連線,使用本地ip和port //注意:Socket也對通道進行了改造,直接調Socket.getChannel()將返回bull,除非通過下邊與通道關聯 //the expression (ssc.socket().getChannel() != null) is true ssc.socket().bind(new InetSocketAddress(PORT)); //5.將通道註冊到Selector,感興趣的事件為連線事件 ssc.register(sel, SelectionKey.OP_ACCEPT); System.out.println("Server on port: " + PORT); while(true) { //6.沒有事件發生時,一直阻塞等待 sel.select(); //7.有事件發生時,獲取Selector中所有SelectorKey(持有選擇器與通道的關聯關係)。 //由於基於作業系統的poll()方法,當有事件發生時,只返回事件個數,無法確定具體通道,故只能對所有註冊的通道進行遍歷。 Iterator<SelectionKey> it = sel.selectedKeys().iterator(); //8.遍歷所有SelectorKey,處理事件 while(it.hasNext()) { SelectionKey sKey = it.next(); it.remove();//防止重複處理 //9.判斷SelectorKey對應的channel發生的事件是否socket連線 if(sKey.isAcceptable()) { //10.與ServerSocket.accept()方法相似,接收到該通道套接字的連線,返回SocketChannel,與客戶端進行互動 ch = ssc.accept(); System.out.println( "Accepted connection from:" + ch.socket()); //11.設定該SocketChannel為非阻塞模式 ch.configureBlocking(false); //12.將該通道註冊到Selector中,感興趣的事件為OP_READ(讀) ch.register(sel, SelectionKey.OP_READ); }else { //13.發生非連線事件,此處為OP_READ事件。SelectorKey獲取註冊的SocketChannel,用於讀寫 ch = (SocketChannel)sKey.channel(); //14.將資料從channel讀到ByteBuffer中 ch.read(buffer); CharBuffer cb = cs.decode((ByteBuffer)buffer.flip()); String response = cb.toString(); System.out.print("Echoing : " + response); //15.再將獲取到的資料會寫給客戶端 ch.write((ByteBuffer)buffer.rewind()); if(response.indexOf("END") != -1) ch.close(); buffer.clear(); } } } } finally { if(ch != null) ch.close(); ssc.close(); sel.close(); } } }
如程式碼中註釋標明,大致步驟包含:
- 建立ServerSocketChannel和Selector,設定通道非阻塞,並與服務端的Socket繫結
- 註冊 ServerSocketChannel到Selector,感興趣的事件為OP_CONNECT(獲取連線)
- select()方法阻塞等待,直到有事件發生
- 遍歷Selector中的所有註冊事件,通過SelectorKey維護Selector和Channel關聯關係
- 如果是連線事件,則調ServerSocketChannel.accept()方法獲取SocketChannel,與客戶端互動
- 如果是讀事件,則通過SelectorKey中獲取SocketChannel,讀寫資料
執行結果:
Server on port: 8080
客戶端
import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import com.test.socketio.JabberServer; /** * 採用這種方式,讀與寫是非阻塞的 * 普通的讀寫是阻塞的,直到讀完或寫完 * */ public class JabberClient1 { static final int clPot = 8899; public static void main(String[] args) throws IOException{ //1.建立SocketChannel SocketChannel sc = SocketChannel.open(); //2.建立Selector Selector sel = Selector.open(); try { sc.configureBlocking(false); //3.關聯SocketChannel和Socket,socket繫結到本機埠 sc.socket().bind(new InetSocketAddress(clPot)); //4.註冊到Selector,感興趣的事件為OP_CONNECT、OP_READ、OP_WRITE sc.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE); int i = 0; boolean written = false, done = false; String encoding = System.getProperty("file.encoding"); Charset cs = Charset.forName(encoding); ByteBuffer buffer = ByteBuffer.allocate(16); while(!done) { sel.select(); //5.從選擇器中獲取所有註冊的通道資訊(SelectionKey作為標識) Iterator<SelectionKey> it = sel.selectedKeys().iterator(); while(it.hasNext()) { SelectionKey key = it.next(); it.remove(); //6.獲取通道,此處即為上邊建立的channel sc = (SocketChannel)key.channel(); //7.判斷SelectorKey對應的channel發生的事件是否socket連線,並且還沒有連線 if(key.isConnectable() && !sc.isConnected()) { InetAddress addr = InetAddress.getByName(null); //連線addr和port對應的伺服器 boolean success = sc.connect(new InetSocketAddress(addr, JabberServer.PORT)); if(!success) sc.finishConnect(); } //8.讀與寫是非阻塞的:客戶端寫一個資訊到伺服器,伺服器傳送一個資訊到客戶端,客戶端再讀 if(key.isReadable() && written) { if(sc.read((ByteBuffer)buffer.clear()) > 0) { written = false; String response = cs.decode((ByteBuffer)buffer.flip()).toString(); System.out.println(response); if(response.indexOf("END") != -1) done = true; } } if(key.isWritable() && !written) { if(i < 10) sc.write(ByteBuffer.wrap(new String("howdy " + i + "\n").getBytes())); else if(i == 10){ sc.write(ByteBuffer.wrap("END".getBytes())); } written = true; i++; } } } } finally { sc.close(); sel.close(); } } }
客戶端與服務端類似,不同之處:
- 建立SocketChannel通道,註冊到選擇器,剛興趣的事件為OP_CONNECT、OP_READ、OP_WRITE
- 除錯發現,客戶端sel.select()不會阻塞,對註冊通道不斷的遍歷,並且每次都可寫。 原因是OP_WRITE事件會持續生效,即只要連線存在就可以寫,不管服務端是否有返回
- 本例中,客戶端傳送一條資料,服務端接收一條,並返回給客戶端;客戶端接到服務端的訊息後,才會發生下一條資料,主要通過written標識進行控制的。
執行機制
執行結果
服務端
Server on port: 8080 Accepted connection from:Socket[addr=/127.0.0.1,port=8899,localport=8080] Echoing : howdy 0 Echoing : howdy 1 Echoing : howdy 2 Echoing : howdy 3 Echoing : howdy 4 Echoing : howdy 5 Echoing : howdy 6 Echoing : howdy 7 Echoing : howdy 8 Echoing : howdy 9 Echoing : END
客戶端
howdy 0 howdy 1 howdy 2 howdy 3 howdy 4 howdy 5 howdy 6 howdy 7 howdy 8 howdy 9 END
2.核心類分析
(1)通道(SelectableChannel)
通道Channel繼承體系如下,其中ServerSocketChannel和SocketChannel都繼承自SelectableChannel。
- SelectableChannel通道可以通過Selector實現多路複用(multiplexed)。
- 通道通過register(Selector,int,Object)方法註冊到Selector中,並返回SelectorKey(代表註冊到Selector上的註冊資訊)。
- 在一個Selector中,同一個通道只能註冊一份;是否可以註冊到多個Selector中,由程式呼叫isRegistered()方法決定。
- SelectableChannel通道是執行緒安全的。
- SelectableChannel包含阻塞和非阻塞兩種模式,只有非阻塞時才可以註冊到Selector中。
ServerSocketChannel(A selectable channel for stream-oriented listening sockets.),用於 監聽 Socket的基於流的可選通道。
SocketChannel(A selectable channel for stream-oriented connecting sockets.),用於 連線 Socket的基於流額可選通道。
(2)選擇器(Selector)
Selector是SelectableChannel的多路複選器,該類包含以下方法。
- 通過open()方法建立Selector
- 包含三種SelectorKey Set:所有註冊的SelectorKey、被選的SelectorKey(通道發生事件)、被取消的SelectorKey(不可直接訪問)
- 每次select()操作,都會從被選的SelectorKey集合中刪除或新增,清楚被取消的SelectorKey中的SelectorKey
(3)選擇建(SelectorKey)
選擇鍵封裝了特定的 通道 與 特定的選擇器 的註冊關係。選擇鍵物件被SelectableChannel.register()返回並提供一個表示這種註冊關係的標記。選擇鍵包含了兩個位元集(以整數的形式進行編碼),指示了該註冊關係所關心的通道操作,以及通道已經準備好的操作。包括讀、寫、連線和接收操作,如下:
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
3.Reactor設計模式
基於Selector的多路複用IO,機制是採用Reactor設計模式,將一個或多個客戶的服務請求分離(demultiplex)和事件分發器 (dispatch)給應用程式( I/O模型之三:兩種高效能 I/O 設計模式 Reactor 和 Proactor ),即通過Selector阻塞等待事件發生,然後再分發給相應的處理器介面。詳情可以參考該篇文章或更多的資料。
摘自連結文章中的一幅圖如下:
- Reactor是排程中心,包含select()阻塞,等待事件發生,並分發不同的業務處理。
- 客戶端請求連線時,select()接收到事件後,會調acceptor,建立連線並與客戶端互動。
- 客戶端寫資料給服務端時,select()接收到事件後,調read操作,讀取客戶端資料,可以採用執行緒池對與客戶端互動,對資料進行處理。
- 服務端可也以發生資料給客戶端。
4.總結
1. SelectableChannel(ServerSocketChannel和SocketChannel)可以註冊到Selector中,並用選擇鍵(SelectorKey)進行分裝
2. SelectorKey中包含選擇器感興趣的事件(讀、寫、連線和接收)
3. Selector中select()方法阻塞,直到註冊通道有事件發生,可以一個執行緒監控多個客戶端,實現多路複用
4. 基於Selector的多路複用採用Reactor設計模式,使得選擇器與業務處理進行分離。
5. Netty是非同步基於事件的應用框架,其實現是基於Java NIO的,並對其進行了優化,可以進一步學習。
5. 參考
《Thinking in Enterprise Java》