java核心技術-NIO
1、reactor(反應器)模式
使用單執行緒模擬多執行緒,提高資源利用率和程式的效率,增加系統吞吐量。下面例子比較形象的說明了什麼是反應器模式:
一個老闆經營一個飯店,
傳統模式 - 來一個客人安排一個服務員招呼,客人很滿意;(相當於一個連線一個執行緒)
後來客人越來越多,需要的服務員越來越多,資源條件不足以再請更多的服務員了,傳統模式已經不能滿足需求。老闆之所以為老闆自然有過人之處,老闆發現,服務員在為客人服務時,當客人點菜的時候,服務員基本處於等待狀態,(阻塞執行緒,不做事)。
於是乎就讓服務員在客人點菜的時候,去為其他客人服務,當客人菜點好後再招呼服務員即可。 --反應器(reactor)模式誕生了
飯店的生意紅紅火火,幾個服務員就足以支撐大量的客流量,老闆用有限的資源賺了更多的money~~~~ _
通道:類似於流,但是可以非同步讀寫資料(流只能同步讀寫),通道是雙向的,(流是單向的),通道的資料總是要先讀到一個buffer 或者 從一個buffer寫入,即通道與buffer進行資料互動。
通道型別:
- FileChannel:從檔案中讀寫資料。
- DatagramChannel:能通過 UDP 讀寫網路中的資料。
- SocketChannel:能通過 TCP 讀寫網路中的資料。
- ServerSocketChannel:可以監聽新進來的 TCP 連線,像Web伺服器那樣。對每一個新進來的連線都會建立一個SocketChannel。
-FileChannel比較特殊,它可以與通道進行資料互動, 不能切換到非阻塞模式,套接字通道可以切換到非阻塞模式;
緩衝區 - 本質上是一塊可以儲存資料的記憶體,被封裝成了buffer物件而已!
緩衝區型別:
- ByteBuffer
- MappedByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
常用方法:
- allocate() - 分配一塊緩衝區
- put() - 向緩衝區寫資料
- get() - 向緩衝區讀資料
- filp() - 將緩衝區從寫模式切換到讀模式
- clear() - 從讀模式切換到寫模式,不會清空資料,但後續寫資料會覆蓋原來的資料,即使有部分資料沒有讀,也會被遺忘;
- compact() - 從讀資料切換到寫模式,資料不會被清空,會將所有未讀的資料copy到緩衝區頭部,後續寫資料不會覆蓋,而是在這些資料之後寫資料
- mark() - 對position做出標記,配合reset使用
- reset() - 將position置為標記值
緩衝區的一些屬性:
-
capacity - 緩衝區大小,無論是讀模式還是寫模式,此屬性值不會變;
-
position - 寫資料時,position表示當前寫的位置,每寫一個數據,會向下移動一個數據單元,初始為0;最大為capacity - 1,切換到讀模式時,position會被置為0,表示當前讀的位置
-
limit - 寫模式下,limit 相當於capacity 表示最多可以寫多少資料,切換到讀模式時,limit 等於原先的position,表示最多可以讀多少資料。
非直接緩衝區:通過allocate() 方法 分配緩衝區,將緩衝區建立在JVM記憶體中
直接緩衝區:通過allocateDirect() 方法直接緩衝區 將緩衝區建立在實體記憶體中
2.1 關於緩衝區各個屬性的測試
String str = "abcde"; //1. 分配一個指定大小的緩衝區 ByteBuffer buf = ByteBuffer.allocate(1024); System.out.println("--------------allocate()----------------"); System.out.println(buf.position());//0 System.out.println(buf.limit());//1024 System.out.println(buf.capacity());//1024 //2. 利用put存入資料到緩衝區中去 buf.put(str.getBytes()); System.out.println("----------------put()-------------------"); System.out.println(buf.position());//5 System.out.println(buf.limit());//1024 System.out.println(buf.capacity());//1024 //3. 切換到讀取模式 buf.flip(); System.out.println("----------------flip()------------------"); System.out.println(buf.position());//0 System.out.println(buf.limit());//5 System.out.println(buf.capacity());//1024 //4. 利用get() 讀取緩衝區中的資料 byte[] dst = new byte[buf.limit()]; buf.get(dst); System.out.println(new String(dst,0,dst.length)); System.out.println("----------------get()------------------"); System.out.println(buf.position());//5 System.out.println(buf.limit());//5 System.out.println(buf.capacity());//1024 //5.可重複讀 buf.rewind(); System.out.println("----------------rewind()------------------"); System.out.println(buf.position());//0 System.out.println(buf.limit());//5 System.out.println(buf.capacity());//1024 //6.clear(): 清空緩衝區, 但是緩衝區的資料依然存在, 但是處於被遺忘的狀態 buf.clear(); System.out.println("----------------clear()-------------------"); System.out.println(buf.position());//0 System.out.println(buf.limit());//1024 System.out.println(buf.capacity());//1024 byte[] newByte = new byte[buf.limit()]; buf.get(newByte); System.out.println(new String(newByte,0,newByte.length));
2.2 關於通道的使用
1.利用通道進行 檔案的複製 非直接緩衝區
FileInputStream fis = null; FileOutputStream fos = null; FileChannel inChannel = null; FileChannel outChannel = null; try { fis = new FileInputStream("1.jpg"); fos = new FileOutputStream("2.jpg"); // ①獲取通道 inChannel = fis.getChannel(); outChannel = fos.getChannel(); // ②將通道中的資料存入緩衝區 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 將通道中的資料存入緩衝區 while (inChannel.read(byteBuffer) != -1) { byteBuffer.flip(); // 切換讀取資料的模式 outChannel.write(byteBuffer); byteBuffer.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { if (inChannel != null) { try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if (outChannel != null) { try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if (fis != null) { try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } if (fos != null) { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } }
2.通道之間的傳輸
CREATE_NEW:如果檔案不存在就建立,存在就報錯
CREATE:如果檔案不存在就建立,存在建立(覆蓋)
FileChannel inChannel = null; FileChannel outChannel = null; try { inChannel = FileChannel.open(Paths.get("hello.txt"), StandardOpenOption.READ); outChannel = FileChannel.open(Paths.get("hello2.txt"), StandardOpenOption.READ,StandardOpenOption.WRITE,StandardOpenOption.CREATE_NEW); inChannel.transferTo(0, inChannel.size(), outChannel); } catch (Exception e) { e.printStackTrace(); }finally { if(inChannel != null){ try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(outChannel != null){ try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }
3. 使用直接緩衝區完成記憶體檔案的複製
FileChannel inChannel = null; FileChannel outChannel = null; try { inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); outChannel = FileChannel.open(Paths.get("x.jpg"), StandardOpenOption.READ,StandardOpenOption.WRITE,StandardOpenOption.CREATE_NEW); MappedByteBuffer inMappedBuffer = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size()); MappedByteBuffer outMappedBuffer = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size()); System.out.println(inMappedBuffer.limit()); byte[] b = new byte[inMappedBuffer.limit()];; inMappedBuffer.get(b); outMappedBuffer.put(b); } catch (Exception e) { e.printStackTrace(); } finally { if(inChannel != null){ try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(outChannel != null){ try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }
2.3 重點 NIO-非阻塞IO
個人認為 NIO 最難的兩點 一個是對於選擇器和選擇鍵的理解 其次是對於網路通訊模型的理解
本章內容以防過長 只講解 NIO 的使用方法 上述兩點參看下回分解
阻塞IO示例:
//客戶端 @Test public void client() throws IOException{ SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898)); FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); ByteBuffer buf = ByteBuffer.allocate(1024); while(inChannel.read(buf) != -1){ buf.flip(); sChannel.write(buf); buf.clear(); } sChannel.shutdownOutput(); //接收服務端的反饋 int len = 0; while((len = sChannel.read(buf)) != -1){ buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); } inChannel.close(); sChannel.close(); } //服務端 @Test public void server() throws IOException{ ServerSocketChannel ssChannel = ServerSocketChannel.open(); FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); ssChannel.bind(new InetSocketAddress(9898)); SocketChannel sChannel = ssChannel.accept(); ByteBuffer buf = ByteBuffer.allocate(1024); while(sChannel.read(buf) != -1){ buf.flip(); outChannel.write(buf); buf.clear(); } //傳送反饋給客戶端 buf.put("服務端接收資料成功".getBytes()); buf.flip(); sChannel.write(buf); sChannel.close(); outChannel.close(); ssChannel.close(); }
非阻塞IO示例-TCP:
//客戶端 @Test public void client() throws IOException{ //1. 獲取通道 SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898)); //2. 切換非阻塞模式 sChannel.configureBlocking(false); //3. 分配指定大小的緩衝區 ByteBuffer buf = ByteBuffer.allocate(1024); //4. 傳送資料給服務端 Scanner scan = new Scanner(System.in); while(scan.hasNext()){ String str = scan.next(); buf.put((new Date().toString() + "\n" + str).getBytes()); buf.flip(); sChannel.write(buf); buf.clear(); } //5. 關閉通道 sChannel.close(); } //服務端 @Test public void server() throws IOException{ //1. 獲取通道 ServerSocketChannel ssChannel = ServerSocketChannel.open(); //2. 切換非阻塞模式 ssChannel.configureBlocking(false); //3. 繫結連線 ssChannel.bind(new InetSocketAddress(9898)); //4. 獲取選擇器 Selector selector = Selector.open(); //5. 將通道註冊到選擇器上, 並且指定“監聽接收事件” ssChannel.register(selector, SelectionKey.OP_ACCEPT); //6. 輪詢式的獲取選擇器上已經“準備就緒”的事件 while(selector.select() > 0){ //7. 獲取當前選擇器中所有註冊的“選擇鍵(已就緒的監聽事件)” Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while(it.hasNext()){ //8. 獲取準備“就緒”的是事件 SelectionKey sk = it.next(); //9. 判斷具體是什麼事件準備就緒 if(sk.isAcceptable()){ //10. 若“接收就緒”,獲取客戶端連線 SocketChannel sChannel = ssChannel.accept(); //11. 切換非阻塞模式 sChannel.configureBlocking(false); //12. 將該通道註冊到選擇器上 sChannel.register(selector, SelectionKey.OP_READ); }else if(sk.isReadable()){ //13. 獲取當前選擇器上“讀就緒”狀態的通道 SocketChannel sChannel = (SocketChannel) sk.channel(); //14. 讀取資料 ByteBuffer buf = ByteBuffer.allocate(1024); int len = 0; while((len = sChannel.read(buf)) > 0 ){ buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); } } //15. 取消選擇鍵 SelectionKey it.remove(); } } }
非阻塞IO示例-UDP:
@Test public void send() throws IOException{ DatagramChannel dc = DatagramChannel.open(); dc.configureBlocking(false); ByteBuffer buf = ByteBuffer.allocate(1024); Scanner scan = new Scanner(System.in); while(scan.hasNext()){ String str = scan.next(); buf.put((new Date().toString() + ":\n" + str).getBytes()); buf.flip(); dc.send(buf, new InetSocketAddress("127.0.0.1", 9898)); buf.clear(); } dc.close(); } @Test public void receive() throws IOException{ DatagramChannel dc = DatagramChannel.open(); dc.configureBlocking(false); dc.bind(new InetSocketAddress(9898)); Selector selector = Selector.open(); dc.register(selector, SelectionKey.OP_READ); while(selector.select() > 0){ Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey sk = it.next(); if(sk.isReadable()){ ByteBuffer buf = ByteBuffer.allocate(1024); dc.receive(buf); buf.flip(); System.out.println(new String(buf.array(), 0, buf.limit())); buf.clear(); } } it.remove(); } }