Java NIO:Buffer、Channel 和 Selector
Buffer
一個 Buffer 本質上是記憶體中的一塊,我們可以將資料寫入這塊記憶體,之後從這塊記憶體獲取資料。
java.nio 定義了以下幾個 Buffer 的實現,這個圖讀者應該也在不少地方見過了吧。
其實核心是最後的 ByteBuffer,前面的一大串類只是包裝了一下它而已,我們使用最多的通常也是 ByteBuffer。
我們應該將 Buffer 理解為一個數組,IntBuffer、CharBuffer、DoubleBuffer 等分別對應 int[]、char[]、double[] 等。
MappedByteBuffer 用於實現記憶體對映檔案,也不是本文關注的重點。
我覺得操作 Buffer 和運算元組、類集差不多,只不過大部分時候我們都把它放到了 IO/">NIO 的場景裡面來使用而已。下面介紹 Buffer 中的幾個重要屬性和幾個重要方法。
position、limit、capacity
就像陣列有陣列容量,每次訪問元素要指定下標,Buffer 中也有幾個重要屬性:position、limit、capacity。
最好理解的當然是 capacity,它代表這個緩衝區的容量,一旦設定就不可以更改。比如 capacity 為 1024 的 IntBuffer,代表其一次可以存放 1024 個 int 型別的值。一旦 Buffer 的容量達到 capacity,需要清空 Buffer,才能重新寫入值。
position 和 limit 是變化的,我們分別看下讀和寫操作下,它們是如何變化的。
position 的初始值是 0,每往 Buffer 中寫入一個值,position 就自動加 1,代表下一次的寫入位置。讀操作的時候也是類似的,每讀一個值,position 就自動加 1。
從寫操作模式到讀操作模式切換的時候( flip ),position 都會歸零,這樣就可以從頭開始讀寫了。
limit:寫操作模式下,limit 代表的是最大能寫入的資料,這個時候 limit 等於 capacity。寫結束後,切換到讀模式,此時的 limit 等於 Buffer 中實際的資料大小,因為 Buffer 不一定被寫滿了。
初始化 Buffer
每個 Buffer 實現類都提供了一個靜態方法 allocate(int capacity)
幫助我們快速例項化一個 Buffer。如:
ByteBuffer byteBuf = ByteBuffer.allocate(1024); IntBuffer intBuf = IntBuffer.allocate(1024); LongBuffer longBuf = LongBuffer.allocate(1024);
另外,我們經常使用 wrap 方法來初始化一個 Buffer。
public static ByteBuffer wrap(byte[] array) { ... }
填充 Buffer
各個 Buffer 類都提供了一些 put 方法用於將資料填充到 Buffer 中,如 ByteBuffer 中的幾個 put 方法:
// 填充一個 byte 值 public abstract ByteBuffer put(byte b); // 在指定位置填充一個 int 值 public abstract ByteBuffer put(int index, byte b); // 將一個數組中的值填充進去 public final ByteBuffer put(byte[] src) {...} public ByteBuffer put(byte[] src, int offset, int length) {...}
上述這些方法需要自己控制 Buffer 大小,不能超過 capacity,超過會java.nio.BufferOverflowException 異常。
對於 Buffer 來說,另一個常見的操作中就是,我們要將來自 Channel 的資料填充到 Buffer 中,在系統層面上,這個操作我們稱為讀操作,因為資料是從外部(檔案或網路等)讀到記憶體中。
int num = channel.read(buf);
上述方法會返回從 Channel 中讀入到 Buffer 的資料大小。
提取 Buffer 中的值
前面介紹了寫操作,每寫入一個值,position 的值都需要加 1,所以 position 最後會指向最後一次寫入的位置的後面一個,如果 Buffer 寫滿了,那麼 position 等於 capacity(position 從 0 開始)。
如果要讀 Buffer 中的值,需要切換模式,從寫入模式切換到讀出模式。注意,通常在說 NIO 的讀操作的時候,我們說的是從 Channel 中讀資料到 Buffer 中,對應的是對 Buffer 的寫入操作,初學者需要理清楚這個。
呼叫Buffer的flip()方法,可以從寫模式切換到讀模式,其實就是重新設定了一下position和limit的值。
public final Buffer flip() { limit = position; // 將 limit 設定為實際寫入的資料數量 position = 0; // 重置 position 為 0 mark = -1; // mark 之後再說 return this; }
對應寫操作的一系列put方法,讀操作提供了一系列的get()方法:
// 根據 position 來獲取資料 public abstract byte get(); // 獲取指定位置的資料 public abstract byte get(int index); // 將 Buffer 中的資料寫入到陣列中 public ByteBuffer get(byte[] dst)
附一個經常使用的方法:
new String(buffer.array()).trim();
除了將資料從Buffer讀取出來使用,更常見的操作是將寫入的資料輸出到Channel中,如通過FileChannel將資料寫入到檔案中,通過SocketChannel將資料寫入到網路傳送到遠端機器等。對應的,這種操作,我們稱之為寫操作。
int num = channel.write(buf);
mark()、reset()
除了position、limit、capacity這三個基本屬性外,還有一個常用的屬性就是mark。
mark用於臨時儲存position的值,每次呼叫mark()方法都會將mark設定為當前的position,便於後學需要的時候使用。
public final Buffer mark() { mark = position; return this; }
那到底什麼時候用呢?考慮以下場景,我們在 position 為 5 的時候,先 mark() 一下,然後繼續往下讀,讀到第 10 的時候,我想重新回到 position 為 5 的地方重新來一遍,那隻要調一下 reset() 方法,position 就回到 5 了。
public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; }
rewind()、clear()、compact()
rewind():會重置position為0,通常用於從頭讀寫Buffer。
public final Buffer rewind() { position = 0; mark = -1; return this; }
clear():相當於重新例項化。
通常,我們會先填充Buffer,然後從Buffer讀取資料,之後再重新往裡填充新的資料,我們一般在填充之前先呼叫clear().
public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }
compact():和clear()一樣的是都是在準備往Buffer中填充新資料之前呼叫。
clear()會重置幾個屬性,但是並不會將Buffer中的資料清空,只不過後面寫的時候會覆蓋之前的資料。
而compact()方法呼叫之後,會先處理還沒有讀取的資料,也就是position到limit直接的資料,先將這些資料都移動到左邊,然後在這個基礎之上再開始寫入。此時,limit還是等於capacity,position指向原來資料的右邊。
Channel
所有的 NIO 操作始於通道,通道是資料來源或資料寫入的目的地,主要地,我們將關心 java.nio 包中實現的以下幾個 Channel:
FileChannel:檔案通道,用於檔案的讀和寫。
DatagramChannel:用於UDP連線的接收和傳送
SocketChannel:TCP客戶端
ServerSocketChannel:TCP服務端,監聽某個埠進來的請求。
Channel 經常翻譯為通道,類似 IO 中的流,用於讀取和寫入。它與前面介紹的 Buffer 打交道,讀操作的時候將 Channel 中的資料填充到 Buffer 中,而寫操作時將 Buffer 中的資料寫入到 Channel 中。
FileChannel
初始化:
FileInputStream inputStream = new FileInputStream(new File("/data.txt")); FileChannel fileChannel = inputStream.getChannel();
當然了,也可以從RandomAccessFile類中的getChannel來得到FileChannel。
讀取檔案內容:
ByteBuffer buffer = ByteBuffer.allocate(1024); int num = fileChannel.read(buffer);
寫入檔案內容:
ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("隨機寫入一些內容到 Buffer 中".getBytes()); // Buffer 切換為讀模式 buffer.flip(); while(buffer.hasRemaining()) { // 將 Buffer 中的內容寫入檔案 fileChannel.write(buffer); }
SocketChannel
開啟一個TCP連結:
SocketChannel socketChannel = SocketChannel .open(new InetSocketAddress("127.0.0.1", 80));
當然了,上面的這行程式碼等價於下面的兩行:
// 開啟一個通道 SocketChannel socketChannel = SocketChannel.open(); // 發起連線 socketChannel.connect(new InetSocketAddress("127.0.0.1", 80));
SocketChannel 的讀寫和 FileChannel 沒什麼區別,就是操作緩衝區。
// 讀取資料 socketChannel.read(buffer); // 寫入資料到網路連線中 while(buffer.hasRemaining()) { socketChannel.write(buffer); }
ServerSocketChannel
ServerSocketChannel 用於監聽機器埠,管理從這個埠進來的 TCP 連線。
// 例項化 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 監聽 8080 埠 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); while (true) { // 一旦有一個 TCP 連線進來,就對應建立一個 SocketChannel 進行處理 SocketChannel socketChannel = serverSocketChannel.accept(); }
這裡我們看到了SocketChannel的第二個例項化方式。
到這裡,我們應該能理解SocketChannel了,它不僅僅是TCP客戶端,它代表的是一個網路通道,可讀可寫。
ServerSocketChannel不和Buffer打交道了,因為它並不實際處理資料,一旦接到請求,就會例項化一個SocketChannel,之後再這個簡介通道上傳遞的資料它就不管了,它會繼續監聽埠等待下一個連線。
DatagramChannel
UDP 和 TCP 不一樣,DatagramChannel 一個類處理了服務端和客戶端。
UDP 是面向無連線的,不需要和對方握手,不需要通知對方,就可以直接將資料包投出去,至於能不能送達,它是不知道的.
監聽埠:
DatagramChannel channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(9090)); ByteBuffer buf = ByteBuffer.allocate(48); channel.receive(buf);
傳送資料:
String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.put(newData.getBytes()); buf.flip(); int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));
Selector
Selector建立在非阻塞的基礎之上,大家經常聽到的多路複用在java世界中指的就是它,用於實現一個執行緒管理多個Channel。
開啟Selector:
Selector selector = Selector.open();
將 Channel 註冊到 Selector 上。前面我們說了,Selector 建立在非阻塞模式之上,所以註冊到 Selector 的 Channel 必須要支援非阻塞模式, FileChannel 不支援非阻塞 ,我們這裡討論最常見的 SocketChannel 和 ServerSocketChannel。
// 將通道設定為非阻塞模式,因為預設都是阻塞模式的 channel.configureBlocking(false); // 註冊 SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
register 方法的第二個 int 型引數(使用二進位制的標記位)用於表明需要監聽哪些感興趣的事件,共以下四種事件:
SelectionKey.OP_READ:對應 00000001,通道中有資料可以進行讀取
SelectionKey.OP_WRITE: 對應 00000100,可以往通道中寫入資料
SelectionKey.OP_CONNECT: 對應 00001000,成功建立 TCP 連線
SelectionKey.OP_ACCEPT: 對應 00010000,接受 TCP 連線
我們可以同時監聽一個 Channel 中的發生的多個事件,比如我們要監聽 ACCEPT 和 READ 事件,那麼指定引數為二進位制的 000 1 000 1 即十進位制數值 17 即可。
註冊方法返回值是 SelectionKey 例項,它包含了 Channel 和 Selector 資訊,也包括了一個叫做 Interest Set 的資訊,即我們設定的我們感興趣的正在監聽的事件集合。
呼叫 select() 方法獲取通道資訊。用於判斷是否有我們感興趣的事件已經發生了。
示例:
Selector selector = Selector.open(); channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); while(true) { // 判斷是否有事件準備好 int readyChannels = selector.select(); if(readyChannels == 0) continue; // 遍歷 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); } }
對於Selector,需要熟悉以下幾個方法:
select()
呼叫此方法,會將 上次 select 之後的 準備好的 channel 對應的 SelectionKey 複製到 selected set 中。如果沒有任何通道準備好,這個方法會阻塞,直到至少有一個通道準備好。
selectNow()
功能和 select 一樣,區別在於如果沒有準備好的通道,那麼此方法會立即返回 0。
select(long timeout)
看了前面兩個,這個應該很好理解了,如果沒有通道準備好,此方法會等待一會
wakeup()
這個方法是用來喚醒等待在 select() 和 select(timeout) 上的執行緒的。如果 wakeup() 先被呼叫,此時沒有執行緒在 select 上阻塞,那麼之後的一個 select() 或 select(timeout) 會立即返回,而不會阻塞,當然,它只會作用一次。
調
用 Buffer 的 flip() 方法,可以從寫入模式切換到讀取模式。其實這個方法也就是設定了一下 position 和 limit 值罷了