Android SocketChannel使用
一 概述:
SocketChannel是一種面向流的、用於連線socket套接字的可選擇通道。它基於TCP連線傳輸,主要用來處理網路I/O;由於實現了可選擇通道,因此可以被多路複用。
二 特徵:
1 對於已經存在的socket不能建立SocketChannel
2 SocketChannel提供的無參數open()介面所建立的Channel並沒有建立網路連線,需要再使用connect介面連線到指定地址(帶地址參數的open(SocketAddress)則會在建立後直接連線)
3 未進行連線的SocketChannel執行I/O操作時,會丟擲NotYetConnectedException
4 SocketChannel支援兩種I/O模式:阻塞式和非阻塞式
5 SocketChannel支援非同步關閉。如果SocketChannel在一個執行緒上read阻塞,另一個執行緒對該SocketChannel呼叫shutdownInput,則讀阻塞的執行緒將返回-1,表示沒有讀取任何資料;如果SocketChannel在一個執行緒上write阻塞,另一個執行緒對該SocketChannel呼叫shutdownOutput,則寫阻塞的執行緒將丟擲AsynchronousCloseException
三 SocketChannel的使用:
(1)建立
// Open the selector, then open the channel and connect it to the remote address in one step.
Selector mSelector = Selector.open();
SocketChannel socketChannel =
        SocketChannel.open(new InetSocketAddress(mRemoteIp, Constants.TCP_PORT));
// Switch to non-blocking mode and register for read-readiness with the selector.
socketChannel.configureBlocking(false);
socketChannel.register(mSelector, SelectionKey.OP_READ);
(2) 連線校驗
// Tests whether the SocketChannel is open.
socketChannel.isOpen();
// Tests whether the SocketChannel is connected.
socketChannel.isConnected();
// Tests whether a connection operation is currently in progress.
socketChannel.isConnectionPending();
// Checks whether a SocketChannel that is in the middle of connecting has finished connecting.
socketChannel.finishConnect();
(3) 讀寫模式
前面提到SocketChannel支援阻塞和非阻塞兩種模式:
// Passing false selects non-blocking mode; passing true selects blocking mode.
socketChannel.configureBlocking(false);
主要是通過以上方法設定SocketChannel的讀寫模式。false表示非阻塞,true表示阻塞。
(4) 讀寫
// The channel is left in blocking mode, so read() does not return until data arrives.
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 8080));
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
socketChannel.read(byteBuffer);
socketChannel.close();
System.out.println("test end!");
以上為阻塞式讀,當執行到read處時,執行緒將阻塞,在讀到資料之前控制檯無法列印test end!。
// The channel is switched to non-blocking mode, so read() returns immediately
// even when no data is available.
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 8080));
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
socketChannel.read(byteBuffer);
socketChannel.close();
System.out.println("test end!");
以上為非阻塞讀,控制檯將列印test end!。
讀寫都是面向緩衝區,這個讀寫方式與前文中的FileChannel一樣,這裡不再贅述。
(5) 設定和獲取引數
// Enable keep-alive and disable Nagle's algorithm, one option per statement.
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, Boolean.TRUE);
通過setOption方法可以設定socket套接字的相關引數。
socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE)socketChannel.getOption(StandardSocketOptions.SO_RCVBUF)
可以通過getOption獲取相關引數的值。例如在某些平台上,預設的接收緩衝區大小是8192 byte(實際預設值依平台而定)。
四 一個完整的例子
package com.zongmu.rpa.probes; import android.text.TextUtils; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.RpcCallback; import com.zongmu.rpa.constant.Constants; import com.zongmu.rpa.model.ProtoPackage; import com.zongmu.rpa.probes.callback.TcpConnCallback; import com.zongmu.rpa.utils.ByteUtils; import com.zongmu.rpa.utils.LogUtil; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; public class TcpProbes implements ZmProbes { private static String TAG = "TcpProbes"; private Selector mSelector; private SocketChannel mSocketChannel; private String mRemoteIp; private int mRecvLength = -1; private TcpConnCallback mCallback; private ProtoPackage mProtoPackage; public void setTcpConnCallback(TcpConnCallback callback){ mCallback = callback; } public Selector getSelector() { return mSelector; } public String getRemoteIp() { return mRemoteIp; } public void setRemoteIp(String ip) { this.mRemoteIp = ip; } @Override public void sendData(final byte[] pkg) { if (mSocketChannel.isOpen() && mSocketChannel.isConnected()) { new Thread(new Runnable() { @Override public void run() { try { mSocketChannel.write(ByteBuffer.wrap(pkg)); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } @Override public int receData(Message responsePrototype,RpcCallback<Message>done) { if (mSocketChannel == null || !mSocketChannel.isOpen()|| !mSocketChannel.isConnected()){ return Constants.SOCKET_NOT_CONNECT; } int ret = -1; try { ret = mSelector.select(3000); } catch (IOException e) { e.printStackTrace(); } if(ret < 0){ return Constants.RECEIVE_DATA_TIMEOUT; } byte[] recvData = new byte[512]; Iterator<SelectionKey> iterator = mSelector.selectedKeys().iterator(); while(iterator.hasNext()) { SelectionKey key = iterator.next(); 
if(key.isValid()) { if(key.isReadable()) { LogUtil.e(TAG, "receive data"); try { mRecvLength = mSocketChannel.read(ByteBuffer.wrap(recvData)); } catch (IOException e) { e.printStackTrace(); } LogUtil.e(TAG, "Received data size: " + mRecvLength); } }else { LogUtil.e(TAG, "wait server response timeout"); mRecvLength = -1; recvData = null; } if (!iterator.hasNext()) { break; } } if(mProtoPackage == null){ mProtoPackage = new ProtoPackage(); } String raw_data = new String(recvData, 0, mRecvLength); LogUtil.e(TAG, "RawPackage:" + raw_data); LogUtil.e(TAG, "RawPackage:" + ByteUtils.bytes2HexString(ByteUtils.getSubArrays(recvData,0,mRecvLength))); String data = new String(recvData, 0, 13); LogUtil.e(TAG, "data:"+data); if (TextUtils.equals("ZongMuService", data)) { LogUtil.e(TAG, "receive server response"); mProtoPackage.setPackageBuffer(recvData); mProtoPackage.parsePackage(); if(mRecvLength != mProtoPackage.getPackageSize()){ LogUtil.e(TAG,"data length parse Error"); recvData = null; mRecvLength = -1; return -3; } byte[] serviceParamPackage = mProtoPackage.getParamPackage(); try { responsePrototype = responsePrototype.getParserForType().parseFrom(serviceParamPackage); done.run(responsePrototype); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } recvData = null; mRecvLength = -1; LogUtil.d(TAG,"callback finish"); }else { LogUtil.d(TAG,"receive error"); recvData = null; mRecvLength = -1; return Constants.RECEIVE_DATA_ERROR; } return Constants.RECEIVE_DATA_SUCCESS; } @Override public void start() { if(TextUtils.isEmpty(mRemoteIp)){ LogUtil.i( TAG,"remote ip is null"); return; } new Thread(new Runnable() { @Override public void run() { try { mSelector = Selector.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress(mRemoteIp, Constants.TCP_PORT); mSocketChannel = SocketChannel.open(inetSocketAddress); mSocketChannel.configureBlocking(false); mSocketChannel.register(mSelector, SelectionKey.OP_READ); LogUtil.e(TAG, "tcp socket connect 
success"); mCallback.onTcpConnect(Constants.SOCKET_CONN_SUCCESS); } catch (IOException e) { e.printStackTrace(); LogUtil.e(TAG, "tcp socket connect failed"); mCallback.onTcpConnect(Constants.SOCKET_CONN_FAILED); } } }).start(); } @Override public void stop() { try { if (mSocketChannel != null && mSocketChannel.isConnected()) { mSocketChannel.finishConnect(); mSelector.close(); mSocketChannel.close(); LogUtil.e( TAG,"tcp socket closed"); } } catch (IOException e) { e.printStackTrace(); LogUtil.e( TAG,"tcp socket closed:"+e.toString()); } } }