android中非阻塞socket通信
时间:2014-05-18 04:01:56
收藏:0
阅读:389
1、什么是同步与异步,阻塞与非阻塞
首先我们要明白搞明白:同步就等于阻塞?异步就等于非阻塞?这是不对的,同步不等于阻 塞,而异步也不等于非阻塞。
1)那什么是同步编程?
什么是同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。根据这个定义,android中绝大多数函数都是同步调用。但是一般而言,我们在谈论同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务。在android中,由于主线程(UI线程的不安全性),我们经常会用到handler的SendMessage函数,就是一个同步线程,它将数据传送给某个窗口后,在对方处理完消息后,这个函数是不会返回的,当处理完毕的时候才返回相应的返回值。
2)那什么是异步编程?
异步的概念和同步相反的。当一个调用者异步发出一个功能调用时,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。以 android中AsyncTask类为例,顾名思义异步执行任务,在doInBackground 执行完成后,onPostExecute 方法将被UI 线程调用,后台的计算结果将通过该方法传递到UI 线程,并且在界面上展示给用户.。在android或者java异步编程中需要注意以下几个知识点:回调,监听者模式,观察者模式。这几点在之后另外几篇文章中会提及。
3)什么是阻塞式编程?
阻塞调用是指调用结果返回之前,当前线程会被挂起。函数只有在 得到结果之后才会返回。因为这点定义跟同步编程的定义很相像,所以很多人认为同步编程就等阻塞式编程。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。例如,我们在 socket编程中调用Receive函数,如果缓冲区中没有数据,这个函数就会一直等待,直到有数据才返回。而此时,当前线程还会继续处理各种各样的消
息。如果主窗口和调用函数在同一个线程中,除非你在特殊的界面操作函数中调用,其实主界面还是应该可以刷新。但是在android中,由于主线程(UI线程)的不安全性,特别到4.0版本后,系统已经不允许在主线程中进行耗时的同步编程。所以android才出现了AsyncTask类用于异步编程。
4)什么是非阻塞式编程?
非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。从这个定义上来说,非阻塞编程可以说是异步编程的一种,但是异步编程并不等于非阻塞式编程。
5)区别大概
我们用买票的案例去理解它,当我们去买票的时候,如果还在排队,一直排着,直到买到票再离开,这个就是同步编程(所谓同步就是当一个进程发起一个函数(任务)调用的时候,一直会到函数(任务)完成)。那还有另外一方式,你可以叫一个人(监听者,观察者)帮你看着,直接你买票了,再通知你,你可以先去别的事情(而异步这不会这样,异步情况下是当一个进程发
起一个函数(任务)调用的时候,不会等函数返回)。阻塞是就是等排队,非阻塞就是直接走开。
2、几个关键知识点
1)java.net.InetSocketAddress
此类实现 IP 套接字地址(IP 地址 + 端口号)。它还可以是一个对(主机名 + 端口号),在此情况下,将尝试解析主机名。如果解析失败,则该地址将被视为未解析 地址,但是其在某些情形下仍然可以使用,比如通过代理连接。
需注意接口:
public InetSocketAddress(InetAddress addr,int port)
根据 IP 地址和端口号创建套接字地址。
有效端口值介于 0 和 65535 之间。端口号 zero 允许系统在 bind 操作中挑选暂时的端口。
有效端口值介于 0 和 65535 之间。端口号 zero 允许系统在 bind 操作中挑选暂时的端口。
2)java.nio.channels.Selector
可通过调用此类的 open 方法创建选择器,该方法将使用系统的默认选择器提供者创建新的选择器。也可通过调用自定义选择器提供者的 openSelector 方法来创建选择器。通过选择器的 close 方法关闭选择器之前,它一直保持打开状态。
需注意接口:
public static Selector open()throws IOException
打开一个选择器。
public abstract void close()throws IOException
关闭此选择器。
如果某个线程目前正阻塞在此选择器的某个选择方法中,则中断该线程,如同调用该选择器的 wakeup 方法那样。
所有仍与此选择器关联的未取消键已无效、其通道已注销,并且与此选择器关联的所有其他资源已释放。
如果此选择器已经关闭,则调用此方法无效。
关闭选择器后,除了调用此方法或 wakeup 方法外,以任何其他方式继续使用它都将导致抛出 ClosedSelectorException。
打开一个选择器。
public abstract void close()throws IOException
关闭此选择器。
如果某个线程目前正阻塞在此选择器的某个选择方法中,则中断该线程,如同调用该选择器的 wakeup 方法那样。
所有仍与此选择器关联的未取消键已无效、其通道已注销,并且与此选择器关联的所有其他资源已释放。
如果此选择器已经关闭,则调用此方法无效。
关闭选择器后,除了调用此方法或 wakeup 方法外,以任何其他方式继续使用它都将导致抛出 ClosedSelectorException。
注:选择器的关闭是关键点,特别需要注意上述第二条
3)java.nio.channels.SocketChannel
针对面向流的连接套接字的可选择通道。
套接字通道不是连接网络套接字的完整抽象。必须通过调用 socket 方法所获得的关联 Socket 对象来完成对套接字选项的绑定、关闭和操作。不可能为任意的已有套接字创建通道,也不可能指定与套接字通道关联的套接字所使用的 SocketImpl 对象。
套接字通道不是连接网络套接字的完整抽象。必须通过调用 socket 方法所获得的关联 Socket 对象来完成对套接字选项的绑定、关闭和操作。不可能为任意的已有套接字创建通道,也不可能指定与套接字通道关联的套接字所使用的 SocketImpl 对象。
通过调用此类的某个 open 方法创建套接字通道。新创建的套接字通道已打开,但尚未连接。试图在未连接的通道上调用 I/O 操作将导致抛出 NotYetConnectedException。可通过调用套接字通道的 connect 方法连接该通道;一旦连接后,关闭套接字通道之前它会一直保持已连接状态。可通过调用套接字通道的 isConnected 方法来确定套接字通道是否已连接。
套接字通道支持非阻塞连接:可创建一个套接字通道,并且通过 connect 方法可以发起到远程套接字的连接,之后通过 finishConnect 方法完成该连接。可通过调用 isConnectionPending 方法来确定是否正在进行连接操作。
可单独地关闭 套接字通道的输入端和输出端,而无需实际关闭该通道。调用关联套接字对象的 shutdownInput 方法来关闭某个通道的输入端将导致该通道上的后续读取操作返回 -1(指示流的末尾)。调用关联套接字对象的 shutdownOutput 方法来关闭通道的输出端将导致该通道上的后续写入操作抛出 ClosedChannelException。
套接字通道支持异步关闭,这与 Channel 类中所指定的异步 close 操作类似。如果一个线程关闭了某个套接字的输入端,而同时另一个线程被阻塞在该套接字通道上的读取操作中,那么处于阻塞线程中的读取操作将完成,而不读取任何字节且返回 -1。I如果一个线程关闭了某个套接字的输出端,而同时另一个线程被阻塞在该套接字通道上的写入操作中,那么阻塞线程将收到 AsynchronousCloseException。
多个并发线程可安全地使用套接字通道。尽管在任意给定时刻最多只能有一个线程进行读取和写入操作,但数据报通道支持并发的读写。connect 和 finishConnect 方法是相互同步的,如果正在调用其中某个方法的同时试图发起读取或写入操作,则在该调用完成之前该操作被阻塞。
3、实例代码演示
连接核心代码:
Selector mSelector = null; ByteBuffer sendBuffer = null; SocketChannel client = null; InetSocketAddress isa = null; SocketEventListener mSocketEventListener = null; private boolean Connect(String site, int port) { if (mSocketEventListener != null) { mSocketEventListener.OnSocketPause(); } boolean ret = false; try { mSelector = Selector.open(); client = SocketChannel.open(); client.socket().setSoTimeout(5000); isa = new InetSocketAddress(site, port); boolean isconnect = client.connect(isa); // 将客户端设定为异步 client.configureBlocking(false); // 在轮讯对象中注册此客户端的读取事件(就是当服务器向此客户端发送数据的时候) client.register(mSelector, SelectionKey.OP_READ); long waittimes = 0; if(!isconnect) { while (!client.finishConnect()) { EngineLog.redLog(TAG, "等待非阻塞连接建立...."); Thread.sleep(50); if(waittimes < 100) { waittimes++; } else { break; } } } Thread.sleep(500); haverepaired(); startListener(); ret = true; } catch (Exception e) { EngineLog.redLog(TAG + " - Connect error", e != null ? e.toString() : "null"); try { Thread.sleep(1000 * 10); } catch (Exception e1) { EngineLog.redLog(TAG + " - Connect error", e1 != null ? e1.toString() : "null"); } ret = false; } return ret; }
在上述代码中,我们可以看到有一个SocketEventListener监听接口,这个接口用于监听socket事件,将其回调给调用者
SocketEventListener接口:
public interface SocketEventListener { /** * Socket正在接收数据 * */ public void OnStreamRecive(); /** * Socket接收数据完成 * */ public void OnStreamReciveFinish(); /** * Socket有新的消息返回 * */ public void OnStreamComing(byte[] aStreamData); /** * Socket出现异常 * */ public void OnSocketPause(); /** * Socket已修复,可用 * */ public void OnSocketAvaliable(); }监听接口的使用:
rivate void startListener() { if (mReadThread == null || mReadThread.isInterrupted()) { mReadThread = null; mReadThread = new Thread() { @Override public void run() { while (!this.isInterrupted() && mRunRead) { MyLineLog.redLog(TAG,"startListener:" + mSendMsgTime); try { // 如果客户端连接没有打开就退出循环 if (!client.isOpen()) break; // 此方法为查询是否有事件发生如果没有就阻塞,有的话返回事件数量 int eventcount = mSelector.select(); // 如果没有事件返回循环 if (eventcount > 0) { starttime = CommonClass.getCurrentTime(); // 遍例所有的事件 for (SelectionKey key : mSelector.selectedKeys()) { // 删除本次事件 mSelector.selectedKeys().remove(key); // 如果本事件的类型为read时,表示服务器向本客户端发送了数据 if (key.isValid() && key.isReadable()) { if (mSocketEventListener != null) { mSocketEventListener.OnStreamRecive(); } boolean readresult = ReceiveDataBuffer((SocketChannel) key.channel()); if (mSocketEventListener != null) { mSocketEventListener.OnStreamReciveFinish(); } if(readresult) { key.interestOps(SelectionKey.OP_READ); sleep(200); } else { throw new Exception(); } } key = null; } mSelector.selectedKeys().clear(); } } catch (Exception e) { mRunRead = false; mReadThread = null; if(e instanceof InterruptedException) { MyLineLog.redLog(TAG, "startListener:" + e.toString()); } else { break; } } } } }; mReadThread.setName(TAG + " Listener, " + CommonClass.getCurrentTime()); mRunRead = true; mReadThread.start(); } }
连接完之后就是发送数据和接收数据,下面是发送数据的核心代码:
public boolean SendSocketMsg(byte[] aMessage) throws IOException { boolean ret = false; try { sendBuffer.clear(); sendBuffer = ByteBuffer.wrap(aMessage); int sendsize = client.write(sendBuffer); sendBuffer.flip(); sendBuffer.clear(); mSendMsgTime = CommonClass.getCurrentTime(); MyLineLog.redLog(TAG, "SendSocketMsg:" + mSendMsgTime + ", sendsize:" + sendsize); ret = true; } catch (Exception e) { MyLineLog.redLog(TAG, "发送数据失败。"); if (mSocketEventListener != null) { mSocketEventListener.OnSocketPause(); } // crash(); } return ret; }
因为实际工作需要,我们需要经常会碰到两个问题,无效数据和大数据,如何去解决这个问题呢,无效数据用过滤,大数据用分块接收,下面是接收数据的方法:
private boolean ReceiveDataBuffer(SocketChannel aSocketChannel) { // n 有数据的时候返回读取到的字节数。 // 0 没有数据并且没有达到流的末端时返回0。 // -1 当达到流末端的时候返回-1。 boolean ret = false; ByteArrayBuffer bab = new ByteArrayBuffer(8*1024); while(true) { try { ByteBuffer readBuffer = ByteBuffer.allocate(1024 * 1); readBuffer.clear(); int readsize = aSocketChannel.read(readBuffer); if(readsize > 0) { MyLineLog.redLog(TAG, "aSocketChannel.read=>" + readsize); byte[] readbytes = readBuffer.array(); bab.append(readbytes, 0, readsize); readBuffer.clear(); readBuffer.flip(); ret = true; } else if(readsize == 0) { int buffersize = bab.length(); byte[] readdata = bab.buffer(); int readdataoffset = 0; boolean parsedata = true; while(readdataoffset < buffersize && parsedata) { byte datatype = readdata[readdataoffset]; if (datatype == PushUtils.PACKAGETYPE_HEARTBEAT || datatype == PushUtils.PACKAGETYPE_HEARTBEAR_NODATA) { byte[] blockdata = new byte[] { datatype }; ReceiveData(blockdata); readdataoffset += 1; blockdata = null; } else { byte[] blocklength = new byte[4]; System.arraycopy(readdata, readdataoffset + 5, blocklength, 0, 4); int blocksize = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(blocklength)); blocklength = null; int blockdatasize = 5 + blocksize + 4; if(blockdatasize <= buffersize) { MyLineLog.redLog(TAG, "块数据大小:" + blockdatasize); byte[] blockdata = new byte[blockdatasize]; System.arraycopy(readdata, readdataoffset, blockdata, 0, blockdatasize); long starttime = CommonClass.getCurrentTime(); ReceiveData(blockdata); long endtime = CommonClass.getCurrentTime(); MyLineLog.redLog(TAG, "解析数据用时:" + (endtime - starttime) + "ms"); readdataoffset += blockdatasize; blockdata = null; } else if(blockdatasize < 10240) {//小于10k,则属于正常包 MyLineLog.redLog(TAG, "块数据大小:" + blockdatasize + ",小于10k,说明数据不完整,继续获取。"); //将未解析数据存到临时buffer int IncompleteSize = buffersize - readdataoffset; if(IncompleteSize > 0) { byte[] Incompletedata = new byte[IncompleteSize]; System.arraycopy(readdata, readdataoffset, Incompletedata, 0, IncompleteSize); bab.clear(); bab.append(Incompletedata, 0, IncompleteSize); parsedata = false; Incompletedata = null; } } else {//异常包 MyLineLog.yellowLog(TAG, "块数据错误大小:" + blockdatasize); MyLineLog.redLog(TAG,"blockdatasize error:" + blockdatasize); ret = true; break; } } } if(parsedata) { ret = true; break; } } else if(readsize == -1) { ret = false; break; } else { ret = true; break; } } catch (IOException e) { MyLineLog.redLog(TAG, "aSocketChannel IOException=>" + e.toString()); ret = false; break; } } bab.clear(); bab = null; return ret; }
如果数据量过大的话,还会使用压缩方法进行传输,那应该如何接收呢,下面是一段接收压缩数据的方法:
private void ReceiveData(byte[] aDataBlock) { try { MyLineLog.redLog(TAG, "ReceiveData:" + mSendMsgTime); if (mSendMsgTime != 0) { mSendMsgTime = 0; } byte[] ret = null; int offset = 0; byte datatype = aDataBlock[offset]; offset += 1; if (datatype != -1) { if (datatype == PushUtils.PACKAGETYPE_HEARTBEAT) { ret = new byte[] { datatype }; } else if (datatype == PushUtils.PACKAGETYPE_HEARTBEAR_NODATA) { ret = new byte[] { datatype }; } else if (datatype == PushUtils.PACKAGETYPE_NORMAL || datatype == PushUtils.PACKAGETYPE_HEARTBEAR_HAVEDATA) { byte[] databytelength = new byte[4]; System.arraycopy(aDataBlock, offset, databytelength, 0, 4); offset += 4; int header = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(databytelength)); databytelength = null; if (header == PushUtils.PACKAGEHEADER) { byte[] datalengthbyte = new byte[4]; System.arraycopy(aDataBlock, offset, datalengthbyte, 0, 4); offset += 4; int datalength = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(datalengthbyte)); datalengthbyte = null; if (datalength > 4) { // compressed bit 暂时不压缩 byte compressed = aDataBlock[offset]; offset += 1; if (compressed == 1) {//解压缩 //跳过头4个字节,此处用于解压缩后的数据大小,暂时不需要 offset += 4; int contentlength = datalength - 1 - 4; byte[] datacontentbyte = new byte[contentlength]; System.arraycopy(aDataBlock, offset, datacontentbyte, 0, contentlength); offset += contentlength; byte[] compressdata = new byte[contentlength - 4]; System.arraycopy(datacontentbyte, 0, compressdata, 0, contentlength - 4); long starttime = CommonClass.getCurrentTime(); byte[] decompressdatacontentbyte = CommonClass.decompress(compressdata); long endtime = CommonClass.getCurrentTime(); MyLineLog.redLog(TAG, "解压缩数据用时:" + (endtime - starttime) + "ms"); int decompressdatacontentbytelength = decompressdatacontentbyte.length; compressdata = null; int footer = PushUtils.getInt(datacontentbyte, contentlength - 4); if (footer == PushUtils.PACKAGEFOOTER) { ret = new byte[decompressdatacontentbytelength + 1]; ret[0] = datatype; System.arraycopy(decompressdatacontentbyte, 0, ret, 1, decompressdatacontentbytelength); datacontentbyte = null; decompressdatacontentbyte = null; } } else {//数据未压缩 int contentlength = datalength - 1; byte[] datacontentbyte = new byte[contentlength]; System.arraycopy(aDataBlock, offset, datacontentbyte, 0, contentlength); offset += contentlength; int footer = PushUtils.getInt(datacontentbyte, contentlength - 4); if (footer == PushUtils.PACKAGEFOOTER) { ret = new byte[contentlength + 1 - 4]; ret[0] = datatype; System.arraycopy(datacontentbyte, 0, ret, 1, contentlength - 4); datacontentbyte = null; } } } } } if (mSocketEventListener != null) { mSocketEventListener.OnStreamComing(ret); } } } catch (Exception e) { MyLineLog.redLog(TAG + " - ReceiveData error", e.toString()); } }
在介绍SocketChannel的时候,api提到关闭需要注意事项,下面一段关闭SocketChannel的示例代码:
public void closeSocket() { mRunRead = false; if (mReadThread != null) { if (!mReadThread.isInterrupted()) { mReadThread.interrupt(); mReadThread = null; } } if (mSelector != null && mSelector.isOpen()) { try { mSelector.close(); } catch (IOException e) { MyLineLog.redLog(TAG + " - closeSocket error", e.toString()); } mSelector = null; } if (client != null) { try { client.close(); client = null; } catch (IOException e) { MyLineLog.redLog(TAG + " - closeSocket2 error", e.toString()); } } System.gc(); }
这篇文章讲解部分大量参照JavaApi,其实很多问题的答案就在Api里面,当你不知道如何去做的时候,回头看一下Api,仔细思考一下,就能解决大部分问题。
Ps:感谢我大学舍友阿钢为我提供的代码
评论(0)