C#网络通信框架NetworkComms内核解析之十一:TCP连接与UDP连接
时间:2015-02-18 09:27:08
收藏:0
阅读:4435
连接是通信的核心
客户端一般只会有一个连接
服务器端会维护成千上万的连接
在服务器端连接的维护工作是由NetworkComms静态类来完成的,当有新的客户端请求,服务器上会创建相应的连接,并把连接注册到NetworkComms静态类中。当连接断开后,NetworkComms通信框架会自动把相应连接的引用从NetworkComms静态类中删除。
连接的类图:
在V3以上版本中,数据监听部分已从Connection类中提取出去,成为一个单独的类 TCPConnectionListener,使结构更加清晰。
V2.3.1中代码如下:
// Copyright 2011-2013 Marc Fletcher, Matthew Dean
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
// A commercial license of this software can also be purchased.
// Please see <http://www.networkcomms.net/licensing/> for details.

using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.Threading;
using DPSBase;
using System.Net.Sockets;
using System.IO;

namespace NetworkCommsDotNet
{
    /// <summary>
    /// Connection object. This class is the base class of the TCP and UDP connection classes.
    /// Connection is split across the following five files (note the partial keyword on each declaration):
    /// ConnectionCreate.cs (1),
    /// ConnectionDelegatesHandlers.cs (2),
    /// ConnectionIncomingData.cs (3),
    /// ConnectionSendClose.cs (4),
    /// ConnectionStatic.cs (5).
    /// This part contains construction and the connection establish / setup handshake logic.
    /// </summary>
    public abstract partial class Connection
    {
        /// <summary>
        /// The connection information describing this connection.
        /// </summary>
        public ConnectionInfo ConnectionInfo { get; protected set; }

        /// <summary>
        /// Signalled by <see cref="ConnectionSetupHandler"/> once an incoming connection setup packet
        /// has been processed (successfully or not). Waited on by <see cref="WaitForConnectionEstablish"/>.
        /// </summary>
        protected ManualResetEvent connectionSetupWait = new ManualResetEvent(false);

        /// <summary>
        /// Signalled by <see cref="EstablishConnection"/> once the connection establish has completed.
        /// </summary>
        protected ManualResetEvent connectionEstablishWait = new ManualResetEvent(false);

        /// <summary>
        /// True if the connection setup failed. Set by the setup handlers in this file.
        /// </summary>
        protected bool connectionSetupException = false;

        /// <summary>
        /// Description of the connection setup failure, if <see cref="connectionSetupException"/> is true.
        /// </summary>
        protected string connectionSetupExceptionStr = "";

        /// <summary>
        /// Create a new connection object and register it with <see cref="NetworkComms"/> by end point.
        /// </summary>
        /// <param name="connectionInfo">The connection information for the new connection</param>
        /// <param name="defaultSendReceiveOptions">The default send receive options for this connection.
        /// If null, <see cref="NetworkComms.DefaultSendReceiveOptions"/> is used.</param>
        /// <exception cref="ConnectionSetupException">Thrown if comms is shutting down, if the provided
        /// ConnectionInfo has an undefined connection type or null remote end point, or if a connection
        /// with the same remote end point and type already exists.</exception>
        protected Connection(ConnectionInfo connectionInfo, SendReceiveOptions defaultSendReceiveOptions)
        {
            //Maths helper backing SendTimesMSPerKBCache (the original author describes it as a variance model)
            SendTimesMSPerKBCache = new CommsMath();

            //Incoming data buffer, using the globally configured receive buffer size
            dataBuffer = new byte[NetworkComms.ReceiveBufferSizeBytes];

            //The PacketBuilder caches the raw bytes received on this connection.
            //Per the original author's note: the first byte received gives the length of the packet header,
            //the header is parsed from that, the header in turn gives the payload length, and the completed
            //packet is then handed to the relevant handler.
            packetBuilder = new PacketBuilder();

            //Seed this connection's packet sequence counter from the global send counter.
            //NOTE(review): the original comment states the counter is incremented for every packet sent so that
            //sequence numbers are unique - the increment is not in this file, confirm in ConnectionSendClose.cs
            packetSequenceCounter = Interlocked.Increment(ref NetworkComms.totalPacketSendCount);

            ConnectionInfo = connectionInfo;

            if (defaultSendReceiveOptions != null)
                ConnectionDefaultSendReceiveOptions = defaultSendReceiveOptions;
            else
                //No connection specific defaults were provided so fall back to the global defaults.
                //NOTE(review): the original comment says the global defaults use the protobuf serializer
                //with no encryption or compression data processors - not verifiable from this file
                ConnectionDefaultSendReceiveOptions = NetworkComms.DefaultSendReceiveOptions;

            if (NetworkComms.commsShutdown)
                throw new ConnectionSetupException("Attempting to create new connection after global comms shutdown has been initiated.");

            if (ConnectionInfo.ConnectionType == ConnectionType.Undefined || ConnectionInfo.RemoteEndPoint == null)
                throw new ConnectionSetupException("ConnectionType and RemoteEndPoint must be defined within provided ConnectionInfo.");

            //If a connection already exists with this info then we can throw an exception here to prevent duplicates
            if (NetworkComms.ConnectionExists(connectionInfo.RemoteEndPoint, connectionInfo.ConnectionType))
                throw new ConnectionSetupException("A connection already exists with " + ConnectionInfo);

            //Register this connection with the NetworkComms static class by end point.
            //Per the original author's note: a server registers every accepted connection this way and the
            //reference is removed again when the connection closes.
            NetworkComms.AddConnectionByReferenceEndPoint(this);
        }

        /// <summary>
        /// Establish this connection. Returns immediately if the connection is already established.
        /// If another thread is already establishing the connection this call waits for that to finish.
        /// </summary>
        /// <exception cref="ConnectionSetupException">Thrown if the connection has already been shut down,
        /// if waiting for a concurrent establish times out, if the connection is closed during the
        /// handshake, if no remote network identifier was set, or if a socket error occurs.</exception>
        public void EstablishConnection()
        {
            try
            {
                bool connectionAlreadyEstablishing = false;

                //Decide under the lock whether this thread performs the establish or waits for another thread
                lock (delegateLocker)
                {
                    if (ConnectionInfo.ConnectionState == ConnectionState.Established)
                        return;
                    else if (ConnectionInfo.ConnectionState == ConnectionState.Shutdown)
                        throw new ConnectionSetupException("Attempting to re-establish a closed connection. Please create a new connection instead.");
                    else if (ConnectionInfo.ConnectionState == ConnectionState.Establishing)
                        connectionAlreadyEstablishing = true;
                    else
                        ConnectionInfo.NoteStartConnectionEstablish();
                }

                if (connectionAlreadyEstablishing)
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Waiting for connection with " + ConnectionInfo + " to be established.");

                    if (!WaitForConnectionEstablish(NetworkComms.ConnectionEstablishTimeoutMS))
                        throw new ConnectionSetupException("Timeout waiting for connection to be succesfully established.");
                }
                else
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Establishing new connection with " + ConnectionInfo);

                    //Virtual method: the connection type specific work is delegated to the derived class
                    EstablishConnectionSpecific();

                    if (ConnectionInfo.ConnectionState == ConnectionState.Shutdown)
                        throw new ConnectionSetupException("Connection was closed during establish handshake.");

                    if (ConnectionInfo.NetworkIdentifier == ShortGuid.Empty)
                        throw new ConnectionSetupException("Remote network identifier should have been set by this point.");

                    //Once the above has been done the last step is to allow other threads to use the connection
                    ConnectionInfo.NoteCompleteConnectionEstablish();
                    NetworkComms.AddConnectionReferenceByIdentifier(this);
                    connectionEstablishWait.Set();

                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... connection succesfully established with " + ConnectionInfo);
                }
            }
            catch (SocketException e)
            {
                //If anything goes wrong we close the connection.
                CloseConnection(true, 43);
                throw new ConnectionSetupException(e.ToString());
            }
            catch (Exception ex)
            {
                //If anything goes wrong we close the connection.
                CloseConnection(true, 44);

                //For some odd reason not all SocketExceptions get caught above, so another check here
                if (ex.GetBaseException().GetType() == typeof(SocketException))
                    throw new ConnectionSetupException(ex.ToString());
                else
                    throw;
            }
        }

        /// <summary>
        /// Any connection type specific establish tasks. Base should be called to trigger connection establish delegates
        /// </summary>
        protected virtual void EstablishConnectionSpecific()
        {
            //Take a snapshot of each static delegate before testing and invoking it. The original code null
            //checked the static field and then re-read it at invocation time (for the async case, later on a
            //pool thread), so a concurrent removal of the last handler could cause a NullReferenceException.
            var asyncEstablishDelegates = NetworkComms.globalConnectionEstablishDelegatesAsync;
            var syncEstablishDelegates = NetworkComms.globalConnectionEstablishDelegatesSync;

            //Call asynchronous connection establish delegates here
            if (asyncEstablishDelegates != null)
            {
                NetworkComms.CommsThreadPool.EnqueueItem(QueueItemPriority.Normal, new WaitCallback((obj) =>
                {
                    Connection connectionParam = obj as Connection;
                    asyncEstablishDelegates(connectionParam);
                }), this);
            }

            //Call synchronous connection establish delegates here
            if (syncEstablishDelegates != null)
                syncEstablishDelegates(this);
        }

        /// <summary>
        /// Return true if the connection is established within the provided timeout, otherwise false
        /// </summary>
        /// <param name="waitTimeoutMS">Wait time in milliseconds before returning</param>
        /// <returns>True if the wait was triggered, false otherwise after the provided timeout.</returns>
        /// <exception cref="ConnectionShutdownException">Thrown if the connection is already shut down.</exception>
        protected bool WaitForConnectionEstablish(int waitTimeoutMS)
        {
            if (ConnectionInfo.ConnectionState == ConnectionState.Established)
                return true;
            else
            {
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Waiting for new connection to be succesfully established before continuing with " + ConnectionInfo);

                if (ConnectionInfo.ConnectionState == ConnectionState.Shutdown)
                    throw new ConnectionShutdownException("Attempted to wait for connection establish on a connection that is already shutdown.");

                //NOTE(review): this waits on connectionSetupWait (signalled when the setup packet has been handled,
                //including on setup failure) rather than connectionEstablishWait (signalled when EstablishConnection
                //completes). Left unchanged - confirm which signal callers are meant to observe.
                return connectionSetupWait.WaitOne(waitTimeoutMS);
            }
        }

        /// <summary>
        /// Handle an incoming ConnectionSetup packet type
        /// </summary>
        /// <param name="packetDataSection">Serialised handshake data</param>
        internal void ConnectionSetupHandler(MemoryStream packetDataSection)
        {
            //We should never be trying to handshake an established connection
            ConnectionInfo remoteConnectionInfo = NetworkComms.InternalFixedSendReceiveOptions.DataSerializer.DeserialiseDataObject<ConnectionInfo>(packetDataSection,
                NetworkComms.InternalFixedSendReceiveOptions.DataProcessors, NetworkComms.InternalFixedSendReceiveOptions.Options);

            if (ConnectionInfo.ConnectionType != remoteConnectionInfo.ConnectionType)
            {
                connectionSetupException = true;
                connectionSetupExceptionStr = "Remote connectionInfo provided connectionType did not match expected connection type.";
            }
            else
            {
                //We use the following bool to track a possible existing connection which needs closing
                bool possibleClashConnectionWithPeer_ByEndPoint = false;
                Connection existingConnection = null;

                //We first try to establish everything within this lock in one go
                //If we can't quite complete the establish we have to come out of the lock at try to sort the problem
                bool connectionEstablishedSuccess = ConnectionSetupHandlerFinal(remoteConnectionInfo, ref possibleClashConnectionWithPeer_ByEndPoint, ref existingConnection);

                //If we were not succesfull at establishing the connection we need to sort it out!
                if (!connectionEstablishedSuccess && !connectionSetupException)
                {
                    if (existingConnection == null) throw new Exception("Connection establish issues and existingConnection was left as null.");

                    if (possibleClashConnectionWithPeer_ByEndPoint)
                    {
                        //If we have a clash by endPoint we test the existing connection
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Existing connection with " + ConnectionInfo + ". Testing existing connection.");

                        if (existingConnection.ConnectionAlive(1000))
                        {
                            //If the existing connection comes back as alive we don't allow this one to go any further
                            //This might happen if two peers try to connect to each other at the same time
                            connectionSetupException = true;
                            connectionSetupExceptionStr = " ... existing live connection at provided end point for this connection (" + ConnectionInfo + "), there should be no need for a second.";
                        }
                    }

                    //We only try again if we did not log an exception
                    if (!connectionSetupException)
                    {
                        //Once we have tried to sort the problem we can try to finish the establish one last time
                        connectionEstablishedSuccess = ConnectionSetupHandlerFinal(remoteConnectionInfo, ref possibleClashConnectionWithPeer_ByEndPoint, ref existingConnection);

                        //If we still failed then that's it for this establish
                        if (!connectionEstablishedSuccess && !connectionSetupException)
                        {
                            connectionSetupException = true;
                            connectionSetupExceptionStr = "Attempted to establish conneciton with " + ConnectionInfo + ", but due to an existing connection this was not possible.";
                        }
                    }
                }
            }

            //Trigger any setup waits
            connectionSetupWait.Set();
        }

        /// <summary>
        /// Attempts to complete the connection establish with a minimum of locking to prevent possible deadlocking
        /// </summary>
        /// <param name="remoteConnectionInfo"><see cref="ConnectionInfo"/> corresponding with remoteEndPoint</param>
        /// <param name="possibleClashConnectionWithPeer_ByEndPoint">True if a connection already exists with provided remoteEndPoint</param>
        /// <param name="existingConnection">A reference to an existing connection if it exists</param>
        /// <returns>True if connection is successfully setup, otherwise false</returns>
        private bool ConnectionSetupHandlerFinal(ConnectionInfo remoteConnectionInfo, ref bool possibleClashConnectionWithPeer_ByEndPoint, ref Connection existingConnection)
        {
            lock (NetworkComms.globalDictAndDelegateLocker)
            {
                Connection connectionByEndPoint = NetworkComms.GetExistingConnection(ConnectionInfo.RemoteEndPoint, ConnectionInfo.ConnectionType);

                //If we no longer have the original endPoint reference (set in the constructor) then the connection must have been closed already
                if (connectionByEndPoint == null)
                {
                    connectionSetupException = true;
                    connectionSetupExceptionStr = "Connection setup received after connection closure with " + ConnectionInfo;
                }
                else
                {
                    //We need to check for a possible GUID clash
                    //Probability of a clash is approx 0.1% if 1E19 connection are maintained simultaneously (This many connections has not be tested ;))
                    //but hey, we live in a crazy world!
                    if (remoteConnectionInfo.NetworkIdentifier == NetworkComms.NetworkIdentifier)
                    {
                        connectionSetupException = true;
                        connectionSetupExceptionStr = "Remote peer has same network idendifier to local, " + remoteConnectionInfo.NetworkIdentifier + ". A real duplication is vanishingly improbable so this exception has probably been thrown because the local and remote application are the same.";
                    }
                    else if (connectionByEndPoint != this)
                    {
                        //A different connection is registered against our end point - report it to the caller
                        possibleClashConnectionWithPeer_ByEndPoint = true;
                        existingConnection = connectionByEndPoint;
                    }
                    else
                    {
                        //Update the connection info
                        //We never change the this.ConnectionInfo.RemoteEndPoint.Address as there might be NAT involved
                        //We may update the port however
                        IPEndPoint newRemoteIPEndPoint = new IPEndPoint(this.ConnectionInfo.RemoteEndPoint.Address, remoteConnectionInfo.LocalEndPoint.Port);
                        NetworkComms.UpdateConnectionReferenceByEndPoint(this, newRemoteIPEndPoint);

                        ConnectionInfo.UpdateInfoAfterRemoteHandshake(remoteConnectionInfo, newRemoteIPEndPoint);

                        return true;
                    }
                }
            }

            return false;
        }

        /// <summary>
        /// Returns ConnectionInfo.ToString
        /// </summary>
        /// <returns>The string representation of <see cref="ConnectionInfo"/></returns>
        public override string ToString()
        {
            return ConnectionInfo.ToString();
        }
    }
}
// Copyright 2011-2013 Marc Fletcher, Matthew Dean
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
// A commercial license of this software can also be purchased.
// Please see <http://www.networkcomms.net/licensing/> for details.

using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.Threading;
using DPSBase;
using System.Net.Sockets;
using System.IO;

namespace NetworkCommsDotNet
{
    /// <summary>
    /// Connection object. This class is the base class of the TCP and UDP connection classes.
    /// Connection is split across the following five files (note the partial keyword on each declaration):
    /// ConnectionCreate.cs (1),
    /// ConnectionDelegatesHandlers.cs (2),
    /// ConnectionIncomingData.cs (3),
    /// ConnectionSendClose.cs (4),
    /// ConnectionStatic.cs (5).
    /// This part contains the connection specific packet handlers, unwrapper options and shutdown delegates.
    /// </summary>
    public abstract partial class Connection
    {
        /// <summary>
        /// Lock object guarding the connection specific handler and unwrapper dictionaries and the shutdown delegate.
        /// </summary>
        protected object delegateLocker = new object();

        /// <summary>
        /// The default send receive options used by this connection.
        /// </summary>
        public SendReceiveOptions ConnectionDefaultSendReceiveOptions { get; protected set; }

        /// <summary>
        /// The connection specific shutdown delegate(s). Managed via <see cref="AppendShutdownHandler"/>
        /// and <see cref="RemoveShutdownHandler"/>.
        /// </summary>
        private NetworkComms.ConnectionEstablishShutdownDelegate ConnectionSpecificShutdownDelegate { get; set; }

        /// <summary>
        /// By default all incoming objects are handled using ConnectionDefaultSendReceiveOptions. Should the user want something else
        /// those settings are stored here
        /// </summary>
        private Dictionary<string, PacketTypeUnwrapper> incomingPacketUnwrappers = new Dictionary<string, PacketTypeUnwrapper>();

        /// <summary>
        /// A connection specific incoming packet handler dictionary. These are called before any global handlers
        /// </summary>
        private Dictionary<string, List<IPacketTypeHandlerDelegateWrapper>> incomingPacketHandlers = new Dictionary<string, List<IPacketTypeHandlerDelegateWrapper>>();

        /// <summary>
        /// Returns the <see cref="SendReceiveOptions"/> to be used for the provided <see cref="PacketHeader"/>. Ensures there will not be a serializer / data processor clash for different delegate levels.
        /// </summary>
        /// <param name="header">The <see cref="PacketHeader"/> options are desired.</param>
        /// <returns>The requested <see cref="SendReceiveOptions"/></returns>
        /// <exception cref="PacketHandlerException">Thrown if both connection specific and global handlers
        /// exist for the packet type and their options are not compatible.</exception>
        private SendReceiveOptions IncomingPacketSendReceiveOptions(PacketHeader header)
        {
            //Are there connection specific or global packet handlers?
            bool connectionSpecificHandlers = false;
            lock (delegateLocker) connectionSpecificHandlers = incomingPacketHandlers.ContainsKey(header.PacketType);

            bool globalHandlers = NetworkComms.GlobalIncomingPacketHandlerExists(header.PacketType);

            //Get connection specific options for this packet type, if there aren't any use the connection default options
            SendReceiveOptions connectionSpecificOptions = PacketTypeUnwrapperOptions(header.PacketType);
            if (connectionSpecificOptions == null) connectionSpecificOptions = ConnectionDefaultSendReceiveOptions;

            //Get global options for this packet type, if there aren't any use the global default options
            SendReceiveOptions globalOptions = NetworkComms.GlobalPacketTypeUnwrapperOptions(header.PacketType);
            if (globalOptions == null) globalOptions = NetworkComms.DefaultSendReceiveOptions;

            if (connectionSpecificHandlers && globalHandlers)
            {
                if (!connectionSpecificOptions.OptionsCompatible(globalOptions))
                    throw new PacketHandlerException("Attempted to determine correct sendReceiveOptions for packet of type ‘" + header.PacketType + "‘. Unable to continue as connection specific and global sendReceiveOptions are not equal.");

                //We need to combine options in this case using the connection specific option in preference if both are present
                var combinedOptions = new Dictionary<string, string>(globalOptions.Options);

                foreach (var pair in connectionSpecificOptions.Options)
                    combinedOptions[pair.Key] = pair.Value;

                //If the header specifies a serializer and data processors we will autodetect those
                if (header.ContainsOption(PacketHeaderLongItems.SerializerProcessors))
                {
                    DataSerializer serializer;
                    List<DataProcessor> dataProcessors;

                    DPSManager.GetSerializerDataProcessorsFromIdentifier(header.GetOption(PacketHeaderLongItems.SerializerProcessors), out serializer, out dataProcessors);
                    return new SendReceiveOptions(serializer, dataProcessors, combinedOptions);
                }

                //Otherwise we will use options that were specified
                return new SendReceiveOptions(connectionSpecificOptions.DataSerializer, connectionSpecificOptions.DataProcessors, combinedOptions);
            }
            else if (connectionSpecificHandlers)
            {
                //If the header specifies a serializer and data processors we will autodetect those
                if (header.ContainsOption(PacketHeaderLongItems.SerializerProcessors))
                {
                    DataSerializer serializer;
                    List<DataProcessor> dataProcessors;

                    DPSManager.GetSerializerDataProcessorsFromIdentifier(header.GetOption(PacketHeaderLongItems.SerializerProcessors), out serializer, out dataProcessors);
                    return new SendReceiveOptions(serializer, dataProcessors, connectionSpecificOptions.Options);
                }

                return connectionSpecificOptions;
            }
            else
            {
                //If the header specifies a serializer and data processors we will autodetect those
                if (header.ContainsOption(PacketHeaderLongItems.SerializerProcessors))
                {
                    DataSerializer serializer;
                    List<DataProcessor> dataProcessors;

                    DPSManager.GetSerializerDataProcessorsFromIdentifier(header.GetOption(PacketHeaderLongItems.SerializerProcessors), out serializer, out dataProcessors);
                    return new SendReceiveOptions(serializer, dataProcessors, globalOptions.Options);
                }

                //If just globalHandlers is set (or indeed no handlers at all) we just return the global options
                return globalOptions;
            }
        }

        /// <summary>
        /// Trigger connection specific packet delegates with the provided parameters. Returns true if connection specific handlers were executed.
        /// </summary>
        /// <param name="packetHeader">The packetHeader for which all delegates should be triggered with</param>
        /// <param name="incomingObjectBytes">The serialised and or compressed bytes to be used</param>
        /// <param name="options">The incoming sendReceiveOptions to use overriding defaults</param>
        /// <returns>False if no connection specific handlers exist for the packet type. True otherwise,
        /// including when an exception was caught and logged by this method.</returns>
        public bool TriggerSpecificPacketHandlers(PacketHeader packetHeader, MemoryStream incomingObjectBytes, SendReceiveOptions options)
        {
            try
            {
                if (packetHeader == null) throw new ArgumentNullException("packetHeader", "Provided PacketHeader cannot not be null.");
                if (incomingObjectBytes == null) throw new ArgumentNullException("incomingObjectBytes", "Provided MemoryStream cannot not be null for packetType " + packetHeader.PacketType);
                if (options == null) throw new ArgumentNullException("options", "Provided SendReceiveOptions cannot not be null for packetType " + packetHeader.PacketType);

                //We take a copy of the handlers list in case it is modified outside of the lock
                List<IPacketTypeHandlerDelegateWrapper> handlersCopy = null;
                lock (delegateLocker)
                {
                    List<IPacketTypeHandlerDelegateWrapper> registeredHandlers;
                    if (incomingPacketHandlers.TryGetValue(packetHeader.PacketType, out registeredHandlers))
                        handlersCopy = new List<IPacketTypeHandlerDelegateWrapper>(registeredHandlers);
                }

                if (handlersCopy == null)
                    //If we have received an unknown packet type we ignore them on this connection specific level and just finish here
                    return false;
                else
                {
                    //Idiot check
                    if (handlersCopy.Count == 0)
                        throw new PacketHandlerException("An entry exists in the packetHandlers list but it contains no elements. This should not be possible.");

                    //Deserialise the object only once
                    object returnObject = handlersCopy[0].DeSerialize(incomingObjectBytes, options);

                    //Pass the data onto the handler and move on.
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... passing completed data packet to selected connection specific handlers.");

                    //Pass the object to all necessary delegates
                    //We need to use a copy because we may modify the original delegate list during processing
                    foreach (IPacketTypeHandlerDelegateWrapper wrapper in handlersCopy)
                    {
                        try
                        {
                            wrapper.Process(packetHeader, this, returnObject);
                        }
                        catch (Exception ex)
                        {
                            //A failing handler must not prevent the remaining handlers from running
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("An unhandled exception was caught while processing a packet handler for a packet type ‘" + packetHeader.PacketType + "‘. Make sure to catch errors in packet handlers. See error log file for more information.");
                            NetworkComms.LogError(ex, "PacketHandlerErrorSpecific_" + packetHeader.PacketType);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                //packetHeader is null when the first argument check above threw, so it must not be
                //dereferenced unguarded here (the original code did, raising a NullReferenceException
                //out of this catch block instead of logging the real problem).
                string loggedPacketType = (packetHeader == null ? "Unknown" : packetHeader.PacketType);

                //If anything goes wrong here all we can really do is log the exception
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("An exception occured in TriggerPacketHandler() for a packet type ‘" + loggedPacketType + "‘. See error log file for more information.");
                NetworkComms.LogError(ex, "PacketHandlerErrorSpecific_" + loggedPacketType);
            }

            return true;
        }

        /// <summary>
        /// Returns the packet type sendReceiveOptions possibly used to unwrap incoming data. If no specific options are registered returns null
        /// </summary>
        /// <param name="packetTypeStr">The packet type for which the <see cref="SendReceiveOptions"/> are required.</param>
        /// <returns>The requested <see cref="SendReceiveOptions"/> otherwise null</returns>
        public SendReceiveOptions PacketTypeUnwrapperOptions(string packetTypeStr)
        {
            //If we find a connection specific packet unwrapper for this packetType we use those options
            lock (delegateLocker)
            {
                PacketTypeUnwrapper unwrapper;
                if (incomingPacketUnwrappers.TryGetValue(packetTypeStr, out unwrapper))
                    return unwrapper.Options;
            }

            return null;
        }

        /// <summary>
        /// Append a connection specific packet handler
        /// </summary>
        /// <typeparam name="T">The type of incoming object</typeparam>
        /// <param name="packetTypeStr">The packet type for which this handler will be executed</param>
        /// <param name="packetHandlerDelgatePointer">The delegate to be executed when a packet of packetTypeStr is received</param>
        /// <param name="options">The <see cref="SendReceiveOptions"/> to be used for the provided packet type</param>
        /// <exception cref="ArgumentNullException">Thrown if any argument is null.</exception>
        /// <exception cref="PacketHandlerException">Thrown if the options are not compatible with those already
        /// registered for packetTypeStr, or if the delegate has already been added for packetTypeStr.</exception>
        public void AppendIncomingPacketHandler<T>(string packetTypeStr, NetworkComms.PacketHandlerCallBackDelegate<T> packetHandlerDelgatePointer, SendReceiveOptions options)
        {
            if (packetTypeStr == null) throw new ArgumentNullException("packetTypeStr", "Provided packetType string cannot be null.");
            if (packetHandlerDelgatePointer == null) throw new ArgumentNullException("packetHandlerDelgatePointer", "Provided NetworkComms.PacketHandlerCallBackDelegate<T> cannot be null.");
            if (options == null) throw new ArgumentNullException("options", "Provided SendReceiveOptions cannot be null.");

            lock (delegateLocker)
            {
                PacketTypeUnwrapper existingUnwrapper;
                if (incomingPacketUnwrappers.TryGetValue(packetTypeStr, out existingUnwrapper))
                {
                    //Make sure if we already have an existing entry that it matches with the provided
                    if (!existingUnwrapper.Options.OptionsCompatible(options))
                        throw new PacketHandlerException("The proivded SendReceiveOptions are not compatible with existing SendReceiveOptions already specified for this packetTypeStr.");
                }
                else
                    incomingPacketUnwrappers.Add(packetTypeStr, new PacketTypeUnwrapper(packetTypeStr, options));

                //Add the handler to the list
                List<IPacketTypeHandlerDelegateWrapper> existingHandlers;
                if (incomingPacketHandlers.TryGetValue(packetTypeStr, out existingHandlers))
                {
                    //Make sure we avoid duplicates. The original compared each stored wrapper against a newly
                    //constructed wrapper using ==, which is a reference comparison and could never match.
                    //EqualsDelegate is the comparison the other handler methods of this class already use.
                    foreach (IPacketTypeHandlerDelegateWrapper handler in existingHandlers)
                    {
                        if (handler.EqualsDelegate(packetHandlerDelgatePointer))
                            throw new PacketHandlerException("This specific packet handler delegate already exists for the provided packetTypeStr.");
                    }

                    existingHandlers.Add(new PacketTypeHandlerDelegateWrapper<T>(packetHandlerDelgatePointer));
                }
                else
                    incomingPacketHandlers.Add(packetTypeStr, new List<IPacketTypeHandlerDelegateWrapper>() { new PacketTypeHandlerDelegateWrapper<T>(packetHandlerDelgatePointer) });

                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("Added connection specific incoming packetHandler for ‘" + packetTypeStr + "‘ packetType with " + ConnectionInfo);
            }
        }

        /// <summary>
        /// Returns true if a packet handler exists for the provided packet type, on this connection
        /// </summary>
        /// <param name="packetTypeStr">The packet type for which to check incoming packet handlers</param>
        /// <returns>True if a packet handler exists</returns>
        public bool IncomingPacketHandlerExists(string packetTypeStr)
        {
            lock (delegateLocker)
                return incomingPacketHandlers.ContainsKey(packetTypeStr);
        }

        /// <summary>
        /// Returns true if the provided packet handler has been added for the provided packet type, on this connection.
        /// </summary>
        /// <param name="packetTypeStr">The packet type within which to check packet handlers</param>
        /// <param name="packetHandlerDelgatePointer">The packet handler to look for</param>
        /// <returns>True if the packet handler exists on this connection for the provided packetType</returns>
        public bool IncomingPacketHandlerExists(string packetTypeStr, Delegate packetHandlerDelgatePointer)
        {
            lock (delegateLocker)
            {
                List<IPacketTypeHandlerDelegateWrapper> handlers;
                if (incomingPacketHandlers.TryGetValue(packetTypeStr, out handlers))
                {
                    foreach (IPacketTypeHandlerDelegateWrapper handler in handlers)
                    {
                        if (handler.EqualsDelegate(packetHandlerDelgatePointer))
                            return true;
                    }
                }
            }

            return false;
        }

        /// <summary>
        /// Remove the provided delegate for the specified packet type
        /// </summary>
        /// <param name="packetTypeStr">Packet type for which this delegate should be removed</param>
        /// <param name="packetHandlerDelgatePointer">The delegate to remove</param>
        public void RemoveIncomingPacketHandler(string packetTypeStr, Delegate packetHandlerDelgatePointer)
        {
            lock (delegateLocker)
            {
                List<IPacketTypeHandlerDelegateWrapper> handlers;
                if (incomingPacketHandlers.TryGetValue(packetTypeStr, out handlers))
                {
                    //Remove the first instance of this handler from the delegates
                    //The bonus here is if the delegate has not been added we continue quite happily
                    IPacketTypeHandlerDelegateWrapper toRemove = null;

                    foreach (IPacketTypeHandlerDelegateWrapper handler in handlers)
                    {
                        if (handler.EqualsDelegate(packetHandlerDelgatePointer))
                        {
                            toRemove = handler;
                            break;
                        }
                    }

                    if (toRemove != null)
                        handlers.Remove(toRemove);

                    if (handlers.Count == 0)
                    {
                        incomingPacketHandlers.Remove(packetTypeStr);

                        //Remove any entries in the unwrappers dict as well as we are done with this packetTypeStr.
                        //The original removed from incomingPacketHandlers a second time here, leaving the
                        //unwrapper options behind.
                        incomingPacketUnwrappers.Remove(packetTypeStr);

                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("Removed a connection specific packetHandler for ‘" + packetTypeStr + "‘ packetType. No handlers remain with " + ConnectionInfo);
                    }
                    else if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("Removed a connection specific packetHandler for ‘" + packetTypeStr + "‘ packetType. Handlers remain with " + ConnectionInfo);
                }
            }
        }

        /// <summary>
        /// Removes all delegates for the provided packet type
        /// </summary>
        /// <param name="packetTypeStr">Packet type for which all delegates should be removed</param>
        public void RemoveIncomingPacketHandler(string packetTypeStr)
        {
            lock (delegateLocker)
            {
                //We don't need to check for potentially removing a critical reserved packet handler here because those cannot be removed.
                if (incomingPacketHandlers.ContainsKey(packetTypeStr))
                {
                    incomingPacketHandlers.Remove(packetTypeStr);

                    //Also discard the unwrapper options for this packet type, consistent with removing the last
                    //handler individually, so stale options cannot block a later registration.
                    incomingPacketUnwrappers.Remove(packetTypeStr);

                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("Removed all connection specific incoming packetHandlers for ‘" + packetTypeStr + "‘ packetType with " + ConnectionInfo);
                }
            }
        }

        /// <summary>
        /// Removes all delegates for all packet types
        /// </summary>
        public void RemoveIncomingPacketHandler()
        {
            lock (delegateLocker)
            {
                incomingPacketHandlers = new Dictionary<string, List<IPacketTypeHandlerDelegateWrapper>>();

                //With no handlers left the registered unwrapper options are discarded as well
                incomingPacketUnwrappers = new Dictionary<string, PacketTypeUnwrapper>();

                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("Removed all connection specific incoming packetHandlers for all packetTypes with " + ConnectionInfo);
            }
        }

        /// <summary>
        /// Add a connection specific shutdown delegate
        /// </summary>
        /// <param name="handlerToAppend">The delegate to call when a connection is shutdown</param>
        public void AppendShutdownHandler(NetworkComms.ConnectionEstablishShutdownDelegate handlerToAppend)
        {
            lock (delegateLocker)
            {
                //Delegate combination handles a null left hand side, so no explicit null check is required
                ConnectionSpecificShutdownDelegate += handlerToAppend;

                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Added a connection specific shutdown delegate to connection with " + ConnectionInfo);
            }
        }

        /// <summary>
        /// Remove a connection specific shutdown delegate.
        /// </summary>
        /// <param name="handlerToRemove">The delegate to remove for shutdown events</param>
        public void RemoveShutdownHandler(NetworkComms.ConnectionEstablishShutdownDelegate handlerToRemove)
        {
            lock (delegateLocker)
            {
                ConnectionSpecificShutdownDelegate -= handlerToRemove;
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Removed ConnectionSpecificShutdownDelegate to connection with " + ConnectionInfo);

                if (ConnectionSpecificShutdownDelegate == null)
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("No handlers remain for ConnectionSpecificShutdownDelegate with " + ConnectionInfo);
                }
                else
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("Handlers remain for ConnectionSpecificShutdownDelegate with " + ConnectionInfo);
                }
            }
        }
    }
}
// Copyright 2011-2013 Marc Fletcher, Matthew Dean
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
// A commercial license of this software can also be purchased.
// Please see <http://www.networkcomms.net/licensing/> for details.

using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.Threading;
using DPSBase;
using System.Net.Sockets;
using System.IO;

#if WINDOWS_PHONE
using QueueItemPriority = Windows.System.Threading.WorkItemPriority;
#else
using QueueItemPriority = System.Threading.ThreadPriority;
#endif

namespace NetworkCommsDotNet
{
    /// <summary>
    /// Connection object. This class is the base class of the TCP and UDP connection classes.
    /// Connection is split across the following five files (note the partial keyword on each declaration):
    /// ConnectionCreate.cs <1>
    /// ConnectionDelegatesHandlers.cs <2>
    /// ConnectionIncomingData.cs <3>
    /// ConnectionSendClose.cs <4>
    /// ConnectionStatic.cs <5>
    /// This part receives incoming data and hands completed packets to the NetworkComms priority thread pool.
    /// </summary>
    public abstract partial class Connection
    {
        /// <summary>
        /// The <see cref="PacketBuilder"/> for this connection
        /// </summary>
        protected PacketBuilder packetBuilder;

        /// <summary>
        /// The current incoming data buffer
        /// </summary>
        protected byte[] dataBuffer;

        /// <summary>
        /// The total bytes read so far within dataBuffer
        /// </summary>
        protected int totalBytesRead;

        /// <summary>
        /// The thread listening for incoming data should we be using synchronous methods.
        /// </summary>
        protected Thread incomingDataListenThread = null;

        /// <summary>
        /// A connection specific method which triggers any requisites for accepting incoming data
        /// </summary>
        protected abstract void StartIncomingDataListen();

        /// <summary>
        /// Repeatedly extracts complete packets from the front of the provided packetBuilder and hands each one off.
        /// Reserved packet types, and (except on Windows Phone) packets with priority Highest, are completed inline
        /// on the calling thread; all others are queued on NetworkComms.CommsThreadPool.
        /// Returns once the builder is empty or more data is required, in which case
        /// packetBuilder.TotalBytesExpected is set to the number of bytes needed.
        /// Any exception is logged and the connection is closed; nothing is rethrown to the caller.
        /// </summary>
        /// <param name="packetBuilder">The <see cref="PacketBuilder"/> containing incoming cached data</param>
        protected void IncomingPacketHandleHandOff(PacketBuilder packetBuilder)
        {
            try
            {
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... checking for completed packet with " + packetBuilder.TotalBytesCached.ToString() + " bytes read.");

                //This exception never leaves the method, it is logged by the catch block below
                if (packetBuilder.TotalPartialPacketCount == 0)
                    throw new InvalidOperationException("Executing IncomingPacketHandleHandOff when no packets exist in packetbuilder.");

                //Loop until we are finished with this packetBuilder
                int loopCounter = 0;
                while (true)
                {
                    //If we have ended up with a null packet at the front, probably due to some form of concatenation, we can pull it off here
                    //It is possible we have concatenation of several null packets along with real data so we loop until the first byte is greater than 0
                    if (packetBuilder.FirstByte() == 0)
                    {
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... null packet removed in IncomingPacketHandleHandOff() from " + ConnectionInfo + ", loop index - " + loopCounter.ToString());

                        packetBuilder.ClearNTopBytes(1);

                        //Reset the expected bytes to 0 so that the next check starts from scratch
                        packetBuilder.TotalBytesExpected = 0;

                        //If we have run out of data completely then we can return immediately
                        if (packetBuilder.TotalBytesCached == 0) return;
                    }
                    else
                    {
                        //The first byte holds the header length, so the header section including that byte is one longer
                        int packetHeaderSize = packetBuilder.FirstByte() + 1;

                        //Do we have enough data to build a header?
                        if (packetBuilder.TotalBytesCached < packetHeaderSize)
                        {
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... ... more data required for complete packet header.");

                            //Set the expected number of bytes and then return
                            packetBuilder.TotalBytesExpected = packetHeaderSize;
                            return;
                        }

                        //We have enough for a header
                        PacketHeader topPacketHeader;
                        using (MemoryStream headerStream = packetBuilder.ReadDataSection(1, packetHeaderSize - 1))
                            topPacketHeader = new PacketHeader(headerStream, NetworkComms.InternalFixedSendReceiveOptions);

                        //Idiot test
                        if (topPacketHeader.PacketType == null)
                            throw new SerialisationException("packetType value in packetHeader should never be null");

                        //We can now use the header to establish if we have enough payload data
                        var totalPacketSize = packetHeaderSize + topPacketHeader.PayloadPacketSize;

                        //First case is when we have not yet received enough data
                        if (packetBuilder.TotalBytesCached < totalPacketSize)
                        {
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... ... more data required for complete packet payload. Expecting " + totalPacketSize.ToString() + " total packet bytes.");

                            //Set the expected number of bytes and then return
                            packetBuilder.TotalBytesExpected = totalPacketSize;
                            return;
                        }

                        //From here on we have at least one complete packet
                        //We may have more data than expected if packets have been concatenated, the loop handles the remainder
                        SendReceiveOptions incomingPacketSendReceiveOptions = IncomingPacketSendReceiveOptions(topPacketHeader);
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Received packet of type ‘" + topPacketHeader.PacketType + "‘ from " + ConnectionInfo + ", containing " + packetHeaderSize.ToString() + " header bytes and " + topPacketHeader.PayloadPacketSize.ToString() + " payload bytes.");

                        //If this is a reserved packetType we call the method inline so that it gets dealt with immediately
                        bool isReservedType = false;
                        foreach (var tName in NetworkComms.reservedPacketTypeNames)
                        {
                            if (topPacketHeader.PacketType == tName)
                            {
                                isReservedType = true;
                                break;
                            }
                        }

                        if (isReservedType)
                        {
                            //Reserved packet types are always completed inline
#if WINDOWS_PHONE
                            var priority = QueueItemPriority.Normal;
#else
                            var priority = (QueueItemPriority)Thread.CurrentThread.Priority;
#endif
                            PriorityQueueItem item = new PriorityQueueItem(priority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPacketSize), incomingPacketSendReceiveOptions);
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... handling packet type ‘" + topPacketHeader.PacketType + "‘ inline. Loop index - " + loopCounter.ToString());
                            NetworkComms.CompleteIncomingItemTask(item);
                        }
                        else
                        {
                            //The handling priority may be specified in the options, otherwise Normal is used
                            QueueItemPriority itemPriority = (incomingPacketSendReceiveOptions.Options.ContainsKey("ReceiveHandlePriority") ? (QueueItemPriority)Enum.Parse(typeof(QueueItemPriority), incomingPacketSendReceiveOptions.Options["ReceiveHandlePriority"]) : QueueItemPriority.Normal);
                            PriorityQueueItem item = new PriorityQueueItem(itemPriority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPacketSize), incomingPacketSendReceiveOptions);

                            //QueueItemPriority.Highest is the only priority that is executed inline
                            //The Windows Phone build never handles non reserved packets inline
#if !WINDOWS_PHONE
                            bool handleInline = itemPriority == QueueItemPriority.Highest;
#else
                            bool handleInline = false;
#endif
                            if (handleInline)
                            {
                                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... handling packet type ‘" + topPacketHeader.PacketType + "‘ with priority HIGHEST inline. Loop index - " + loopCounter.ToString());
                                NetworkComms.CompleteIncomingItemTask(item);
                            }
                            else
                            {
                                int threadId = NetworkComms.CommsThreadPool.EnqueueItem(item.Priority, NetworkComms.CompleteIncomingItemTask, item);
                                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... added completed " + item.PacketHeader.PacketType + " packet to thread pool (Q:" + NetworkComms.CommsThreadPool.QueueCount.ToString() + ", T:" + NetworkComms.CommsThreadPool.CurrentNumTotalThreads.ToString() + ", I:" + NetworkComms.CommsThreadPool.CurrentNumIdleThreads.ToString() + ") with priority " + itemPriority.ToString() + (threadId > 0 ? ". Selected threadId=" + threadId.ToString() : "") + ". Loop index=" + loopCounter.ToString() + ".");
                            }
                        }

                        //We clear the bytes we have just handed off
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Removing " + totalPacketSize.ToString() + " bytes from incoming packet buffer from connection with " + ConnectionInfo + ".");
                        packetBuilder.ClearNTopBytes(totalPacketSize);

                        //Reset the expected bytes to 0 so that the next check starts from scratch
                        packetBuilder.TotalBytesExpected = 0;

                        //If we have run out of data completely then we can return immediately
                        if (packetBuilder.TotalBytesCached == 0) return;
                    }

                    loopCounter++;
                }
            }
            catch (Exception ex)
            {
                //Any error is logged and the connection is closed, the exception is not rethrown
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("A fatal exception occured in IncomingPacketHandleHandOff(), connection with " + ConnectionInfo + " be closed. See log file for more information.");

                NetworkComms.LogError(ex, "CommsError");
                CloseConnection(true, 45);
            }
        }

        /// <summary>
        /// Handle an incoming CheckSumFailResend packet type. The packet data is deserialised to the
        /// check sum of a previously sent packet, which is looked up in the sent packet cache and sent again.
        /// </summary>
        /// <param name="packetDataSection">Stream containing the serialised check sum of the packet to resend</param>
        /// <exception cref="CheckSumException">No cached packet matches the requested check sum, or the
        /// matching packet has a send count greater than 10</exception>
        internal void CheckSumFailResendHandler(MemoryStream packetDataSection)
        {
            //If we have been asked to resend a packet then we just go through the list and resend it.
            SentPacket packetToReSend;
            lock (sentPacketsLocker)
            {
                string checkSumRequested = NetworkComms.InternalFixedSendReceiveOptions.DataSerializer.DeserialiseDataObject<string>(packetDataSection,
                    NetworkComms.InternalFixedSendReceiveOptions.DataProcessors, NetworkComms.InternalFixedSendReceiveOptions.Options);

                //Single lookup instead of ContainsKey followed by the indexer
                if (!sentPackets.TryGetValue(checkSumRequested, out packetToReSend))
                    throw new CheckSumException("There was no packet sent with a matching check sum");
            }

            //If the send count has already passed 10 something has gone horribly wrong
            if (packetToReSend.SendCount > 10)
                throw new CheckSumException("Packet sent resulted in a catastropic checksum check exception.");

            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Warn(" ... resending packet due to MD5 mismatch.");

            //Increment send count and then resend
            packetToReSend.IncrementSendCount();
            SendPacket(packetToReSend.Packet);
        }
    }
}
// Copyright 2011-2013 Marc Fletcher, Matthew Dean
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
// A commercial license of this software can also be purchased.
// Please see <http://www.networkcomms.net/licensing/> for details.

using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.Threading;
using DPSBase;
using System.Net.Sockets;

namespace NetworkCommsDotNet
{
    /// <summary>
    /// Connection object. This class is the base class of the TCP and UDP connection classes.
    /// Connection is split across the following five files (note the partial keyword on each declaration):
    /// ConnectionCreate.cs <1>
    /// ConnectionDelegatesHandlers.cs <2>
    /// ConnectionIncomingData.cs <3>
    /// ConnectionSendClose.cs <4>
    /// ConnectionStatic.cs <5>
    /// This part is mainly concerned with sending packets over the connection and closing it.
    /// </summary>
    public abstract partial class Connection : IDisposable
    {
        /// <summary>
        /// Lock object taken by SendPacket so that only one thread sends on this connection at a time
        /// </summary>
        protected object sendLocker = new object();

        /// <summary>
        /// A comms math object for tracking send times. Used to prevent send deadlocks.
        /// NOTE(review): the original comment states this is initialised at 1000 milliseconds per KB
        /// (1KB / second); the initialisation is not part of this file section - confirm.
        /// </summary>
        protected CommsMath SendTimesMSPerKBCache;

        /// <summary>
        /// Per connection packet counter. SendPacket writes the current value into the header of
        /// each outgoing packet as its sequence number and then increments it.
        /// </summary>
        protected long packetSequenceCounter;

        /// <summary>
        /// Lock object guarding every access to sentPackets
        /// </summary>
        private object sentPacketsLocker = new object();

        /// <summary>
        /// Cache of sent packets keyed by check sum, kept for the purpose of confirmation and possible resends
        /// </summary>
        private Dictionary<string, SentPacket> sentPackets = new Dictionary<string, SentPacket>();

        /// <summary>
        /// Sends an object with the given packet type, using this connection's default <see cref="SendReceiveOptions"/>
        /// </summary>
        /// <param name="sendingPacketType">The sending packet type</param>
        /// <param name="objectToSend">The object to send</param>
        public void SendObject(string sendingPacketType, object objectToSend)
        {
            SendReceiveOptions connectionDefaults = ConnectionDefaultSendReceiveOptions;
            SendObject(sendingPacketType, objectToSend, connectionDefaults);
        }

        /// <summary>
        /// Sends an object with the given packet type, using this connection's default <see cref="SendReceiveOptions"/>,
        /// and reports the sequence number given to the packet
        /// </summary>
        /// <param name="sendingPacketType">The sending packet type</param>
        /// <param name="objectToSend">The object to send</param>
        /// <param name="packetSequenceNumber">The sequence number of the packet sent</param>
        public void SendObject(string sendingPacketType, object objectToSend, out long packetSequenceNumber)
        {
            SendReceiveOptions connectionDefaults = ConnectionDefaultSendReceiveOptions;
            SendObject(sendingPacketType, objectToSend, connectionDefaults, out packetSequenceNumber);
        }

        /// <summary>
        /// Sends an object with the given packet type, using the supplied <see cref="SendReceiveOptions"/>
        /// </summary>
        /// <param name="sendingPacketType">The packet type to use for send</param>
        /// <param name="objectToSend">The object to send</param>
        /// <param name="options">Send specific <see cref="SendReceiveOptions"/></param>
        public void SendObject(string sendingPacketType, object objectToSend, SendReceiveOptions options)
        {
            //The caller is not interested in the sequence number, so it is simply discarded
            long discardedSequenceNumber;
            SendObject(sendingPacketType, objectToSend, options, out discardedSequenceNumber);
        }

        /// <summary>
        /// Sends an object with the given packet type, using the supplied <see cref="SendReceiveOptions"/>,
        /// and reports the sequence number given to the packet
        /// </summary>
        /// <param name="sendingPacketType">The packet type to use for send</param>
        /// <param name="objectToSend">The object to send</param>
        /// <param name="options">Send specific <see cref="SendReceiveOptions"/></param>
        /// <param name="packetSequenceNumber">The sequence number of the packet sent</param>
        public void SendObject(string sendingPacketType, object objectToSend, SendReceiveOptions options, out long packetSequenceNumber)
        {
            Packet packetToSend = new Packet(sendingPacketType, objectToSend, options);
            SendPacket(packetToSend, out packetSequenceNumber);
        }

        /// <summary>
        /// Sends an empty packet (null payload object) of the given packet type. Useful for signalling.
        /// </summary>
        /// <param name="sendingPacketType">The sending packet type</param>
        public void SendObject(string sendingPacketType)
        {
            SendObject(sendingPacketType, null, ConnectionDefaultSendReceiveOptions);
        }

        /// <summary>
        /// Sends an empty packet (null payload object) of the given packet type and reports the
        /// sequence number given to the packet. Useful for signalling.
        /// </summary>
        /// <param name="sendingPacketType">The sending packet type</param>
        /// <param name="packetSequenceNumber">The sequence number of the packet sent</param>
        public void SendObject(string sendingPacketType, out long packetSequenceNumber)
        {
            SendReceiveOptions connectionDefaults = ConnectionDefaultSendReceiveOptions;
            SendObject(sendingPacketType, null, connectionDefaults, out packetSequenceNumber);
        }

        /// <summary>
        /// Send an object using the connection default <see cref="SendReceiveOptions"/> and wait for a returned object again using default <see cref="SendReceiveOptions"/>.
/// </summary>
        /// <typeparam name="returnObjectType">The type of the object expected in reply</typeparam>
        /// <param name="sendingPacketTypeStr">The sending packet type</param>
        /// <param name="expectedReturnPacketTypeStr">The packet type which will be used for the reply</param>
        /// <param name="returnPacketTimeOutMilliSeconds">How long to wait for the reply, in milliseconds, before an ExpectedReturnTimeoutException is thrown</param>
        /// <param name="sendObject">The object to send</param>
        /// <returns>The requested return object</returns>
        public returnObjectType SendReceiveObject<returnObjectType>(string sendingPacketTypeStr, string expectedReturnPacketTypeStr, int returnPacketTimeOutMilliSeconds, object sendObject)
        {
            //Null send and receive options are passed straight through to the full overload
            SendReceiveOptions unspecifiedOptions = null;

            returnObjectType reply = SendReceiveObject<returnObjectType>(sendingPacketTypeStr, expectedReturnPacketTypeStr, returnPacketTimeOutMilliSeconds, sendObject, unspecifiedOptions, unspecifiedOptions);
            return reply;
        }

        /// <summary>
        /// Send an object using the connection default <see cref="SendReceiveOptions"/> and wait for a returned object again using default <see cref="SendReceiveOptions"/>.
/// </summary>
        /// <typeparam name="returnObjectType">The type of the object expected in reply</typeparam>
        /// <param name="sendingPacketTypeStr">The sending packet type</param>
        /// <param name="expectedReturnPacketTypeStr">The packet type which will be used for the reply</param>
        /// <param name="returnPacketTimeOutMilliSeconds">How long to wait for the reply, in milliseconds, before an ExpectedReturnTimeoutException is thrown</param>
        /// <param name="sendObject">The object to send</param>
        /// <param name="sentPacketSequenceNumber">The sequence number of the packet sent</param>
        /// <returns>The requested return object</returns>
        public returnObjectType SendReceiveObject<returnObjectType>(string sendingPacketTypeStr, string expectedReturnPacketTypeStr, int returnPacketTimeOutMilliSeconds, object sendObject, out long sentPacketSequenceNumber)
        {
            //Null send and receive options are passed straight through to the full overload
            SendReceiveOptions unspecifiedOptions = null;

            returnObjectType reply = SendReceiveObject<returnObjectType>(sendingPacketTypeStr, expectedReturnPacketTypeStr, returnPacketTimeOutMilliSeconds, sendObject, unspecifiedOptions, unspecifiedOptions, out sentPacketSequenceNumber);
            return reply;
        }

        /// <summary>
        /// Send an object using the provided <see cref="SendReceiveOptions"/> and wait for a returned object using provided <see cref="SendReceiveOptions"/>.
/// </summary>
        /// <typeparam name="returnObjectType">The type of the object expected in reply</typeparam>
        /// <param name="sendingPacketTypeStr">The sending packet type</param>
        /// <param name="expectedReturnPacketTypeStr">The packet type which will be used for the reply</param>
        /// <param name="returnPacketTimeOutMilliSeconds">How long to wait for the reply, in milliseconds, before an ExpectedReturnTimeoutException is thrown</param>
        /// <param name="sendObject">The object to send</param>
        /// <param name="sendOptions">SendReceiveOptions to use when sending</param>
        /// <param name="receiveOptions">SendReceiveOptions used when receiving the return object</param>
        /// <returns>The requested return object</returns>
        public returnObjectType SendReceiveObject<returnObjectType>(string sendingPacketTypeStr, string expectedReturnPacketTypeStr, int returnPacketTimeOutMilliSeconds, object sendObject, SendReceiveOptions sendOptions, SendReceiveOptions receiveOptions)
        {
            //The caller of this overload has no use for the sequence number, so it is discarded
            long discardedSequenceNumber;

            returnObjectType reply = SendReceiveObject<returnObjectType>(sendingPacketTypeStr, expectedReturnPacketTypeStr, returnPacketTimeOutMilliSeconds, sendObject, sendOptions, receiveOptions, out discardedSequenceNumber);
            return reply;
        }

        /// <summary>
        /// Send an object using the provided <see cref="SendReceiveOptions"/> and wait for a returned object using provided <see cref="SendReceiveOptions"/>.
/// </summary>
        /// <typeparam name="returnObjectType">The type of return object</typeparam>
        /// <param name="sendingPacketTypeStr">The sending packet type</param>
        /// <param name="expectedReturnPacketTypeStr">The packet type which will be used for the reply</param>
        /// <param name="returnPacketTimeOutMilliSeconds">How long to wait for the reply, in milliseconds, before an ExpectedReturnTimeoutException is thrown</param>
        /// <param name="sendObject">The object to send</param>
        /// <param name="sendOptions">SendReceiveOptions to use when sending. If null the connection default options are used.</param>
        /// <param name="receiveOptions">SendReceiveOptions used when receiving the return object. If null the connection default options are used.</param>
        /// <param name="sentPacketSequenceNumber">The sequence number of the packet sent</param>
        /// <returns>The requested return object</returns>
        /// <exception cref="ExpectedReturnTimeoutException">No reply arrived within the timeout, or the
        /// connection was shut down while waiting for the reply</exception>
        public returnObjectType SendReceiveObject<returnObjectType>(string sendingPacketTypeStr, string expectedReturnPacketTypeStr, int returnPacketTimeOutMilliSeconds, object sendObject, SendReceiveOptions sendOptions, SendReceiveOptions receiveOptions, out long sentPacketSequenceNumber)
        {
            returnObjectType returnObject = default(returnObjectType);

            bool remotePeerDisconnectedDuringWait = false;

            //NOTE: the wait handle is deliberately not disposed here. A handler that is already executing
            //when it is removed below may still call Set() afterwards.
            AutoResetEvent returnWaitSignal = new AutoResetEvent(false);

            #region SendReceiveDelegate
            //Stores the reply and releases the waiting thread
            NetworkComms.PacketHandlerCallBackDelegate<returnObjectType> SendReceiveDelegate = (packetHeader, sourceConnection, incomingObject) =>
            {
                returnObject = incomingObject;
                returnWaitSignal.Set();
            };

            //We use the following delegate to quickly force a response timeout if the remote end disconnects
            NetworkComms.ConnectionEstablishShutdownDelegate SendReceiveShutDownDelegate = (sourceConnection) =>
            {
                remotePeerDisconnectedDuringWait = true;
                returnObject = default(returnObjectType);
                returnWaitSignal.Set();
            };
            #endregion

            if (sendOptions == null) sendOptions = ConnectionDefaultSendReceiveOptions;
            if (receiveOptions == null) receiveOptions = ConnectionDefaultSendReceiveOptions;

            //Both temporary handlers are removed in finally blocks so that they are detached on every exit path.
            //Previously a timeout left the shutdown handler attached, and an exception thrown by the send
            //left both handlers attached.
            AppendShutdownHandler(SendReceiveShutDownDelegate);
            try
            {
                AppendIncomingPacketHandler(expectedReturnPacketTypeStr, SendReceiveDelegate, receiveOptions);
                try
                {
                    using (Packet sendPacket = new Packet(sendingPacketTypeStr, expectedReturnPacketTypeStr, sendObject, sendOptions))
                        SendPacket(sendPacket, out sentPacketSequenceNumber);

                    //We wait for the return data here
                    if (!returnWaitSignal.WaitOne(returnPacketTimeOutMilliSeconds))
                        throw new ExpectedReturnTimeoutException("Timeout occurred after " + returnPacketTimeOutMilliSeconds.ToString() + "ms waiting for response packet of type ‘" + expectedReturnPacketTypeStr + "‘.");
                }
                finally
                {
                    RemoveIncomingPacketHandler(expectedReturnPacketTypeStr, SendReceiveDelegate);
                }
            }
            finally
            {
                RemoveShutdownHandler(SendReceiveShutDownDelegate);
            }

            //The wait signal is also set by the shutdown delegate, in which case there is no reply to return
            if (remotePeerDisconnectedDuringWait)
                throw new ExpectedReturnTimeoutException("Remote end closed connection before data was successfully returned.");
            else
                return returnObject;
        }

        /// <summary>
        /// Send an empty packet using the connection default <see cref="SendReceiveOptions"/> and wait for a returned object again using default <see cref="SendReceiveOptions"/>. Useful to request an object when there is no need to send anything.
/// </summary>
        /// <typeparam name="returnObjectType">The type of the object expected in reply</typeparam>
        /// <param name="sendingPacketTypeStr">The sending packet type</param>
        /// <param name="expectedReturnPacketTypeStr">The packet type which will be used for the reply</param>
        /// <param name="returnPacketTimeOutMilliSeconds">How long to wait for the reply, in milliseconds, before an ExpectedReturnTimeoutException is thrown</param>
        /// <returns>The requested return object</returns>
        public returnObjectType SendReceiveObject<returnObjectType>(string sendingPacketTypeStr, string expectedReturnPacketTypeStr, int returnPacketTimeOutMilliSeconds)
        {
            //No payload object and no explicit options, all three are passed on as null
            object nothingToSend = null;
            SendReceiveOptions unspecifiedOptions = null;

            returnObjectType reply = SendReceiveObject<returnObjectType>(sendingPacketTypeStr, expectedReturnPacketTypeStr, returnPacketTimeOutMilliSeconds, nothingToSend, unspecifiedOptions, unspecifiedOptions);
            return reply;
        }

        /// <summary>
        /// Send an empty packet using the connection default <see cref="SendReceiveOptions"/> and wait for a returned object again using default <see cref="SendReceiveOptions"/>. Useful to request an object when there is no need to send anything.
/// </summary>
        /// <typeparam name="returnObjectType">The type of the object expected in reply</typeparam>
        /// <param name="sendingPacketTypeStr">The sending packet type</param>
        /// <param name="expectedReturnPacketTypeStr">The packet type which will be used for the reply</param>
        /// <param name="returnPacketTimeOutMilliSeconds">How long to wait for the reply, in milliseconds, before an ExpectedReturnTimeoutException is thrown</param>
        /// <param name="sentPacketSequenceNumber">The sequence number of the packet sent</param>
        /// <returns>The requested return object</returns>
        public returnObjectType SendReceiveObject<returnObjectType>(string sendingPacketTypeStr, string expectedReturnPacketTypeStr, int returnPacketTimeOutMilliSeconds, out long sentPacketSequenceNumber)
        {
            //No payload object and no explicit options, all three are passed on as null
            object nothingToSend = null;
            SendReceiveOptions unspecifiedOptions = null;

            returnObjectType reply = SendReceiveObject<returnObjectType>(sendingPacketTypeStr, expectedReturnPacketTypeStr, returnPacketTimeOutMilliSeconds, nothingToSend, unspecifiedOptions, unspecifiedOptions, out sentPacketSequenceNumber);
            return reply;
        }

        /// <summary>
        /// Closes the connection and trigger any associated shutdown delegates.
/// </summary>
        /// <param name="closeDueToError">Closing a connection due to an error possibly requires a few extra steps.</param>
        /// <param name="logLocation">Optional debug parameter. Included in log messages to identify the caller.</param>
        public void CloseConnection(bool closeDueToError, int logLocation = 0)
        {
            try
            {
                if (NetworkComms.LoggingEnabled)
                {
                    if (closeDueToError)
                        NetworkComms.Logger.Debug("Closing connection with " + ConnectionInfo + " due to error from [" + logLocation.ToString() + "].");
                    else
                        NetworkComms.Logger.Debug("Closing connection with " + ConnectionInfo + " from [" + logLocation.ToString() + "].");
                }

                ConnectionInfo.NoteConnectionShutdown();

                //Set possible error cases
                if (closeDueToError)
                {
                    connectionSetupException = true;
                    connectionSetupExceptionStr = "Connection was closed during setup from [" + logLocation.ToString() + "].";
                }

                //Ensure we are not waiting for a connection to be established if we have died due to error
                connectionSetupWait.Set();

                //Call any connection specific close requirements
                CloseConnectionSpecific(closeDueToError, logLocation);

                try
                {
                    //If we are calling close from the listen thread we are actually in the same thread
                    //We must guarantee the listen thread stops even if that means we need to nuke it
                    //If we did not we may not be able to shutdown properly.
                    if (incomingDataListenThread != null && incomingDataListenThread != Thread.CurrentThread &&
                        (incomingDataListenThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin || incomingDataListenThread.ThreadState == System.Threading.ThreadState.Running))
                    {
                        //If we have made it this far we give the thread a further 50ms to finish before nuking.
                        if (!incomingDataListenThread.Join(50))
                        {
                            incomingDataListenThread.Abort();
                            if (NetworkComms.LoggingEnabled && ConnectionInfo != null) NetworkComms.Logger.Warn("Incoming data listen thread with " + ConnectionInfo + " aborted.");
                        }
                    }
                }
                catch (Exception)
                {
                    //Deliberately best effort, a failure to stop the listen thread must not prevent the rest of the close
                }

                //Close connection may get called multiple times for a given connection depending on the reason for being closed
                bool firstClose = NetworkComms.RemoveConnectionReference(this);

                try
                {
                    //Almost there
                    //Last thing is to call any connection specific shutdown delegates
                    //The delegate is copied to a local first so that a handler removed on another thread
                    //between the null check and the invocation can not cause a null reference
                    var connectionShutdownHandlers = ConnectionSpecificShutdownDelegate;
                    if (firstClose && connectionShutdownHandlers != null)
                    {
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Triggered connection specific shutdown delegates with " + ConnectionInfo);
                        connectionShutdownHandlers(this);
                    }
                }
                catch (Exception ex)
                {
                    NetworkComms.LogError(ex, "ConnectionSpecificShutdownDelegateError", "Error while executing connection specific shutdown delegates for " + ConnectionInfo + ". Ensure any shutdown exceptions are caught in your own code.");
                }

                try
                {
                    //Last but not least we call any global connection shutdown delegates
                    //Copied to a local for the same reason as the connection specific delegates above
                    var globalShutdownHandlers = NetworkComms.globalConnectionShutdownDelegates;
                    if (firstClose && globalShutdownHandlers != null)
                    {
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Triggered global shutdown delegates with " + ConnectionInfo);
                        globalShutdownHandlers(this);
                    }
                }
                catch (Exception ex)
                {
                    NetworkComms.LogError(ex, "GlobalConnectionShutdownDelegateError", "Error while executing global connection shutdown delegates for " + ConnectionInfo + ". Ensure any shutdown exceptions are caught in your own code.");
                }
            }
            catch (Exception ex)
            {
                if (ex is ThreadAbortException)
                { /*Ignore the threadabort exception if we had to nuke a thread*/ }
                else
                    NetworkComms.LogError(ex, "NCError_CloseConnection", "Error closing connection with " + ConnectionInfo + ". Close called from " + logLocation.ToString() + (closeDueToError ? " due to error." : "."));

                //We try to rethrow where possible but CloseConnection could very likely be called from within networkComms so we just have to be happy with a log here
            }
        }

        /// <summary>
        /// Every connection will probably have to perform connection specific shutdown tasks. This is called before the global connection close tasks.
        /// </summary>
        /// <param name="closeDueToError">Closing a connection due to an error possibly requires a few extra steps.</param>
        /// <param name="logLocation">Optional debug parameter.</param>
        protected abstract void CloseConnectionSpecific(bool closeDueToError, int logLocation = 0);

        /// <summary>
        /// Uses the current connection and returns a bool dependent on the remote end responding to a SendReceiveObject call within the default <see cref="NetworkComms.ConnectionAliveTestTimeoutMS"/>
        /// </summary>
        /// <returns>True if the remote end responds within <see cref="NetworkComms.ConnectionAliveTestTimeoutMS"/> otherwise false</returns>
        public bool ConnectionAlive()
        {
            return ConnectionAlive(NetworkComms.ConnectionAliveTestTimeoutMS);
        }

        /// <summary>
        /// Uses the current connection and returns a bool dependent on the remote end responding to a SendReceiveObject call within the provided aliveRespondTimeoutMS
        /// </summary>
        /// <param name="aliveRespondTimeoutMS">The time to wait in milliseconds before returning false</param>
        /// <returns>True if the remote end responds within the provided aliveRespondTimeoutMS</returns>
        public bool ConnectionAlive(int aliveRespondTimeoutMS)
        {
            long responseTime;
            return ConnectionAlive(aliveRespondTimeoutMS, out responseTime);
        }

        /// <summary>
        /// Uses the current connection and returns a bool dependent on the remote end responding to a SendReceiveObject call within the provided aliveRespondTimeoutMS.
        /// A connection which is not yet established is reported as alive until it is older than
        /// NetworkComms.ConnectionEstablishTimeoutMS, at which point it is closed and false is returned.
        /// If the alive test fails for any reason the connection is closed and false is returned.
        /// </summary>
        /// <param name="aliveRespondTimeoutMS">The time to wait in milliseconds before returning false</param>
        /// <param name="responseTimeMS">The number of milliseconds taken for a successful response to be received. Remains long.MaxValue if no alive test response was received.</param>
        /// <returns>True if the connection is considered alive, otherwise false</returns>
        public bool ConnectionAlive(int aliveRespondTimeoutMS, out long responseTimeMS)
        {
            System.Diagnostics.Stopwatch timer = new System.Diagnostics.Stopwatch();

            responseTimeMS = long.MaxValue;

            if (ConnectionInfo.ConnectionState != ConnectionState.Established)
            {
                //TotalMilliseconds is the whole elapsed time. The Milliseconds property used previously is only
                //the 0-999 millisecond component, so a timeout of one second or more could never be exceeded.
                if ((DateTime.Now - ConnectionInfo.ConnectionCreationTime).TotalMilliseconds > NetworkComms.ConnectionEstablishTimeoutMS)
                {
                    CloseConnection(false, -11);
                    return false;
                }
                else
                    return true;
            }
            else
            {
                try
                {
                    //The alive test packet type is used for both the request and the expected reply
                    string aliveTestPacketType = Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.AliveTestPacket);

                    timer.Start();
                    byte[] returnValue = SendReceiveObject<byte[]>(aliveTestPacketType, aliveTestPacketType, aliveRespondTimeoutMS, new byte[1] { 0 }, NetworkComms.InternalFixedSendReceiveOptions, NetworkComms.InternalFixedSendReceiveOptions);
                    timer.Stop();

                    responseTimeMS = timer.ElapsedMilliseconds;

                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("ConnectionAliveTest success, response in " + timer.ElapsedMilliseconds.ToString() + "ms.");

                    //The connection is alive only if the first byte of the reply is 1
                    return returnValue[0] == 1;
                }
                catch (Exception)
                {
                    //Any failure, including a response timeout, closes the connection
                    CloseConnection(true, 46);
                    return false;
                }
            }
        }

        /// <summary>
        /// Send the provided packet to the remoteEndPoint. Waits for receive confirmation if required.
        /// </summary>
        /// <param name="packet">The packet to send</param>
        internal void SendPacket(Packet packet)
        {
            //The sequence number is not needed by callers of this overload
            long packetSequenceNumber;
            SendPacket(packet, out packetSequenceNumber);
        }

        /// <summary>
        /// Send the provided packet to the remoteEndPoint. Waits for receive confirmation if required.
/// </summary> /// <param name="packet">The packet to send</param> /// <param name="packetSequenceNumber">The sequence number of the packet sent</param> internal void SendPacket(Packet packet, out long packetSequenceNumber) { if (NetworkComms.LoggingEnabled) { string packetDataMD5 = ""; if (packet.PacketHeader.ContainsOption(PacketHeaderStringItems.CheckSumHash)) packetDataMD5 = packet.PacketHeader.GetOption(PacketHeaderStringItems.CheckSumHash); NetworkComms.Logger.Trace("Entering packet send of ‘" + packet.PacketHeader.PacketType + "‘ packetType to " + ConnectionInfo + (packetDataMD5 == "" ? "" : ". PacketCheckSum="+packetDataMD5)); } //Multiple threads may try to send packets at the same time so wait one at a time here lock (sendLocker) { //We don‘t allow sends on a closed connection if (ConnectionInfo.ConnectionState == ConnectionState.Shutdown) throw new CommunicationException("Attempting to send packet on connection which has been closed or is currently closing."); //Set packet sequence number inside sendLocker //Increment the global counter as well to ensure future connections with the same host can not create duplicates Interlocked.Increment(ref NetworkComms.totalPacketSendCount); packetSequenceNumber = packetSequenceCounter++; packet.PacketHeader.SetOption(PacketHeaderLongItems.PacketSequenceNumber, packetSequenceNumber); string confirmationCheckSum = ""; AutoResetEvent confirmationWaitSignal = new AutoResetEvent(false); bool remotePeerDisconnectedDuringWait = false; #region Delegates //Specify a delegate we may use if we require receive confirmation NetworkComms.PacketHandlerCallBackDelegate<string> confirmationDelegate = (packetHeader, connectionInfo, incomingString) => { //if (connectionInfo.NetworkIdentifier == this.ConnectionInfo.NetworkIdentifier && connectionInfo.RemoteEndPoint == this.ConnectionInfo.RemoteEndPoint) //{ confirmationCheckSum = incomingString; confirmationWaitSignal.Set(); //} }; //We use the following delegate to quickly force a 
response timeout if the remote end disconnects during a send/wait NetworkComms.ConnectionEstablishShutdownDelegate ConfirmationShutDownDelegate = (connectionInfo) => { //if (connectionInfo.NetworkIdentifier == this.ConnectionInfo.NetworkIdentifier && connectionInfo.RemoteEndPoint == this.ConnectionInfo.RemoteEndPoint) //{ remotePeerDisconnectedDuringWait = true; confirmationWaitSignal.Set(); //} }; #endregion try { #region Prepare For Confirmation and Possible Validation //Add the confirmation handler if required if (packet.PacketHeader.ContainsOption(PacketHeaderStringItems.ReceiveConfirmationRequired)) { AppendIncomingPacketHandler(Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.Confirmation), confirmationDelegate, NetworkComms.InternalFixedSendReceiveOptions); AppendShutdownHandler(ConfirmationShutDownDelegate); } //If this packet is not a checkSumFailResend if (NetworkComms.EnablePacketCheckSumValidation && packet.PacketHeader.PacketType != Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.CheckSumFailResend)) { //We only want to keep packets when they are under some provided theshold //otherwise this becomes a quick ‘memory leak‘ if (packet.PacketData.Length < NetworkComms.CheckSumMismatchSentPacketCacheMaxByteLimit) { lock (sentPacketsLocker) { var hash = packet.PacketHeader.GetOption(PacketHeaderStringItems.CheckSumHash); if (!sentPackets.ContainsKey(hash)) sentPackets.Add(hash, new SentPacket(packet)); } } } #endregion SendPacketSpecific(packet); #region SentPackets Cleanup //If sent packets is greater than 40 we delete anything older than a minute lock (sentPacketsLocker) { if ((DateTime.Now - NetworkComms.LastSentPacketCacheCleanup).TotalMinutes > NetworkComms.MinimumSentPacketCacheTimeMinutes / 2) { Dictionary<string, SentPacket> newSentPackets = new Dictionary<string, SentPacket>(); DateTime thresholdTime = DateTime.Now.AddMinutes(-NetworkComms.MinimumSentPacketCacheTimeMinutes); foreach (var storedPacket in sentPackets) { if 
(storedPacket.Value.SentPacketCreationTime >= thresholdTime) newSentPackets.Add(storedPacket.Key, storedPacket.Value); } sentPackets = newSentPackets; NetworkComms.LastSentPacketCacheCleanup = DateTime.Now; } } #endregion #region Wait For Confirmation If Required //If we required receive confirmation we now wait for that confirmation if (packet.PacketHeader.ContainsOption(PacketHeaderStringItems.ReceiveConfirmationRequired)) { if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... waiting for receive confirmation packet."); if (!(confirmationWaitSignal.WaitOne(NetworkComms.PacketConfirmationTimeoutMS))) throw new ConfirmationTimeoutException("Confirmation packet timeout."); if (remotePeerDisconnectedDuringWait) throw new ConfirmationTimeoutException("Remote end closed connection before confirmation packet was returned."); else { if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... confirmation packet received."); } } #endregion //Update the traffic time as late as possible incase there is a problem ConnectionInfo.UpdateLastTrafficTime(); } catch (ConfirmationTimeoutException) { //Confirmation timeout there is no need to close the connection as this //does not neccessarily mean there is a conneciton problem throw; } catch (CommunicationException) { //We close the connection due to communication exceptions CloseConnection(true, 47); throw; } catch (TimeoutException ex) { //We close the connection due to communication exceptions if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Warn("Timeout exception for connection " + this.ConnectionInfo + (ex.Message != null ? ". 
" +ex.Message : ".")); CloseConnection(true, 48); throw new ConnectionSendTimeoutException(ex.ToString()); } catch (Exception ex) { //We close the connection due to communication exceptions CloseConnection(true, 49); throw new CommunicationException(ex.ToString()); } finally { if (packet.PacketHeader.ContainsOption(PacketHeaderStringItems.ReceiveConfirmationRequired)) { //Cleanup our delegates RemoveIncomingPacketHandler(Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.Confirmation), confirmationDelegate); RemoveShutdownHandler(ConfirmationShutDownDelegate); } } } if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Completed packet send of ‘" + packet.PacketHeader.PacketType + "‘ packetType to " + ConnectionInfo); } /// <summary> /// Connection specific implementation for sending packets on this connection type. Will only be called from within a lock so method does not need to implement further thread safety. /// </summary> /// <param name="packet">The packet to send</param> protected abstract void SendPacketSpecific(Packet packet); /// <summary> /// Connection specific implementation for sending a null packets on this connection type. Will only be called from within a lock so method does not need to implement further thread safety. /// </summary> protected abstract void SendNullPacket(); /// <summary> /// Dispose of the connection. Recommended usage is to call CloseConnection instead. /// </summary> public void Dispose() { CloseConnection(false, -3); try { ((IDisposable)connectionSetupWait).Dispose(); ((IDisposable)connectionEstablishWait).Dispose(); } catch (Exception) { } } } }
using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.Threading;
using DPSBase;
using System.Net.Sockets;

namespace NetworkCommsDotNet
{
    /// <summary>
    /// Connection object. This class is the base class of the TCPConnection and UDPConnection classes.
    /// Connection is split across the following five files (note the partial keyword on each declaration):
    /// ConnectionCreate.cs (1)
    /// ConnectionDelegatesHandlers.cs (2)
    /// ConnectionIncomingData.cs (3)
    /// ConnectionSendClose.cs (4)
    /// ConnectionStatic.cs (5)
    /// This part holds the static send timeout settings and the keep alive worker thread.
    /// </summary>
    public abstract partial class Connection
    {
        //Signalled by ShutdownBase to wake the keep alive worker out of its timed wait
        static ManualResetEvent workedThreadSignal = new ManualResetEvent(false);
        //Set while ShutdownBase is asking the keep alive worker to exit
        static volatile bool shutdownWorkerThreads = false;
        //Guards creation of the keep alive worker thread
        static object staticConnectionLocker = new object();
        static Thread connectionKeepAliveWorker;

        /// <summary>
        /// Sets the default values of the static settings.
        /// </summary>
        static Connection()
        {
            ConnectionKeepAlivePollIntervalSecs = 30;
            MaxNumSendTimes = 100;
            MinNumSendsBeforeConnectionSpecificSendTimeout = 4;
            MinSendTimeoutMS = 2000;
            MinimumMSPerKBSendTimeout = 20;
            DefaultMSPerKBSendTimeout = 1000;
            NumberOfStDeviationsForWriteTimeout = 3;
        }

        /// <summary>
        /// The minimum number of milliseconds to allow per KB before a write timeout may occur. Default is 20.
        /// </summary>
        public static double MinimumMSPerKBSendTimeout { get; set; }

        /// <summary>
        /// The maximum number of send times to maintain. Default is 100.
        /// </summary>
        public static int MaxNumSendTimes { get; set; }

        /// <summary>
        /// The minimum number of writes before the connection specific write timeouts will be used. Default is 4.
        /// </summary>
        public static int MinNumSendsBeforeConnectionSpecificSendTimeout { get; set; }

        /// <summary>
        /// The default milliseconds per KB write timeout before connection specific values become available. Default is 1000. See <see cref="MinNumSendsBeforeConnectionSpecificSendTimeout"/>.
        /// </summary>
        public static int DefaultMSPerKBSendTimeout { get; set; }

        /// <summary>
        /// The minimum timeout for any sized send in milliseconds. Prevents timeouts when sending less than 1KB. Default is 2000.
        /// </summary>
        public static int MinSendTimeoutMS { get; set; }

        /// <summary>
        /// The interval between keep alive polls of all connections. Set to int.MaxValue to disable keep alive poll. Default is 30.
        /// </summary>
        public static int ConnectionKeepAlivePollIntervalSecs { get; set; }

        /// <summary>
        /// The number of standard deviations from the mean to use for write timeouts. Default is 3.
        /// </summary>
        public static double NumberOfStDeviationsForWriteTimeout { get; set; }

        /// <summary>
        /// Starts the keep alive worker thread if it is not already running and a shutdown is not in progress.
        /// </summary>
        protected static void TriggerConnectionKeepAliveThread()
        {
            lock (staticConnectionLocker)
            {
                //IsAlive is false once the thread has terminated for any reason, including after an Abort(),
                //whereas comparing ThreadState with Stopped can miss an aborted thread
                if (!shutdownWorkerThreads && (connectionKeepAliveWorker == null || !connectionKeepAliveWorker.IsAlive))
                {
                    connectionKeepAliveWorker = new Thread(ConnectionKeepAliveWorker);
                    connectionKeepAliveWorker.Name = "ConnectionKeepAliveWorker";
                    connectionKeepAliveWorker.Start();
                }
            }
        }

        /// <summary>
        /// Body of the single static worker thread which periodically polls all connections to keep them alive.
        /// </summary>
        private static void ConnectionKeepAliveWorker()
        {
            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Connection keep alive polling thread has started.");
            DateTime lastPollCheck = DateTime.Now;

            while (!shutdownWorkerThreads)
            {
                try
                {
                    //We have a short sleep here so that we can exit the thread fairly quickly if we need too
                    if (ConnectionKeepAlivePollIntervalSecs == int.MaxValue)
                        workedThreadSignal.WaitOne(5000);
                    else
                        workedThreadSignal.WaitOne(100);

                    //Check for shutdown here
                    if (shutdownWorkerThreads) break;

                    //Any connections which we have not seen in the last poll interval get tested using a null packet
                    if (ConnectionKeepAlivePollIntervalSecs < int.MaxValue && (DateTime.Now - lastPollCheck).TotalSeconds > (double)ConnectionKeepAlivePollIntervalSecs)
                    {
                        AllConnectionsSendNullPacketKeepAlive();
                        lastPollCheck = DateTime.Now;
                    }
                }
                catch (Exception ex)
                {
                    NetworkComms.LogError(ex, "ConnectionKeepAlivePollError");
                }
            }
        }

        /// <summary>
        /// Polls all existing connections based on ConnectionKeepAlivePollIntervalSecs value. Serverside connections are polled slightly earlier than client side to help reduce potential congestion.
        /// </summary>
        /// <param name="returnImmediately">If true the method returns once all sends have been queued rather than waiting for them to complete</param>
        private static void AllConnectionsSendNullPacketKeepAlive(bool returnImmediately = false)
        {
            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Starting AllConnectionsSendNullPacketKeepAlive");

            //Loop through all connections and test the alive state
            List<Connection> allConnections = NetworkComms.GetExistingConnection();
            int remainingConnectionCount = allConnections.Count;

#if WINDOWS_PHONE
            QueueItemPriority nullSendPriority = QueueItemPriority.High;
#else
            QueueItemPriority nullSendPriority = QueueItemPriority.AboveNormal;
#endif

            //Not disposed here because queued work items may still signal it after this method has returned
            ManualResetEvent allConnectionsComplete = new ManualResetEvent(false);

            for (int i = 0; i < allConnections.Count; i++)
            {
                //We don't send null packets to unconnected udp connections
                UDPConnection asUDP = allConnections[i] as UDPConnection;
                if (asUDP != null && asUDP.UDPOptions == UDPOptions.None)
                {
                    if (Interlocked.Decrement(ref remainingConnectionCount) == 0)
                        allConnectionsComplete.Set();

                    continue;
                }

                //Copy the loop variable so that each queued callback sees its own index
                int innerIndex = i;
                NetworkComms.CommsThreadPool.EnqueueItem(nullSendPriority, new WaitCallback((obj) =>
                {
                    try
                    {
                        Connection connection = allConnections[innerIndex];
                        if (connection != null)
                        {
                            double secondsSinceLastTraffic = (DateTime.Now - connection.ConnectionInfo.LastTrafficTime).TotalSeconds;

                            if (connection.ConnectionInfo.ServerSide)
                            {
                                //If the connection is server side we poll preferentially
                                //In scenarios where the client is sending us lots of data there is no need to poll
                                if (secondsSinceLastTraffic > ConnectionKeepAlivePollIntervalSecs)
                                    connection.SendNullPacket();
                            }
                            else
                            {
                                //If we are client side we wait between 1 and 3 additional seconds to do the poll
                                //This means the server will probably beat us
                                if (secondsSinceLastTraffic > ConnectionKeepAlivePollIntervalSecs + 1.0 + (NetworkComms.randomGen.NextDouble() * 2.0))
                                    connection.SendNullPacket();
                            }
                        }
                    }
                    catch (Exception)
                    {
                        //Deliberately best effort, a failed keep alive send must not take down the pool thread
                    }
                    finally
                    {
                        if (Interlocked.Decrement(ref remainingConnectionCount) == 0)
                            allConnectionsComplete.Set();
                    }
                }), null);
            }

            if (!returnImmediately && allConnections.Count > 0)
            {
                //Max wait is 2.5 seconds per connection, clamped so that a very large connection count can not overflow int
                int maxWaitMS = (int)Math.Min((long)allConnections.Count * 2500L, (long)int.MaxValue);

                if (!allConnectionsComplete.WaitOne(maxWaitMS))
                    //This timeout should not really happen so we are going to log an error if it does
                    NetworkComms.LogError(new TimeoutException("Timeout after " + maxWaitMS.ToString() + "ms waiting for null packet sends to finish. " + remainingConnectionCount.ToString() + " connection waits remain. This error indicates very high send load or a possible send deadlock."), "NullPacketKeepAliveTimeoutError");
            }
        }

        /// <summary>
        /// Shutdown any static connection components
        /// </summary>
        /// <param name="threadShutdownTimeoutMS">The time in milliseconds to wait for the keep alive worker to exit before it is aborted</param>
        internal static void ShutdownBase(int threadShutdownTimeoutMS = 1000)
        {
            try
            {
                shutdownWorkerThreads = true;

                //Wake the worker from its timed wait so that it sees the shutdown flag straight away.
                //Without this it only notices once its wait expires, which is 5000ms when polling is disabled
                //and so longer than the default join timeout, forcing an Abort().
                workedThreadSignal.Set();

                if (connectionKeepAliveWorker != null && !connectionKeepAliveWorker.Join(threadShutdownTimeoutMS))
                    connectionKeepAliveWorker.Abort();
            }
            catch (Exception ex)
            {
                NetworkComms.LogError(ex, "CommsShutdownError");
            }
            finally
            {
                //Leave the static state ready for a subsequent restart of the worker
                shutdownWorkerThreads = false;
                workedThreadSignal.Reset();
            }
        }
    }
}
TCP连接相对应的类:
//  Copyright 2011-2013 Marc Fletcher, Matthew Dean
//
//  This program is free software: you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation, either version 3 of the License, or
//  (at your option) any later version.
//
//  This program is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
//
//  A commercial license of this software can also be purchased.
//  Please see <http://www.networkcomms.net/licensing/> for details.

using System;
using System.Collections.Generic;
using System.Text;
using System.Net.Sockets;
using System.Threading;
using System.Net;
using System.IO;
using DPSBase;

#if WINDOWS_PHONE
using Windows.Networking.Sockets;
using Windows.Storage.Streams;
using System.Runtime.InteropServices.WindowsRuntime;
#endif

namespace NetworkCommsDotNet
{
    /// <summary>
    /// The part of <see cref="TCPConnection"/> which creates connections, establishes them with the remote end
    /// and starts listening for incoming data.
    /// </summary>
    public sealed partial class TCPConnection : Connection
    {
#if WINDOWS_PHONE
        /// <summary>
        /// The windows phone socket corresponding to this connection.
        /// </summary>
        StreamSocket socket;
#else
        /// <summary>
        /// The TcpClient corresponding to this connection.
        /// </summary>
        TcpClient tcpClient;

        /// <summary>
        /// The NetworkStream associated with the tcpClient.
        /// </summary>
        NetworkStream tcpClientNetworkStream;
#endif

        /// <summary>
        /// Create a <see cref="TCPConnection"/> with the provided connectionInfo. If there is an existing connection that will be returned instead.
        /// If a new connection is created it will be registered with NetworkComms and can be retreived using <see cref="NetworkComms.GetExistingConnection()"/> and overrides.
        /// </summary>
        /// <param name="connectionInfo">ConnectionInfo to be used to create connection</param>
        /// <param name="establishIfRequired">If true will establish the TCP connection with the remote end point before returning</param>
        /// <returns>Returns a <see cref="TCPConnection"/></returns>
        public static TCPConnection GetConnection(ConnectionInfo connectionInfo, bool establishIfRequired = true)
        {
            return GetConnection(connectionInfo, null, null, establishIfRequired);
        }

        /// <summary>
        /// Create a TCP connection with the provided connectionInfo and sets the connection default SendReceiveOptions. If there is an existing connection that is returned instead.
        /// If a new connection is created it will be registered with NetworkComms and can be retreived using <see cref="NetworkComms.GetExistingConnection()"/> and overrides.
        /// </summary>
        /// <param name="connectionInfo">ConnectionInfo to be used to create connection</param>
        /// <param name="defaultSendReceiveOptions">The SendReceiveOptions which will be set as this connections defaults</param>
        /// <param name="establishIfRequired">If true will establish the TCP connection with the remote end point before returning</param>
        /// <returns>Returns a <see cref="TCPConnection"/></returns>
        public static TCPConnection GetConnection(ConnectionInfo connectionInfo, SendReceiveOptions defaultSendReceiveOptions, bool establishIfRequired = true)
        {
            return GetConnection(connectionInfo, defaultSendReceiveOptions, null, establishIfRequired);
        }

        /// <summary>
        /// Internal <see cref="TCPConnection"/> creation which hides the necessary internal calls
        /// </summary>
        /// <param name="connectionInfo">ConnectionInfo to be used to create connection</param>
        /// <param name="defaultSendReceiveOptions">Connection default SendReceiveOptions</param>
        /// <param name="tcpClient">If this is an incoming connection we will already have access to the tcpClient, otherwise use null</param>
        /// <param name="establishIfRequired">Establish during create if true</param>
        /// <returns>An existing connection or a new one</returns>
        /// <exception cref="ArgumentNullException">Thrown if connectionInfo is null</exception>
#if WINDOWS_PHONE
        internal static TCPConnection GetConnection(ConnectionInfo connectionInfo, SendReceiveOptions defaultSendReceiveOptions, StreamSocket socket, bool establishIfRequired = true)
#else
        internal static TCPConnection GetConnection(ConnectionInfo connectionInfo, SendReceiveOptions defaultSendReceiveOptions, TcpClient tcpClient, bool establishIfRequired = true)
#endif
        {
            //All public overloads funnel through here so a single guard covers them
            if (connectionInfo == null) throw new ArgumentNullException("connectionInfo");

            connectionInfo.ConnectionType = ConnectionType.TCP;

            //If we have a tcpClient at this stage we must be serverside
#if WINDOWS_PHONE
            if (socket != null) connectionInfo.ServerSide = true;
#else
            if (tcpClient != null) connectionInfo.ServerSide = true;
#endif

            bool newConnection = false;
            TCPConnection connection;

            lock (NetworkComms.globalDictAndDelegateLocker)
            {
                //Check to see if a connection already exists, if it does return that connection, if not return a new one
                if (NetworkComms.ConnectionExists(connectionInfo.RemoteEndPoint, connectionInfo.ConnectionType))
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Attempted to create new TCPConnection to connectionInfo=‘" + connectionInfo + "‘ but there is an existing connection. Existing connection will be returned instead.");

                    establishIfRequired = false;
                    connection = (TCPConnection)NetworkComms.GetExistingConnection(connectionInfo.RemoteEndPoint, connectionInfo.ConnectionType);
                }
                else
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Creating new TCPConnection to connectionInfo=‘" + connectionInfo + "‘." + (establishIfRequired ? " Connection will be established." : " Connection will not be established."));

                    if (connectionInfo.ConnectionState == ConnectionState.Establishing)
                        throw new ConnectionSetupException("Connection state for connection " + connectionInfo + " is marked as establishing. This should only be the case here due to a bug.");

                    //If an existing connection does not exist but the info we are using suggests it should we need to reset the info
                    //so that it can be reused correctly. This case generally happens when using Comms in the format
                    //TCPConnection.GetConnection(info).SendObject(packetType, objToSend);
                    if (connectionInfo.ConnectionState == ConnectionState.Established || connectionInfo.ConnectionState == ConnectionState.Shutdown)
                        connectionInfo.ResetConnectionInfo();

                    //We add a reference to networkComms for this connection within the constructor
#if WINDOWS_PHONE
                    connection = new TCPConnection(connectionInfo, defaultSendReceiveOptions, socket);
#else
                    connection = new TCPConnection(connectionInfo, defaultSendReceiveOptions, tcpClient);
#endif
                    newConnection = true;
                }
            }

            //Establishing happens outside of the global lock
            if (newConnection && establishIfRequired)
                connection.EstablishConnection();
            else if (!newConnection)
                connection.WaitForConnectionEstablish(NetworkComms.ConnectionEstablishTimeoutMS);

            if (!NetworkComms.commsShutdown) TriggerConnectionKeepAliveThread();

            return connection;
        }

        /// <summary>
        /// TCP connection constructor
        /// </summary>
#if WINDOWS_PHONE
        private TCPConnection(ConnectionInfo connectionInfo, SendReceiveOptions defaultSendReceiveOptions, StreamSocket socket)
#else
        private TCPConnection(ConnectionInfo connectionInfo, SendReceiveOptions defaultSendReceiveOptions, TcpClient tcpClient)
#endif
            : base(connectionInfo, defaultSendReceiveOptions)
        {
            //We don't guarantee that the tcpClient has been created yet
#if WINDOWS_PHONE
            if (socket != null) this.socket = socket;
#else
            if (tcpClient != null) this.tcpClient = tcpClient;
#endif
        }

        /// <summary>
        /// Establish the connection. Connects the underlying socket if required, starts listening for incoming data
        /// and then exchanges ConnectionSetup packets with the remote end.
        /// </summary>
        protected override void EstablishConnectionSpecific()
        {
#if WINDOWS_PHONE
            if (socket == null) ConnectSocket();

            //For the local endpoint
            var localEndPoint = new IPEndPoint(IPAddress.Parse(socket.Information.LocalAddress.CanonicalName.ToString()), int.Parse(socket.Information.LocalPort));

            //We should now be able to set the connectionInfo localEndPoint
            ConnectionInfo.UpdateLocalEndPointInfo(localEndPoint);

            //Set the outgoing buffer size
            socket.Control.OutboundBufferSizeInBytes = (uint)NetworkComms.SendBufferSizeBytes;
#else
            if (tcpClient == null) ConnectSocket();

            //We should now be able to set the connectionInfo localEndPoint
            ConnectionInfo.UpdateLocalEndPointInfo((IPEndPoint)tcpClient.Client.LocalEndPoint);

            //We are going to be using the networkStream quite a bit so we pull out a reference once here
            tcpClientNetworkStream = tcpClient.GetStream();

            //NOTE: The socket level keep alive option did not seem to work reliably so was replaced with the keepAlive packet feature
            tcpClient.ReceiveBufferSize = NetworkComms.ReceiveBufferSizeBytes;
            tcpClient.SendBufferSize = NetworkComms.SendBufferSizeBytes;

            //This disables the 'nagle algorithm'
            //http://msdn.microsoft.com/en-us/library/system.net.sockets.socket.nodelay.aspx
            //Basically we may want to send lots of small packets (<200 bytes) and sometimes those are time critical (e.g. when establishing a connection)
            //If we leave this enabled small packets may never be sent until a suitable send buffer length threshold is passed. i.e. BAD
            tcpClient.NoDelay = true;
            tcpClient.Client.NoDelay = true;
#endif

            //Start listening for incoming data
            StartIncomingDataListen();

            //Get a list of existing listeners
            List<IPEndPoint> existingListeners = TCPConnection.ExistingLocalListenEndPoints(ConnectionInfo.LocalEndPoint.Address);

            //Select a listener for this connection, preferring one which matches our local end point exactly
            IPEndPoint selectedExistingListener = null;
            if (existingListeners.Count > 0)
                selectedExistingListener = (existingListeners.Contains(ConnectionInfo.LocalEndPoint) ? ConnectionInfo.LocalEndPoint : existingListeners[0]);

            //If we are server side and we have just received an incoming connection we need to return a connection id
            //This id will be used in all future connections from this machine
            if (ConnectionInfo.ServerSide)
            {
                if (selectedExistingListener == null) throw new ConnectionSetupException("Detected a server side connection when an existing listener was not present.");

                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Waiting for client connnectionInfo from " + ConnectionInfo);

                //Wait for the client to send its identification
                if (!connectionSetupWait.WaitOne(NetworkComms.ConnectionEstablishTimeoutMS))
                    throw new ConnectionSetupException("Timeout waiting for client connectionInfo with " + ConnectionInfo + ". Connection created at " + ConnectionInfo.ConnectionCreationTime.ToString("HH:mm:ss.fff") + ", its now " + DateTime.Now.ToString("HH:mm:ss.f"));

                if (connectionSetupException)
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Connection setup exception. ServerSide with " + ConnectionInfo + ", " + connectionSetupExceptionStr);
                    throw new ConnectionSetupException("ServerSide. " + connectionSetupExceptionStr);
                }

                //Trigger the connection establish delegates before replying to the connection establish
                base.EstablishConnectionSpecific();

                //Once we have the clients id we send our own
                SendObject(Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.ConnectionSetup), new ConnectionInfo(ConnectionType.TCP, NetworkComms.NetworkIdentifier, new IPEndPoint(ConnectionInfo.RemoteEndPoint.Address, selectedExistingListener.Port), true), NetworkComms.InternalFixedSendReceiveOptions);
            }
            else
            {
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Sending connnectionInfo to " + ConnectionInfo);

                //As the client we initiated the connection we now forward our local node identifier to the server
                //If we are listening we include our local listen port as well
                SendObject(Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.ConnectionSetup), new ConnectionInfo(ConnectionType.TCP, NetworkComms.NetworkIdentifier, new IPEndPoint(ConnectionInfo.RemoteEndPoint.Address, (selectedExistingListener != null ? selectedExistingListener.Port : ConnectionInfo.LocalEndPoint.Port)), selectedExistingListener != null), NetworkComms.InternalFixedSendReceiveOptions);

                //Wait here for the server end to return its own identifier
                if (!connectionSetupWait.WaitOne(NetworkComms.ConnectionEstablishTimeoutMS))
                    throw new ConnectionSetupException("Timeout waiting for server connnectionInfo from " + ConnectionInfo + ". Connection created at " + ConnectionInfo.ConnectionCreationTime.ToString("HH:mm:ss.fff") + ", its now " + DateTime.Now.ToString("HH:mm:ss.f"));

                //If we are client side we can update the localEndPoint for this connection to reflect what the remote end might see if we are also listening
                if (selectedExistingListener != null) ConnectionInfo.UpdateLocalEndPointInfo(selectedExistingListener);

                if (connectionSetupException)
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Connection setup exception. ClientSide with " + ConnectionInfo + ", " + connectionSetupExceptionStr);
                    throw new ConnectionSetupException("ClientSide. " + connectionSetupExceptionStr);
                }

                //Trigger the connection establish delegates once the server has replied to the connection establish
                base.EstablishConnectionSpecific();
            }

#if !WINDOWS_PHONE
            //Once the connection has been established we may want to re-enable the 'nagle algorithm' used for reducing network congestion (apparently).
            //By default we leave the nagle algorithm disabled because we want the quick through put when sending small packets
            if (EnableNagleAlgorithmForNewConnections)
            {
                tcpClient.NoDelay = false;
                tcpClient.Client.NoDelay = false;
            }
#endif
        }

        /// <summary>
        /// If we were not provided with a tcpClient on creation we need to create one
        /// </summary>
        /// <exception cref="ConnectionSetupException">Thrown if the connect fails or does not complete within <see cref="NetworkComms.ConnectionEstablishTimeoutMS"/></exception>
        private void ConnectSocket()
        {
            try
            {
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Connecting TCP client with " + ConnectionInfo);

                bool connectSuccess = true;

#if WINDOWS_PHONE
                //We now connect to our target
                socket = new StreamSocket();

                //NoDelay is the opposite of the nagle algorithm being enabled
                socket.Control.NoDelay = !EnableNagleAlgorithmForNewConnections;

                //The token cancels the connect once the establish timeout has elapsed
                using (CancellationTokenSource cancelAfterTimeoutToken = new CancellationTokenSource(NetworkComms.ConnectionEstablishTimeoutMS))
                {
                    try
                    {
                        if (ConnectionInfo.LocalEndPoint != null)
                        {
                            var endpointPairForConnection = new Windows.Networking.EndpointPair(new Windows.Networking.HostName(ConnectionInfo.LocalEndPoint.Address.ToString()), ConnectionInfo.LocalEndPoint.Port.ToString(),
                                                            new Windows.Networking.HostName(ConnectionInfo.RemoteEndPoint.Address.ToString()), ConnectionInfo.RemoteEndPoint.Port.ToString());

                            var task = socket.ConnectAsync(endpointPairForConnection).AsTask(cancelAfterTimeoutToken.Token);
                            task.Wait();
                        }
                        else
                        {
                            var task = socket.ConnectAsync(new Windows.Networking.HostName(ConnectionInfo.RemoteEndPoint.Address.ToString()), ConnectionInfo.RemoteEndPoint.Port.ToString()).AsTask(cancelAfterTimeoutToken.Token);
                            task.Wait();
                        }
                    }
                    catch (Exception)
                    {
                        socket.Dispose();
                        connectSuccess = false;
                    }
                }
#else
                //We now connect to our target
                tcpClient = new TcpClient(ConnectionInfo.RemoteEndPoint.AddressFamily);

                //Start the connection using the async version
                //This allows us to choose our own connection establish timeout
                IAsyncResult ar = tcpClient.BeginConnect(ConnectionInfo.RemoteEndPoint.Address, ConnectionInfo.RemoteEndPoint.Port, null, null);
                WaitHandle connectionWait = ar.AsyncWaitHandle;
                try
                {
                    if (connectionWait.WaitOne(NetworkComms.ConnectionEstablishTimeoutMS, false))
                    {
                        //Completes the connect and surfaces any socket error
                        tcpClient.EndConnect(ar);
                    }
                    else
                    {
                        //We must not call EndConnect on a client we have just closed, doing so throws an
                        //unrelated exception which hides the fact that this was a timeout
                        tcpClient.Close();
                        connectSuccess = false;
                    }
                }
                finally
                {
                    connectionWait.Close();
                }
#endif

                if (!connectSuccess) throw new ConnectionSetupException("Timeout waiting for remoteEndPoint to accept TCP connection.");
            }
            catch (Exception ex)
            {
                CloseConnection(true, 17);
                throw new ConnectionSetupException("Error during TCP connection establish with destination (" + ConnectionInfo + "). Destination may not be listening or connect timed out. " + ex.ToString());
            }
        }

        /// <summary>
        /// Starts listening for incoming data on this TCP connection
        /// </summary>
        /// <exception cref="ConnectionSetupException">Thrown if this connection has not been registered with NetworkComms</exception>
        protected override void StartIncomingDataListen()
        {
            if (!NetworkComms.ConnectionExists(ConnectionInfo.RemoteEndPoint, ConnectionType.TCP))
            {
                CloseConnection(true, 18);
                throw new ConnectionSetupException("A connection reference by endPoint should exist before starting an incoming data listener.");
            }

#if WINDOWS_PHONE
            var stream = socket.InputStream.AsStreamForRead();
            stream.BeginRead(dataBuffer, 0, dataBuffer.Length, new AsyncCallback(IncomingTCPPacketHandler), stream);
#else
            lock (delegateLocker)
            {
                if (NetworkComms.ConnectionListenModeUseSync)
                {
                    //Synchronous mode uses a dedicated listen thread per connection, created at most once
                    if (incomingDataListenThread == null)
                    {
                        incomingDataListenThread = new Thread(IncomingTCPDataSyncWorker);
                        //Incoming data always gets handled in a time critical fashion
                        incomingDataListenThread.Priority = NetworkComms.timeCriticalThreadPriority;
                        incomingDataListenThread.Name = "IncomingDataListener";
                        incomingDataListenThread.Start();
                    }
                }
                else
                    tcpClientNetworkStream.BeginRead(dataBuffer, 0, dataBuffer.Length, new AsyncCallback(IncomingTCPPacketHandler), tcpClientNetworkStream);
            }
#endif

            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Listening for incoming data from " + ConnectionInfo);
        }
    }
}
//  Copyright 2011-2013 Marc Fletcher, Matthew Dean
//
//  This program is free software: you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation, either version 3 of the License, or
//  (at your option) any later version.
//
//  This program is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
//
//  A commercial license of this software can also be purchased.
//  Please see <http://www.networkcomms.net/licensing/> for details.

using System;
using System.Collections.Generic;
using System.Text;
using System.Net.Sockets;
using System.Threading;
using System.Net;
using System.IO;
using DPSBase;

#if WINDOWS_PHONE
using Windows.Networking.Sockets;
using System.Threading.Tasks;
using System.Runtime.InteropServices.WindowsRuntime;
using Windows.Storage.Streams;
#endif

namespace NetworkCommsDotNet
{
    /// <summary>
    /// The part of <see cref="TCPConnection"/> that reads incoming data, writes outgoing packets
    /// and closes the underlying TCP resources.
    /// </summary>
    public sealed partial class TCPConnection : Connection
    {
        /// <summary>
        /// Asynchronous incoming connection data delegate. Completes the pending read, feeds the
        /// received bytes to the packet builder, hands off complete packets and then either closes
        /// the connection (end of stream) or starts the next asynchronous read.
        /// </summary>
        /// <param name="ar">The call back state object. Its AsyncState is the stream the read was started on.</param>
        void IncomingTCPPacketHandler(IAsyncResult ar)
        {
            //Initialised with true so that logic still works in WP8
            bool dataAvailable = true;

            //Number of bytes returned by the most recent read call only. totalBytesRead also contains
            //the offset of a reused partial buffer, so it cannot be used to detect end of stream.
            int lastReadByteCount;

#if !WINDOWS_PHONE
            //Incoming data always gets handled in a timeCritical fashion at this point
            Thread.CurrentThread.Priority = NetworkComms.timeCriticalThreadPriority;
#endif

            try
            {
#if WINDOWS_PHONE
                var stream = ar.AsyncState as Stream;
                lastReadByteCount = stream.EndRead(ar);
                totalBytesRead = lastReadByteCount + totalBytesRead;
#else
                NetworkStream netStream = (NetworkStream)ar.AsyncState;
                if (!netStream.CanRead)
                    throw new ObjectDisposedException("Unable to read from stream.");

                lastReadByteCount = netStream.EndRead(ar);
                totalBytesRead = lastReadByteCount + totalBytesRead;
                dataAvailable = netStream.DataAvailable;
#endif
                if (totalBytesRead > 0)
                {
                    //After receiving data, update the last traffic time held by the connection info
                    ConnectionInfo.UpdateLastTrafficTime();

                    //If we have read a single byte which is 0 and we are not expecting other data
                    if (totalBytesRead == 1 && dataBuffer[0] == 0 && packetBuilder.TotalBytesExpected - packetBuilder.TotalBytesCached == 0)
                    {
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... null packet removed in IncomingPacketHandler() from " + ConnectionInfo + ". 1");
                    }
                    else
                    {
                        //If there is more data to get then add it to the packets lists;
                        packetBuilder.AddPartialPacket(totalBytesRead, dataBuffer);

#if !WINDOWS_PHONE
                        //If we have more data we might as well continue reading synchronously
                        //In order to deal with data as soon as we think we have sufficient we will leave this loop
                        while (dataAvailable && packetBuilder.TotalBytesCached < packetBuilder.TotalBytesExpected)
                        {
                            int bufferOffset = 0;

                            //We need a buffer for our incoming data
                            //First we try to reuse a previous buffer
                            if (packetBuilder.TotalPartialPacketCount > 0 && packetBuilder.NumUnusedBytesMostRecentPartialPacket() > 0)
                                dataBuffer = packetBuilder.RemoveMostRecentPartialPacket(ref bufferOffset);
                            else
                                //If we have nothing to reuse we allocate a new buffer
                                dataBuffer = new byte[NetworkComms.ReceiveBufferSizeBytes];

                            lastReadByteCount = netStream.Read(dataBuffer, bufferOffset, dataBuffer.Length - bufferOffset);
                            totalBytesRead = lastReadByteCount + bufferOffset;

                            if (totalBytesRead > 0)
                            {
                                ConnectionInfo.UpdateLastTrafficTime();

                                //If we have read a single byte which is 0 and we are not expecting other data
                                if (totalBytesRead == 1 && dataBuffer[0] == 0 && packetBuilder.TotalBytesExpected - packetBuilder.TotalBytesCached == 0)
                                {
                                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... null packet removed in IncomingPacketHandler() from " + ConnectionInfo + ". 2");
                                }
                                else
                                {
                                    packetBuilder.AddPartialPacket(totalBytesRead, dataBuffer);
                                    dataAvailable = netStream.DataAvailable;
                                }
                            }
                            else
                                break;
                        }
#endif
                    }
                }

                if (packetBuilder.TotalBytesCached > 0 && packetBuilder.TotalBytesCached >= packetBuilder.TotalBytesExpected)
                {
                    //Once we think we might have enough data we call the incoming packet handle handoff
                    //Should we have a complete packet this method will start the appropriate task
                    //This method will now clear bytes from the incoming packets if we have received something complete.
                    IncomingPacketHandleHandOff(packetBuilder);
                }

                //A read that returned no new bytes signals end of stream. This test must use the count of the
                //last read rather than totalBytesRead, otherwise a reused partial buffer (offset > 0) hides
                //the zero byte read and the handler would re-issue reads on a closed stream forever.
                if (lastReadByteCount == 0 && (!dataAvailable || ConnectionInfo.ConnectionState == ConnectionState.Shutdown))
                    CloseConnection(false, -2);
                else
                {
                    //We need a buffer for our incoming data
                    //First we try to reuse a previous buffer
                    if (packetBuilder.TotalPartialPacketCount > 0 && packetBuilder.NumUnusedBytesMostRecentPartialPacket() > 0)
                        dataBuffer = packetBuilder.RemoveMostRecentPartialPacket(ref totalBytesRead);
                    else
                    {
                        //If we have nothing to reuse we allocate a new buffer
                        dataBuffer = new byte[NetworkComms.ReceiveBufferSizeBytes];
                        totalBytesRead = 0;
                    }

                    //totalBytesRead now holds the write offset into dataBuffer for the next read
#if WINDOWS_PHONE
                    stream.BeginRead(dataBuffer, totalBytesRead, dataBuffer.Length - totalBytesRead, IncomingTCPPacketHandler, stream);
#else
                    netStream.BeginRead(dataBuffer, totalBytesRead, dataBuffer.Length - totalBytesRead, IncomingTCPPacketHandler, netStream);
#endif
                }
            }
            catch (IOException)
            {
                CloseConnection(true, 12);
            }
            catch (ObjectDisposedException)
            {
                CloseConnection(true, 13);
            }
            catch (SocketException)
            {
                CloseConnection(true, 14);
            }
            catch (InvalidOperationException)
            {
                CloseConnection(true, 15);
            }
            catch (Exception ex)
            {
                NetworkComms.LogError(ex, "Error_TCPConnectionIncomingPacketHandler");
                CloseConnection(true, 31);
            }

#if !WINDOWS_PHONE
            Thread.CurrentThread.Priority = ThreadPriority.Normal;
#endif
        }

#if !WINDOWS_PHONE
        /// <summary>
        /// Synchronous incoming connection data worker. Loops on a blocking read until the connection
        /// is shut down, the remote end closes the stream or an error occurs.
        /// </summary>
        void IncomingTCPDataSyncWorker()
        {
            bool dataAvailable = false;

            try
            {
                while (true)
                {
                    if (ConnectionInfo.ConnectionState == ConnectionState.Shutdown)
                        break;

                    int bufferOffset = 0;

                    //We need a buffer for our incoming data
                    //First we try to reuse a previous buffer
                    if (packetBuilder.TotalPartialPacketCount > 0 && packetBuilder.NumUnusedBytesMostRecentPartialPacket() > 0)
                        dataBuffer = packetBuilder.RemoveMostRecentPartialPacket(ref bufferOffset);
                    else
                        //If we have nothing to reuse we allocate a new buffer
                        dataBuffer = new byte[NetworkComms.ReceiveBufferSizeBytes];

                    //We block here until there is data to read
                    //When we read data we read until method returns or we fill the buffer length
                    int lastReadByteCount = tcpClientNetworkStream.Read(dataBuffer, bufferOffset, dataBuffer.Length - bufferOffset);
                    totalBytesRead = lastReadByteCount + bufferOffset;

                    //Check to see if there is more data ready to be read
                    dataAvailable = tcpClientNetworkStream.DataAvailable;

                    //If the read returned no new bytes and there is no data available we should be shutting down.
                    //The count of the last read is used here, not totalBytesRead, because totalBytesRead includes
                    //the offset of a reused buffer and would otherwise hide the end of stream and spin this loop.
                    if (lastReadByteCount == 0 && (!dataAvailable || ConnectionInfo.ConnectionState == ConnectionState.Shutdown))
                    {
                        CloseConnection(false, -10);
                        break;
                    }

                    //If we read any data it gets handed off to the packetBuilder
                    if (totalBytesRead > 0)
                    {
                        ConnectionInfo.UpdateLastTrafficTime();

                        //If we have read a single byte which is 0 and we are not expecting other data
                        if (totalBytesRead == 1 && dataBuffer[0] == 0 && packetBuilder.TotalBytesExpected - packetBuilder.TotalBytesCached == 0)
                        {
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... null packet removed in IncomingDataSyncWorker() from "+ConnectionInfo+".");
                        }
                        else
                        {
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... " + totalBytesRead.ToString() + " bytes added to packetBuilder for connection with " + ConnectionInfo + ". Cached " + packetBuilder.TotalBytesCached.ToString() + "B, expecting " + packetBuilder.TotalBytesExpected.ToString() + "B.");
                            packetBuilder.AddPartialPacket(totalBytesRead, dataBuffer);
                        }
                    }

                    //If we have read some data and we have more or equal what was expected we attempt a data handoff
                    if (packetBuilder.TotalBytesCached > 0 && packetBuilder.TotalBytesCached >= packetBuilder.TotalBytesExpected)
                        IncomingPacketHandleHandOff(packetBuilder);
                }
            }
            //On any error here we close the connection
            catch (NullReferenceException)
            {
                CloseConnection(true, 7);
            }
            catch (IOException)
            {
                CloseConnection(true, 8);
            }
            catch (ObjectDisposedException)
            {
                CloseConnection(true, 9);
            }
            catch (SocketException)
            {
                CloseConnection(true, 10);
            }
            catch (InvalidOperationException)
            {
                CloseConnection(true, 11);
            }
            catch (Exception ex)
            {
                NetworkComms.LogError(ex, "Error_TCPConnectionIncomingPacketHandler");
                CloseConnection(true, 39);
            }

            //Clear the listen thread object because the thread is about to end
            incomingDataListenThread = null;

            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Incoming data listen thread ending for " + ConnectionInfo);
        }
#endif

        /// <summary>
        /// Closes the <see cref="TCPConnection"/>. Every step is best effort: failures while closing
        /// the stream or socket are deliberately ignored so that the remaining steps still run.
        /// </summary>
        /// <param name="closeDueToError">Closing a connection due an error possibly requires a few extra steps.</param>
        /// <param name="logLocation">Optional debug parameter.</param>
        protected override void CloseConnectionSpecific(bool closeDueToError, int logLocation = 0)
        {
#if WINDOWS_PHONE
            //Try to close the socket
            try
            {
                socket.Dispose();
            }
            catch (Exception)
            {
            }
#else
            //The following attempts to correctly close the connection
            //Try to close the networkStream first
            try
            {
                if (tcpClientNetworkStream != null) tcpClientNetworkStream.Close();
            }
            catch (Exception)
            {
            }
            finally
            {
                tcpClientNetworkStream = null;
            }

            //Try to disconnect and close the underlying socket
            try
            {
                tcpClient.Client.Disconnect(false);
                tcpClient.Client.Close();
            }
            catch (Exception)
            {
            }

            //Try to close the tcpClient
            try
            {
                tcpClient.Close();
            }
            catch (Exception)
            {
            }
#endif
        }

        /// <summary>
        /// Sends the provided packet to the remote end point. The header and the payload are written
        /// separately and the time taken by each write is recorded in SendTimesMSPerKBCache.
        /// </summary>
        /// <param name="packet">Packet to send</param>
        /// <exception cref="CommunicationException">Thrown (non Windows Phone builds) if the tcpClient is no longer marked as connected after the write.</exception>
        protected override void SendPacketSpecific(Packet packet)
        {
            //To keep memory copies to a minimum we send the header and payload in two calls to networkStream.Write
            byte[] headerBytes = packet.SerialiseHeader(NetworkComms.InternalFixedSendReceiveOptions);

            //Unless send timeouts are disabled the allowed time per KB comes either from the
            //recorded send times (once enough samples exist) or from the default value
            double maxSendTimePerKB = double.MaxValue;
            if (!NetworkComms.DisableConnectionSendTimeouts)
            {
                if (SendTimesMSPerKBCache.Count > MinNumSendsBeforeConnectionSpecificSendTimeout)
                    maxSendTimePerKB = Math.Max(MinimumMSPerKBSendTimeout, SendTimesMSPerKBCache.CalculateMean() + NumberOfStDeviationsForWriteTimeout * SendTimesMSPerKBCache.CalculateStdDeviation());
                else
                    maxSendTimePerKB = DefaultMSPerKBSendTimeout;
            }

            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Sending a packet of type '" + packet.PacketHeader.PacketType + "' to " + ConnectionInfo + " containing " + headerBytes.Length.ToString() + " header bytes and " + packet.PacketData.Length.ToString() + " payload bytes. Allowing " + maxSendTimePerKB.ToString("0.0##") + " ms/KB for send.");

            DateTime startTime = DateTime.Now;

            Stream sendingStream;
#if WINDOWS_PHONE
            sendingStream = socket.OutputStream.AsStreamForWrite();
#else
            sendingStream = tcpClientNetworkStream;
#endif

            double headerWriteTime = StreamWriteWithTimeout.Write(headerBytes, headerBytes.Length, sendingStream, NetworkComms.SendBufferSizeBytes, maxSendTimePerKB, MinSendTimeoutMS);
            double dataWriteTime = 0;
            if (packet.PacketData.Length > 0)
                dataWriteTime = packet.PacketData.ThreadSafeStream.CopyTo(sendingStream, packet.PacketData.Start, packet.PacketData.Length, NetworkComms.SendBufferSizeBytes, maxSendTimePerKB, MinSendTimeoutMS);

#if WINDOWS_PHONE
            sendingStream.Flush();
#endif

            //We record each send independently as if one is considerably larger than
            //the other it will provide a much more reliable rate
            SendTimesMSPerKBCache.AddValue(headerWriteTime, headerBytes.Length);
            SendTimesMSPerKBCache.AddValue(dataWriteTime, packet.PacketData.Length);
            SendTimesMSPerKBCache.TrimList(MaxNumSendTimes);

            //Correctly dispose the stream if we are finished with it
            if (packet.PacketData.ThreadSafeStream.CloseStreamAfterSend)
                packet.PacketData.ThreadSafeStream.Close();

            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... " + ((headerBytes.Length + packet.PacketData.Length)/1024.0).ToString("0.000") + "KB written to TCP netstream at average of " + (((headerBytes.Length + packet.PacketData.Length) / 1024.0) / (DateTime.Now - startTime).TotalSeconds).ToString("0.000") + "KB/s. Current:" + ((headerWriteTime + dataWriteTime)/2).ToString("0.00") + " ms/KB, AVG:" + SendTimesMSPerKBCache.CalculateMean().ToString("0.00")+ " ms/KB.");

#if !WINDOWS_PHONE
            if (!tcpClient.Connected)
            {
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Error("TCPClient is not marked as connected after write to networkStream. Possibly indicates a dropped connection.");
                throw new CommunicationException("TCPClient is not marked as connected after write to networkStream. Possibly indicates a dropped connection.");
            }
#endif
        }

        /// <summary>
        /// Send a null packet (1 byte) to the remotEndPoint. Helps keep the TCP connection alive while ensuring the bandwidth usage is an absolute minimum. If an exception is thrown the connection will be closed.
        /// </summary>
        protected override void SendNullPacket()
        {
            try
            {
                //Only once the connection has been established do we send null packets
                if (ConnectionInfo.ConnectionState == ConnectionState.Established)
                {
                    //Multiple threads may try to send packets at the same time so we need this lock to prevent a thread cross talk
                    lock (sendLocker)
                    {
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Sending null packet to " + ConnectionInfo);

                        //Send a single 0 byte
                        double maxSendTimePerKB = double.MaxValue;
                        if (!NetworkComms.DisableConnectionSendTimeouts)
                        {
                            if (SendTimesMSPerKBCache.Count > MinNumSendsBeforeConnectionSpecificSendTimeout)
                                maxSendTimePerKB = Math.Max(MinimumMSPerKBSendTimeout, SendTimesMSPerKBCache.CalculateMean() + NumberOfStDeviationsForWriteTimeout * SendTimesMSPerKBCache.CalculateStdDeviation());
                            else
                                maxSendTimePerKB = DefaultMSPerKBSendTimeout;
                        }

#if WINDOWS_PHONE
                        var stream = socket.OutputStream.AsStreamForWrite();
                        StreamWriteWithTimeout.Write(new byte[] { 0 }, 1, stream, 1, maxSendTimePerKB, MinSendTimeoutMS);
                        stream.Flush();
#else
                        StreamWriteWithTimeout.Write(new byte[] { 0 }, 1, tcpClientNetworkStream, 1, maxSendTimePerKB, MinSendTimeoutMS);
#endif

                        //Update the traffic time after we have written to netStream
                        ConnectionInfo.UpdateLastTrafficTime();
                    }
                }

                //If the connection is shutdown we should call close
                if (ConnectionInfo.ConnectionState == ConnectionState.Shutdown) CloseConnection(false, -8);
            }
            catch (Exception)
            {
                CloseConnection(true, 19);
            }
        }
    }
}
//  Copyright 2011-2013 Marc Fletcher, Matthew Dean
//
//  This program is free software: you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation, either version 3 of the License, or
//  (at your option) any later version.
//
//  This program is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
//
//  A commercial license of this software can also be purchased.
//  Please see <http://www.networkcomms.net/licensing/> for details.

using System;
using System.Collections.Generic;
using System.Text;
using System.Net.Sockets;
using System.Threading;
using System.Net;
using System.IO;
using DPSBase;

#if WINDOWS_PHONE
using Windows.Networking.Sockets;
#endif

namespace NetworkCommsDotNet
{
    /// <summary>
    /// A connection object which utilises <see href="http://en.wikipedia.org/wiki/Transmission_Control_Protocol">TCP</see> to communicate between peers.
    /// </summary>
    public sealed partial class TCPConnection : Connection
    {
        //Guards tcpListenerDict and the creation of the incoming connection worker thread
        static readonly object staticTCPConnectionLocker = new object();

#if WINDOWS_PHONE
        static Dictionary<IPEndPoint, StreamSocketListener> tcpListenerDict = new Dictionary<IPEndPoint, StreamSocketListener>();
#else
        static volatile bool shutdownIncomingConnectionWorkerThread = false;
        static Thread newIncomingConnectionWorker;
        static Dictionary<IPEndPoint, TcpListener> tcpListenerDict = new Dictionary<IPEndPoint, TcpListener>();
#endif

        /// <summary>
        /// By default usage of <see href="http://en.wikipedia.org/wiki/Nagle's_algorithm">Nagle's algorithm</see> during TCP exchanges is disabled for performance reasons. If you wish it to be used for newly established connections set this property to true.
        /// </summary>
        public static bool EnableNagleAlgorithmForNewConnections { get; set; }

        /// <summary>
        /// Accept new incoming TCP connections on all allowed IP's and Port's
        /// </summary>
        /// <param name="useRandomPortFailOver">If true and the default local port is not available will select one at random. If false and a port is unavailable listening will not be enabled on that adaptor</param>
        /// <exception cref="CommsSetupShutdownException">Thrown when only a single interface is to be used and there is no allowed local IP address, or listening on that address fails.</exception>
        public static void StartListening(bool useRandomPortFailOver = false)
        {
            List<IPAddress> localIPs = NetworkComms.AllAllowedIPs();

            if (NetworkComms.ListenOnAllAllowedInterfaces)
            {
                try
                {
                    foreach (IPAddress ip in localIPs)
                    {
                        try
                        {
                            StartListening(new IPEndPoint(ip, NetworkComms.DefaultListenPort), useRandomPortFailOver);
                        }
                        catch (CommsSetupShutdownException)
                        {
                            //An adaptor which cannot be listened on is skipped, the remaining adaptors are still tried
                        }
                    }
                }
                catch (Exception)
                {
                    //If there is an exception here we remove any added listeners and then rethrow
                    Shutdown();
                    throw;
                }
            }
            else
            {
                //Without this guard an empty list surfaces as an unhelpful ArgumentOutOfRangeException from the indexer
                if (localIPs == null || localIPs.Count == 0)
                    throw new CommsSetupShutdownException("Unable to start listening as no allowed local IP addresses are available.");

                StartListening(new IPEndPoint(localIPs[0], NetworkComms.DefaultListenPort), useRandomPortFailOver);
            }
        }

        /// <summary>
        /// Accept new incoming TCP connections on specified <see cref="IPEndPoint"/>
        /// </summary>
        /// <param name="newLocalEndPoint">The localEndPoint to listen for connections on.</param>
        /// <param name="useRandomPortFailOver">If true and the requested local port is not available will select one at random. If false and a port is unavailable will throw <see cref="CommsSetupShutdownException"/></param>
        /// <exception cref="ArgumentNullException">Thrown if <paramref name="newLocalEndPoint"/> is null.</exception>
        /// <exception cref="CommsSetupShutdownException">Thrown if the listener could not be opened or an entry for the end point actually used already exists.</exception>
        public static void StartListening(IPEndPoint newLocalEndPoint, bool useRandomPortFailOver = true)
        {
            if (newLocalEndPoint == null) throw new ArgumentNullException("newLocalEndPoint", "Provided IPEndPoint cannot be null.");

            lock (staticTCPConnectionLocker)
            {
                //If as listener is already added there is not need to continue
                if (tcpListenerDict.ContainsKey(newLocalEndPoint)) return;

#if WINDOWS_PHONE
                StreamSocketListener newListenerInstance = new StreamSocketListener();
                newListenerInstance.ConnectionReceived += newListenerInstance_ConnectionReceived;
#else
                TcpListener newListenerInstance;
#endif

                try
                {
#if WINDOWS_PHONE
                    newListenerInstance.BindEndpointAsync(new Windows.Networking.HostName(newLocalEndPoint.Address.ToString()), newLocalEndPoint.Port.ToString()).AsTask().Wait();
#else
                    newListenerInstance = new TcpListener(newLocalEndPoint.Address, newLocalEndPoint.Port);
                    newListenerInstance.Start();
#endif
                }
                catch (SocketException)
                {
                    //If the port we wanted is not available
                    if (useRandomPortFailOver)
                    {
                        try
                        {
#if WINDOWS_PHONE
                            newListenerInstance.BindEndpointAsync(new Windows.Networking.HostName(newLocalEndPoint.Address.ToString()), "").AsTask().Wait();
#else
                            //Port 0 asks for any free port
                            newListenerInstance = new TcpListener(newLocalEndPoint.Address, 0);
                            newListenerInstance.Start();
#endif
                        }
                        catch (SocketException)
                        {
                            //If we get another socket exception this appears to be a bad IP. We will just ignore this IP
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Error("It was not possible to open a random port on " + newLocalEndPoint.Address + ". This endPoint may not support listening or possibly try again using a different port.");
                            throw new CommsSetupShutdownException("It was not possible to open a random port on " + newLocalEndPoint.Address + ". This endPoint may not support listening or possibly try again using a different port.");
                        }
                    }
                    else
                    {
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Error("It was not possible to open port #" + newLocalEndPoint.Port.ToString() + " on " + newLocalEndPoint.Address + ". This endPoint may not support listening or possibly try again using a different port.");
                        throw new CommsSetupShutdownException("It was not possible to open port #" + newLocalEndPoint.Port.ToString() + " on " + newLocalEndPoint.Address + ". This endPoint may not support listening or possibly try again using a different port.");
                    }
                }

#if WINDOWS_PHONE
                IPEndPoint ipEndPointUsed = new IPEndPoint(newLocalEndPoint.Address, int.Parse(newListenerInstance.Information.LocalPort));
#else
                IPEndPoint ipEndPointUsed = (IPEndPoint)newListenerInstance.LocalEndpoint;
#endif

                if (tcpListenerDict.ContainsKey(ipEndPointUsed))
                {
                    //The listener opened above is not going to be tracked in tcpListenerDict,
                    //so it has to be released here or it would stay open with nothing able to stop it
                    try
                    {
#if WINDOWS_PHONE
                        newListenerInstance.Dispose();
#else
                        newListenerInstance.Stop();
#endif
                    }
                    catch (Exception)
                    {
                        //Best effort, the exception below is the error which matters to the caller
                    }

                    throw new CommsSetupShutdownException("Unable to add new TCP listenerInstance to tcpListenerDict as there is an existing entry.");
                }
                else
                {
                    //If we were succesfull we can add the new localEndPoint to our dict
                    tcpListenerDict.Add(ipEndPointUsed, newListenerInstance);
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("Added new TCP localEndPoint - " + ipEndPointUsed.Address + ":" + ipEndPointUsed.Port.ToString());
                }
            }

#if !WINDOWS_PHONE
            TriggerIncomingConnectionWorkerThread();
#endif
        }

        /// <summary>
        /// Accept new TCP connections on specified list of <see cref="IPEndPoint"/>
        /// </summary>
        /// <param name="localEndPoints">The localEndPoints to listen for connections on</param>
        /// <param name="useRandomPortFailOver">If true and the requested local port is not available on a given IPEndPoint will select one at random. If false and a port is unavailable will throw <see cref="CommsSetupShutdownException"/></param>
        /// <exception cref="ArgumentNullException">Thrown if <paramref name="localEndPoints"/> is null.</exception>
        public static void StartListening(List<IPEndPoint> localEndPoints, bool useRandomPortFailOver = true)
        {
            if (localEndPoints == null) throw new ArgumentNullException("localEndPoints", "Provided List<IPEndPoint> cannot be null.");

            try
            {
                foreach (var endPoint in localEndPoints)
                    StartListening(endPoint, useRandomPortFailOver);
            }
            catch (Exception)
            {
                //If there is an exception here we remove any added listeners and then rethrow
                Shutdown();
                throw;
            }
        }

        /// <summary>
        /// Returns a list of <see cref="IPEndPoint"/> corresponding to all current TCP local listeners
        /// </summary>
        /// <returns>List of <see cref="IPEndPoint"/> corresponding to all current TCP local listeners</returns>
        public static List<IPEndPoint> ExistingLocalListenEndPoints()
        {
            lock (staticTCPConnectionLocker)
            {
                List<IPEndPoint> res = new List<IPEndPoint>();
                foreach (var pair in tcpListenerDict)
                    res.Add(pair.Key);

                return res;
            }
        }

        /// <summary>
        /// Returns a list of <see cref="IPEndPoint"/> corresponding to a possible local listeners on the provided <see cref="IPAddress"/>. If not listening on provided <see cref="IPAddress"/> returns empty list.
        /// </summary>
        /// <param name="ipAddress">The <see cref="IPAddress"/> to match to a possible local listener</param>
        /// <returns>The <see cref="IPEndPoint"/>s of all listeners on the provided address, an empty list (never null) if there are none</returns>
        public static List<IPEndPoint> ExistingLocalListenEndPoints(IPAddress ipAddress)
        {
            List<IPEndPoint> returnList = new List<IPEndPoint>();
            lock (staticTCPConnectionLocker)
            {
                foreach (var pair in tcpListenerDict)
                    if (pair.Key.Address.Equals(ipAddress))
                        returnList.Add(pair.Key);
            }

            return returnList;
        }

        /// <summary>
        /// Returns true if listening for new TCP connections.
        /// </summary>
        /// <returns>True if listening for new TCP connections.</returns>
        public static bool Listening()
        {
            lock (staticTCPConnectionLocker)
                return tcpListenerDict.Count > 0;
        }

        /// <summary>
        /// Logs an exception caught by a catch-all handler unless its base exception is a <see cref="SocketException"/>.
        /// If the exception text nevertheless starts with the SocketException type name it is logged under
        /// <paramref name="errorName"/> with an "_SE" suffix, otherwise under <paramref name="errorName"/> itself.
        /// </summary>
        /// <param name="ex">The exception which was caught</param>
        /// <param name="errorName">The name passed through to NetworkComms.LogError</param>
        private static void LogIfNotSocketException(Exception ex, string errorName)
        {
            //For some odd reason SocketExceptions don't always get caught by a typed catch, so another check
            if (ex.GetBaseException().GetType() != typeof(SocketException))
            {
                //Can we catch the socketException by looking at the string error text?
                //Ordinal comparison because this is a type name, not text in the current culture
                if (ex.ToString().StartsWith("System.Net.Sockets.SocketException", StringComparison.Ordinal))
                    NetworkComms.LogError(ex, errorName + "_SE");
                else
                    NetworkComms.LogError(ex, errorName);
            }
        }

#if WINDOWS_PHONE
        /// <summary>
        /// Handles a new incoming connection reported by a StreamSocketListener
        /// </summary>
        private static void newListenerInstance_ConnectionReceived(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
        {
            try
            {
                var newConnectionInfo = new ConnectionInfo(true, ConnectionType.TCP, new IPEndPoint(IPAddress.Parse(args.Socket.Information.RemoteAddress.DisplayName.ToString()), int.Parse(args.Socket.Information.RemotePort)));
                TCPConnection.GetConnection(newConnectionInfo, NetworkComms.DefaultSendReceiveOptions, args.Socket, true);
            }
            catch (ConfirmationTimeoutException)
            {
                //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
            }
            catch (CommunicationException)
            {
                //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
            }
            catch (ConnectionSetupException)
            {
                //If we are the server end and we did not pick the incoming connection up then tooo bad!
            }
            catch (SocketException)
            {
                //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
            }
            catch (Exception ex)
            {
                LogIfNotSocketException(ex, "ConnectionSetupError");
            }
        }
#else
        /// <summary>
        /// Start the IncomingConnectionWorker if required
        /// </summary>
        private static void TriggerIncomingConnectionWorkerThread()
        {
            lock (staticTCPConnectionLocker)
            {
                if (!NetworkComms.commsShutdown && (newIncomingConnectionWorker == null || newIncomingConnectionWorker.ThreadState == ThreadState.Stopped))
                {
                    newIncomingConnectionWorker = new Thread(IncomingConnectionWorker);
                    newIncomingConnectionWorker.Name = "TCPNewConnectionWorker";
                    newIncomingConnectionWorker.Start();
                }
            }
        }

        /// <summary>
        /// Picks up any new incoming connections. Polls every listener for pending connections until
        /// shutdown is requested and closes all listeners when the loop ends.
        /// </summary>
        private static void IncomingConnectionWorker()
        {
            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("TCP IncomingConnectionWorker thread started.");

            try
            {
                while (!shutdownIncomingConnectionWorkerThread)
                {
                    try
                    {
                        bool pickedUpNewConnection = false;

                        //Take a snapshot of the listeners so the lock is not held while accepting connections
                        List<TcpListener> currentTCPListeners = new List<TcpListener>();
                        lock (staticTCPConnectionLocker)
                        {
                            foreach (var pair in tcpListenerDict)
                                currentTCPListeners.Add(pair.Value);
                        }

                        foreach (var listener in currentTCPListeners)
                        {
                            if (listener.Pending() && !shutdownIncomingConnectionWorkerThread)
                            {
                                pickedUpNewConnection = true;

                                //Pick up the new connection
                                TcpClient newClient = listener.AcceptTcpClient();

                                //Perform the establish in a task so that we can continue picking up new connections here
                                ThreadPool.QueueUserWorkItem(new WaitCallback((obj) =>
                                {
                                    try
                                    {
                                        GetConnection(new ConnectionInfo(true, ConnectionType.TCP, (IPEndPoint)newClient.Client.RemoteEndPoint), NetworkComms.DefaultSendReceiveOptions, newClient, true);
                                    }
                                    catch (ConfirmationTimeoutException)
                                    {
                                        //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
                                    }
                                    catch (CommunicationException)
                                    {
                                        //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
                                    }
                                    catch (ConnectionSetupException)
                                    {
                                        //If we are the server end and we did not pick the incoming connection up then tooo bad!
                                    }
                                    catch (SocketException)
                                    {
                                        //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
                                    }
                                    catch (Exception ex)
                                    {
                                        LogIfNotSocketException(ex, "ConnectionSetupError");
                                    }
                                }));
                            }
                        }

                        //We will only pause if we didnt get any new connections
                        if (!pickedUpNewConnection && !shutdownIncomingConnectionWorkerThread)
                            Thread.Sleep(100);
                    }
                    catch (ConfirmationTimeoutException)
                    {
                        //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
                    }
                    catch (CommunicationException)
                    {
                        //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
                    }
                    catch (ConnectionSetupException)
                    {
                        //If we are the server end and we did not pick the incoming connection up then tooo bad!
                    }
                    catch (SocketException)
                    {
                        //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
                    }
                    catch (ObjectDisposedException)
                    {
                        //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
                    }
                    catch (Exception ex)
                    {
                        LogIfNotSocketException(ex, "CommsSetupError");
                    }
                }
            }
            catch (Exception ex)
            {
                NetworkComms.LogError(ex, "CriticalCommsError");
            }
            finally
            {
                //We try to close all of the tcpListeners
                CloseAndRemoveAllLocalConnectionListeners();
            }

            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("TCP IncomingConnectionWorker thread ended.");
        }
#endif

        /// <summary>
        /// Shutdown everything TCP related
        /// </summary>
        /// <param name="threadShutdownTimeoutMS">The time in milliseconds to wait for the incoming connection worker thread to end before it is aborted</param>
        internal static void Shutdown(int threadShutdownTimeoutMS = 1000)
        {
#if WINDOWS_PHONE
            try
            {
                CloseAndRemoveAllLocalConnectionListeners();
            }
            catch (Exception ex)
            {
                NetworkComms.LogError(ex, "TCPCommsShutdownError");
            }
#else
            try
            {
                shutdownIncomingConnectionWorkerThread = true;

                CloseAndRemoveAllLocalConnectionListeners();

                //If the worker thread does not shutdown in the required time we kill it
                if (newIncomingConnectionWorker != null && !newIncomingConnectionWorker.Join(threadShutdownTimeoutMS))
                    newIncomingConnectionWorker.Abort();
            }
            catch (Exception ex)
            {
                NetworkComms.LogError(ex, "TCPCommsShutdownError");
            }
            finally
            {
                //Reset the flag so that listening can be started again later
                shutdownIncomingConnectionWorkerThread = false;
            }
#endif
        }

        /// <summary>
        /// Close down all local TCP listeners
        /// </summary>
        private static void CloseAndRemoveAllLocalConnectionListeners()
        {
            lock (staticTCPConnectionLocker)
            {
                try
                {
                    foreach (var listener in tcpListenerDict.Values)
                    {
                        try
                        {
#if WINDOWS_PHONE
                            if (listener != null) listener.Dispose();
#else
                            if (listener != null) listener.Stop();
#endif
                        }
                        catch (Exception)
                        {
                            //Best effort, a listener which fails to stop must not prevent the others from being stopped
                        }
                    }
                }
                catch (Exception)
                {
                }
                finally
                {
                    //Once we have stopped all listeners we replace the dictionary with an empty one incase we want to restart listening
#if WINDOWS_PHONE
                    tcpListenerDict = new Dictionary<IPEndPoint,StreamSocketListener>();
#else
                    tcpListenerDict = new Dictionary<IPEndPoint, TcpListener>();
#endif
                }
            }
        }
    }
}
NetworkComms静态类代码:
// Copyright 2011-2013 Marc Fletcher, Matthew Dean // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>. // // A commercial license of this software can also be purchased. // Please see <http://www.networkcomms.net/licensing/> for details. using System; using System.Collections.Generic; using System.Text; using System.Net; using System.Threading; using System.Net.Sockets; using DPSBase; using System.Collections; using System.Net.NetworkInformation; using System.Diagnostics; using System.IO; #if !NO_LOGGING using NLog; using NLog.Config; #endif //Assembly marked as CLSCompliant [assembly: CLSCompliant(true)] namespace NetworkCommsDotNet { /// <summary> /// Top level interface for NetworkCommsDotNet library. Anything which is not connection specific generally happens within the NetworkComms class. e.g. Keeping track of all connections, global defaults and settings, serialisers and data processors etc. 
/// </summary>
public static class NetworkComms
{
    /// <summary>
    /// Static constructor which sets comm default values
    /// </summary>
    static NetworkComms()
    {
        //Generally comms defaults are defined here
        NetworkIdentifier = ShortGuid.NewGuid();
        NetworkLoadUpdateWindowMS = 2000;
        InterfaceLinkSpeed = 95000000;
        DefaultListenPort = 10000;
        ListenOnAllAllowedInterfaces = true;
        CheckSumMismatchSentPacketCacheMaxByteLimit = 75000;
        MinimumSentPacketCacheTimeMinutes = 1;
        ConnectionEstablishTimeoutMS = 10000;
        PacketConfirmationTimeoutMS = 5000;
        ConnectionAliveTestTimeoutMS = 1000;

        //The runtime environment and the buffer sizes depend on the build target.
        //Mobile and Mono targets use 8000 byte buffers, native .Net targets use 80000 byte buffers.
#if SILVERLIGHT || WINDOWS_PHONE
        CurrentRuntimeEnvironment = RuntimeEnvironment.WindowsPhone_Silverlight;
        SendBufferSizeBytes = ReceiveBufferSizeBytes = 8000;
#elif iOS
        CurrentRuntimeEnvironment = RuntimeEnvironment.Xamarin_iOS;
        SendBufferSizeBytes = ReceiveBufferSizeBytes = 8000;
#elif ANDROID
        CurrentRuntimeEnvironment = RuntimeEnvironment.Xamarin_Android;
        SendBufferSizeBytes = ReceiveBufferSizeBytes = 8000;
#elif NET2
        if (Type.GetType("Mono.Runtime") != null)
        {
            CurrentRuntimeEnvironment = RuntimeEnvironment.Mono_Net2;
            //Mono send buffer smaller as different large object heap limit
            SendBufferSizeBytes = ReceiveBufferSizeBytes = 8000;
        }
        else
        {
            CurrentRuntimeEnvironment = RuntimeEnvironment.Native_Net2;
            SendBufferSizeBytes = ReceiveBufferSizeBytes = 80000;
        }
#elif NET35
        if (Type.GetType("Mono.Runtime") != null)
        {
            CurrentRuntimeEnvironment = RuntimeEnvironment.Mono_Net35;
            //Mono send buffer smaller as different large object heap limit
            SendBufferSizeBytes = ReceiveBufferSizeBytes = 8000;
        }
        else
        {
            CurrentRuntimeEnvironment = RuntimeEnvironment.Native_Net35;
            SendBufferSizeBytes = ReceiveBufferSizeBytes = 80000;
        }
#else
        if (Type.GetType("Mono.Runtime") != null)
        {
            CurrentRuntimeEnvironment = RuntimeEnvironment.Mono_Net4;
            //Mono send buffer smaller as different large object heap limit
            SendBufferSizeBytes = ReceiveBufferSizeBytes = 8000;
        }
        else
        {
            CurrentRuntimeEnvironment = RuntimeEnvironment.Native_Net4;
            SendBufferSizeBytes = ReceiveBufferSizeBytes = 80000;
        }
#endif

        //We want to instantiate our own thread pool here
        CommsThreadPool = new CommsThreadPool(1, Environment.ProcessorCount*2, Environment.ProcessorCount * 20, new TimeSpan(0, 0, 10));

        //Initialise the core extensions
        DPSManager.AddDataSerializer<ProtobufSerializer>();

        DPSManager.AddDataSerializer<NullSerializer>();
        DPSManager.AddDataProcessor<SevenZipLZMACompressor.LZMACompressor>();

#if !FREETRIAL
        //Only the full version includes the encrypter
        DPSManager.AddDataProcessor<RijndaelPSKEncrypter>();
#endif

#if !WINDOWS_PHONE
        DPSManager.AddDataSerializer<BinaryFormaterSerializer>();
#endif

        //Internal packets use the protobuf serialiser with no data processors
        InternalFixedSendReceiveOptions = new SendReceiveOptions(DPSManager.GetDataSerializer<ProtobufSerializer>(),
            new List<DataProcessor>(),
            new Dictionary<string, string>());

        //The default options use the protobuf serialiser followed by LZMA compression
        DefaultSendReceiveOptions = new SendReceiveOptions(DPSManager.GetDataSerializer<ProtobufSerializer>(),
            new List<DataProcessor>() { DPSManager.GetDataProcessor<SevenZipLZMACompressor.LZMACompressor>() },
            new Dictionary<string, string>());
    }

    #region Local Host Information
    /// <summary>
    /// Returns the current machine hostname
    /// </summary>
    public static string HostName
    {
        get
        {
#if WINDOWS_PHONE
            return Windows.Networking.Connectivity.NetworkInformation.GetInternetConnectionProfile().ToString();
#else
            return Dns.GetHostName();
#endif
        }
    }

    /// <summary>
    /// If set NetworkCommsDotNet will only operate on matching IP Addresses. Also see <see cref="AllowedAdaptorNames"/>.
    /// Correct format is string[] { "192.168", "213.111.10" }. If multiple prefixes are provided the earlier prefix, if found, takes priority.
    /// </summary>
    public static string[] AllowedIPPrefixes { get; set; }

    /// <summary>
    /// If set NetworkCommsDotNet will only operate on specified adaptors. Correct format is string[] { "eth0", "en0", "wlan0" }.
    /// </summary>
    public static string[] AllowedAdaptorNames { get; set; }

    /// <summary>
    /// Returns all allowed local IP addresses.
/// If <see cref="AllowedAdaptorNames"/> has been set only returns IP addresses corresponding with specified adaptors.
/// If <see cref="AllowedIPPrefixes"/> has been set only returns matching addresses ordered in descending preference. i.e. Most preferred at [0].
/// </summary>
/// <returns>The allowed local addresses. Addresses in the auto-assigned range 169.254.x.x are never included.</returns>
public static List<IPAddress> AllAllowedIPs()
{
#if WINDOWS_PHONE
    //On windows phone we simply ignore ip addresses from the autoassigned range as well as those without a valid prefix
    List<IPAddress> allowedIPs = new List<IPAddress>();

    foreach (var hName in Windows.Networking.Connectivity.NetworkInformation.GetHostNames())
    {
        if (!hName.DisplayName.StartsWith("169.254"))
        {
            if (AllowedIPPrefixes != null)
            {
                bool valid = false;

                for (int i = 0; i < AllowedIPPrefixes.Length; i++)
                    valid |= hName.DisplayName.StartsWith(AllowedIPPrefixes[i]);

                if(valid)
                    allowedIPs.Add(IPAddress.Parse(hName.DisplayName));
            }
            else
                allowedIPs.Add(IPAddress.Parse(hName.DisplayName));
        }
    }

    return allowedIPs;
#else

    //We want to ignore IPs that have been autoassigned
    //169.254.0.0
    IPAddress autoAssignSubnetv4 = new IPAddress(new byte[] { 169, 254, 0, 0 });
    //255.255.0.0
    IPAddress autoAssignSubnetMaskv4 = new IPAddress(new byte[] { 255, 255, 0, 0 });

    List<IPAddress> validIPAddresses = new List<IPAddress>();
    IPComparer comparer = new IPComparer();

#if ANDROID

    var iFaces = Java.Net.NetworkInterface.NetworkInterfaces;
    while (iFaces.HasMoreElements)
    {
        //First pass: the interface is usable if it has at least one IPv4/IPv6 address and its name is allowed
        bool interfaceValid = false;
        var iFace = iFaces.NextElement() as Java.Net.NetworkInterface;
        var javaAddresses = iFace.InetAddresses;
        while (javaAddresses.HasMoreElements)
        {
            var javaAddress = javaAddresses.NextElement() as Java.Net.InetAddress;
            IPAddress address = default(IPAddress);
            if (IPAddress.TryParse(javaAddress.HostAddress, out address))
            {
                if (address.AddressFamily == AddressFamily.InterNetwork || address.AddressFamily == AddressFamily.InterNetworkV6)
                {
                    if (AllowedAdaptorNames != null)
                    {
                        foreach (var id in AllowedAdaptorNames)
                            if (id == iFace.Name)
                            {
                                interfaceValid = true;
                                break;
                            }
                    }
                    else
                        interfaceValid = true;

                    if (interfaceValid)
                        break;
                }
            }
        }

        if (!interfaceValid)
            continue;

        //Second pass: collect every address of the interface which passes the filters
        javaAddresses = iFace.InetAddresses;
        while (javaAddresses.HasMoreElements)
        {
            var javaAddress = javaAddresses.NextElement() as Java.Net.InetAddress;
            IPAddress address = default(IPAddress);

            if (IPAddress.TryParse(javaAddress.HostAddress, out address))
            {
                if (address.AddressFamily == AddressFamily.InterNetwork || address.AddressFamily == AddressFamily.InterNetworkV6)
                {
                    if (!IsAddressInSubnet(address, autoAssignSubnetv4, autoAssignSubnetMaskv4))
                    {
                        bool allowed = false;

                        if (AllowedAdaptorNames != null)
                        {
                            foreach (var id in AllowedAdaptorNames)
                            {
                                if (id == iFace.Name)
                                {
                                    allowed = true;
                                    break;
                                }
                            }
                        }
                        else
                            allowed = true;

                        if (!allowed)
                            continue;

                        allowed = false;

                        if (AllowedIPPrefixes != null)
                        {
                            foreach (var ip in AllowedIPPrefixes)
                            {
                                if (comparer.Equals(address.ToString(), ip))
                                {
                                    allowed = true;
                                    break;
                                }
                            }
                        }
                        else
                            allowed = true;

                        if (!allowed)
                            continue;

                        //IPAddress defines no equality operators so '!=' only compared references and never
                        //excluded anything. Equals compares the address value.
                        if (!address.Equals(IPAddress.None))
                            validIPAddresses.Add(address);
                    }
                }
            }
        }
    }

#else

    foreach (var iFace in NetworkInterface.GetAllNetworkInterfaces())
    {
        //First pass: the interface is usable if it has at least one IPv4/IPv6 address and its id is allowed
        bool interfaceValid = false;
        var unicastAddresses = iFace.GetIPProperties().UnicastAddresses;
        foreach (var address in unicastAddresses)
        {
            if (address.Address.AddressFamily == AddressFamily.InterNetwork || address.Address.AddressFamily == AddressFamily.InterNetworkV6)
            {
                if (AllowedAdaptorNames != null)
                {
                    foreach (var id in AllowedAdaptorNames)
                        if (iFace.Id == id)
                        {
                            interfaceValid = true;
                            break;
                        }
                }
                else
                    interfaceValid = true;

                if (interfaceValid)
                    break;
            }
        }

        if (!interfaceValid)
            continue;

        //Second pass: collect every address of the interface which passes the filters
        foreach (var address in unicastAddresses)
        {
            var addressInformation = address.Address;
            if (addressInformation.AddressFamily == AddressFamily.InterNetwork || addressInformation.AddressFamily == AddressFamily.InterNetworkV6)
            {
                if (!IsAddressInSubnet(addressInformation, autoAssignSubnetv4, autoAssignSubnetMaskv4))
                {
                    bool allowed = false;

                    if (AllowedAdaptorNames != null)
                    {
                        foreach (var id in AllowedAdaptorNames)
                        {
                            if(id == iFace.Id)
                            {
                                allowed = true;
                                break;
                            }
                        }
                    }
                    else
                        allowed = true;

                    if (!allowed)
                        continue;

                    allowed = false;

                    if (AllowedIPPrefixes != null)
                    {
                        foreach (var ip in AllowedIPPrefixes)
                        {
                            if (comparer.Equals(addressInformation.ToString(), ip))
                            {
                                allowed = true;
                                break;
                            }
                        }
                    }
                    else
                        allowed = true;

                    if (!allowed)
                        continue;

                    //IPAddress defines no equality operators so '!=' only compared references and never
                    //excluded anything. Equals compares the address value.
                    if (!addressInformation.Equals(IPAddress.None))
                        validIPAddresses.Add(addressInformation);
                }
            }
        }
    }
#endif

    if (AllowedIPPrefixes != null)
    {
        //Order by the position of the first matching prefix so that addresses matching earlier prefixes come first
        validIPAddresses.Sort((a, b) =>
        {
            for (int i = 0; i < AllowedIPPrefixes.Length; i++)
            {
                if (a.ToString().StartsWith(AllowedIPPrefixes[i]))
                {
                    if (b.ToString().StartsWith(AllowedIPPrefixes[i]))
                        return 0;
                    else
                        return -1;
                }
                else if (b.ToString().StartsWith(AllowedIPPrefixes[i]))
                    return 1;
            }

            return 0;
        });
    }

    return validIPAddresses;
#endif
}

/// <summary>
/// Custom comparer for IP addresses. Used by <see cref="AllAllowedIPs"/>
/// </summary>
class IPComparer : IEqualityComparer<string>
{
    // Two strings are treated as equal when either one starts with the other,
    // e.g. the prefix "192.168" matches the address "192.168.0.10".
    public bool Equals(string x, string y)
    {
        //Check whether the compared objects reference the same data.
        if (Object.ReferenceEquals(x, y)) return true;

        //Check whether any of the compared objects is null.
        if (Object.ReferenceEquals(x, null) || Object.ReferenceEquals(y, null))
            return false;

        return (y.StartsWith(x) || x.StartsWith(y));
    }

    // NOTE(review): strings which Equals() treats as equal (a prefix and a full address) will generally
    // produce different hash codes here, so this comparer should only be used through Equals() and not
    // as the comparer of a hash based collection.
    public int GetHashCode(string ipAddress)
    {
        return ipAddress.GetHashCode();
    }
}

/// <summary>
/// Returns true if the provided address exists within the provided subnet.
/// </summary>
/// <param name="address">The address to check, i.e. 192.168.0.10</param>
/// <param name="subnet">The subnet, i.e. 192.168.0.0</param>
/// <param name="mask">The subnet mask, i.e. 255.255.255.0</param>
/// <returns>True if address is in the provided subnet. Always false for an IPv6 address.</returns>
/// <exception cref="ArgumentNullException">Thrown if any of the provided addresses is null.</exception>
/// <exception cref="NotImplementedException">Thrown if the subnet or the mask is an IPv6 address.</exception>
public static bool IsAddressInSubnet(IPAddress address, IPAddress subnet, IPAddress mask)
{
    if (address == null) throw new ArgumentNullException("address", "Provided IPAddress cannot be null.");
    if (subnet == null) throw new ArgumentNullException("subnet", "Provided IPAddress cannot be null.");
    if (mask == null) throw new ArgumentNullException("mask", "Provided IPAddress cannot be null.");

    //Catch for IPv6
    if (subnet.AddressFamily == AddressFamily.InterNetworkV6 ||
        mask.AddressFamily == AddressFamily.InterNetworkV6)
        throw new NotImplementedException("This method does not yet support IPv6. Please contact NetworkComms.Net support if you would like this functionality.");
    //If we have provided IPV4 subnets and masks and we have an ipv6 address then return false
    else if (address.AddressFamily == AddressFamily.InterNetworkV6)
        return false;

    byte[] addrBytes = address.GetAddressBytes();
    byte[] maskBytes = mask.GetAddressBytes();
    byte[] maskedAddressBytes = new byte[addrBytes.Length];

    //Catch for IPv6
    if (maskBytes.Length < maskedAddressBytes.Length)
        return false;

    //Apply the mask byte by byte, the result is the network part of the address
    for (int i = 0; i < maskedAddressBytes.Length; ++i)
        maskedAddressBytes[i] = (byte)(addrBytes[i] & maskBytes[i]);

    IPAddress maskedAddress = new IPAddress(maskedAddressBytes);
    bool equal = subnet.Equals(maskedAddress);

    return equal;
}

/// <summary>
/// The default port NetworkCommsDotNet will operate on
/// </summary>
public static int DefaultListenPort { get; set; }

/// <summary>
/// The local identifier for this instance of NetworkCommsDotNet. This is an application unique identifier.
/// </summary>
public static ShortGuid NetworkIdentifier { get; private set; }

/// <summary>
/// The current runtime environment. Detected automatically on startup. Performance may be adversely affected if this is changed.
/// </summary>
public static RuntimeEnvironment CurrentRuntimeEnvironment { get; set; }

/// <summary>
/// An internal random object
/// </summary>
internal static Random randomGen = new Random();

/// <summary>
/// A single boolean used to control a NetworkCommsDotNet shutdown
/// </summary>
internal static volatile bool commsShutdown;

/// <summary>
/// A running total of the number of packets sent on all connections. Used to initialise packet sequence counters to ensure duplicates can not occur.
/// </summary>
internal static long totalPacketSendCount;

/// <summary>
/// The number of milliseconds over which to take an instance load (CurrentNetworkLoad) to be used in averaged values (AverageNetworkLoad).
/// Default is 2000ms. Shorter values can be used but less than 200ms may cause significant errors in the returned value, especially in mono environments.
/// </summary>
public static int NetworkLoadUpdateWindowMS { get; set; }

//The most recent load values, exposed via CurrentNetworkLoadIncoming and CurrentNetworkLoadOutgoing
private static double currentNetworkLoadIncoming;
private static double currentNetworkLoadOutgoing;
#if !WINDOWS_PHONE && !ANDROID
//State of the network load worker thread, not available on Windows Phone and Android builds
private static Thread NetworkLoadThread = null;
private static CommsMath currentNetworkLoadValuesIncoming;
private static CommsMath currentNetworkLoadValuesOutgoing;
private static ManualResetEvent NetworkLoadThreadWait;
#endif

/// <summary>
/// The interface link speed in bits/sec used for network load calculations. The static constructor sets this to 95000000, i.e. just under 100Mb/sec.
/// </summary>
public static long InterfaceLinkSpeed { get; set; }

/// <summary>
/// Returns the current instance network usage, as a value between 0 and 1. Returns the largest value for any available network adaptor. Triggers load analysis upon first call.
/// </summary>
public static double CurrentNetworkLoadIncoming
{
    get
    {
#if !WINDOWS_PHONE && !ANDROID
        //We start the load thread when we first access the network load
        //this helps cut down on unnecessary threads if unrequired
        EnsureNetworkLoadThreadStarted();
#endif
        return currentNetworkLoadIncoming;
    }
    private set { currentNetworkLoadIncoming = value; }
}

/// <summary>
/// Returns the current instance network usage, as a value between 0 and 1. Returns the largest value for any available network adaptor. Triggers load analysis upon first call.
/// </summary>
public static double CurrentNetworkLoadOutgoing
{
    get
    {
#if !WINDOWS_PHONE && !ANDROID
        //We start the load thread when we first access the network load
        //this helps cut down on unnecessary threads if unrequired
        EnsureNetworkLoadThreadStarted();
#endif
        return currentNetworkLoadOutgoing;
    }
    private set { currentNetworkLoadOutgoing = value; }
}

/// <summary>
/// Returns the averaged value of CurrentNetworkLoadIncoming, as a value between 0 and 1, for a time window of upto 254 seconds. Triggers load analysis upon first call.
/// </summary>
/// <param name="secondsToAverage">Number of seconds over which historical data should be used to arrive at an average</param>
/// <returns>Average network load as a double between 0 and 1. Always 0 on Windows Phone and Android builds, and 0 if no load history exists because comms was shut down before load analysis started.</returns>
public static double AverageNetworkLoadIncoming(byte secondsToAverage)
{
#if !WINDOWS_PHONE && !ANDROID
    EnsureNetworkLoadThreadStarted();

    //The history is only created when the load thread is started. If comms has already been
    //shut down the thread is never started, so there may be nothing to average.
    CommsMath loadValues = currentNetworkLoadValuesIncoming;
    if (loadValues == null)
        return 0;

    //Convert the requested time window into a number of load snapshots
    return loadValues.CalculateMean((int)((secondsToAverage * 1000.0) / NetworkLoadUpdateWindowMS));
#else
    return 0;
#endif
}

/// <summary>
/// Returns the averaged value of CurrentNetworkLoadOutgoing, as a value between 0 and 1, for a time window of upto 254 seconds. Triggers load analysis upon first call.
/// </summary>
/// <param name="secondsToAverage">Number of seconds over which historical data should be used to arrive at an average</param>
/// <returns>Average network load as a double between 0 and 1. Always 0 on Windows Phone and Android builds, and 0 if no load history exists because comms was shut down before load analysis started.</returns>
public static double AverageNetworkLoadOutgoing(byte secondsToAverage)
{
#if !WINDOWS_PHONE && !ANDROID
    EnsureNetworkLoadThreadStarted();

    //See AverageNetworkLoadIncoming, the history may not exist if the load thread was never started
    CommsMath loadValues = currentNetworkLoadValuesOutgoing;
    if (loadValues == null)
        return 0;

    //Convert the requested time window into a number of load snapshots
    return loadValues.CalculateMean((int)((secondsToAverage * 1000.0) / NetworkLoadUpdateWindowMS));
#else
    return 0;
#endif
}

#if !WINDOWS_PHONE && !ANDROID
/// <summary>
/// Creates the load history and starts the network load worker thread if that has not been done yet.
/// Does nothing once comms has been shut down.
/// </summary>
private static void EnsureNetworkLoadThreadStarted()
{
    if (!commsShutdown && NetworkLoadThread == null)
    {
        lock (globalDictAndDelegateLocker)
        {
            //Check again now that we hold the lock, another caller may have started the thread in the meantime
            if (!commsShutdown && NetworkLoadThread == null)
            {
                currentNetworkLoadValuesIncoming = new CommsMath();
                currentNetworkLoadValuesOutgoing = new CommsMath();

                NetworkLoadThread = new Thread(NetworkLoadWorker);
                NetworkLoadThread.Name = "NetworkLoadThread";
                NetworkLoadThread.Start();
            }
        }
    }
}
#endif

/// <summary>
/// Determines the most appropriate local end point to contact the provided remote end point.
/// Testing shows this method takes on average 1.6ms to return.
/// </summary>
/// <param name="remoteIPEndPoint">The remote end point</param>
/// <returns>The selected local end point</returns>
/// <exception cref="ArgumentNullException">Thrown if remoteIPEndPoint is null.</exception>
public static IPEndPoint BestLocalEndPoint(IPEndPoint remoteIPEndPoint)
{
    if (remoteIPEndPoint == null) throw new ArgumentNullException("remoteIPEndPoint", "Provided IPEndPoint cannot be null.");

#if WINDOWS_PHONE
    var t = Windows.Networking.Sockets.DatagramSocket.GetEndpointPairsAsync(new Windows.Networking.HostName(remoteIPEndPoint.Address.ToString()), remoteIPEndPoint.Port.ToString()).AsTask();
    if (t.Wait(20) && t.Result.Count > 0)
    {
        var enumerator = t.Result.GetEnumerator();
        enumerator.MoveNext();

        var endpointPair = enumerator.Current;
        return new IPEndPoint(IPAddress.Parse(endpointPair.LocalHostName.DisplayName.ToString()), int.Parse(endpointPair.LocalServiceName));
    }
    else
        throw new ConnectionSetupException("Unable to determine correct local end point.");
#else
    //We use UDP as its connectionless hence faster
    IPEndPoint result;
    using (Socket testSocket = new Socket(remoteIPEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp))
    {
        //Connecting a UDP socket selects the local end point used to reach the remote end point
        testSocket.Connect(remoteIPEndPoint);
        result = (IPEndPoint)testSocket.LocalEndPoint;
    }

    return result;
#endif
}

#if !WINDOWS_PHONE && !ANDROID
/// <summary>
/// Takes a network load snapshot (CurrentNetworkLoad) every NetworkLoadUpdateWindowMS
/// </summary>
private static void NetworkLoadWorker()
{
    NetworkLoadThreadWait = new ManualResetEvent(false);

    //Get all interfaces
    NetworkInterface[] interfacesToUse = NetworkInterface.GetAllNetworkInterfaces();

    long[] startSent, startReceived, endSent, endReceived;

    while (!commsShutdown)
    {
        try
        {
            //we need to look at the load across all adaptors, by default we will probably choose the adaptor with the highest usage
            //UTC is used so that local clock adjustments, i.e. daylight saving, do not distort the measured window
            DateTime startTime = DateTime.UtcNow;

            IPv4InterfaceStatistics[] stats = new IPv4InterfaceStatistics[interfacesToUse.Length];
            startSent = new long[interfacesToUse.Length];
            startReceived = new long[interfacesToUse.Length];

            for (int i = 0; i < interfacesToUse.Length; ++i)
            {
                stats[i] = interfacesToUse[i].GetIPv4Statistics();
                startSent[i] = stats[i].BytesSent;
                startReceived[i] = stats[i].BytesReceived;
            }

            if (commsShutdown) return;

            //Thread.Sleep(NetworkLoadUpdateWindowMS);
            NetworkLoadThreadWait.WaitOne(NetworkLoadUpdateWindowMS);

            if (commsShutdown) return;

            stats = new IPv4InterfaceStatistics[interfacesToUse.Length];
            endSent = new long[interfacesToUse.Length];
            endReceived = new long[interfacesToUse.Length];

            for (int i = 0; i < interfacesToUse.Length; ++i)
            {
                stats[i] = interfacesToUse[i].GetIPv4Statistics();
                endSent[i] = stats[i].BytesSent;
                endReceived[i] = stats[i].BytesReceived;
            }

            DateTime endTime = DateTime.UtcNow;
            double elapsedMS = (endTime - startTime).TotalMilliseconds;

            //The load is reported as zero if there are no interfaces, if no time has elapsed or if
            //the byte counters went backwards, so that reported values never fall below 0.
            double inMax = 0, outMax = 0;
            if (elapsedMS > 0)
            {
                //The number of bytes the link could have carried in the elapsed time (bits/sec * ms / 8000)
                double linkCapacityBytes = (InterfaceLinkSpeed * elapsedMS) / 8000;

                for (int i = 0; i < startSent.Length; ++i)
                {
                    double outUsage = (double)(endSent[i] - startSent[i]) / linkCapacityBytes;
                    double inUsage = (double)(endReceived[i] - startReceived[i]) / linkCapacityBytes;

                    if (inUsage > inMax) inMax = inUsage;
                    if (outUsage > outMax) outMax = outUsage;
                }
            }

            //If either of the usage levels have gone above 2 it suggests we are most likely on a faster connection that we think
            //As such we will bump the interfacelinkspeed upto 1Gbps so that future load calculations more accurately reflect the
            //actual load.
            if (inMax > 2 || outMax > 2) InterfaceLinkSpeed = 950000000;

            //Limit to one
            CurrentNetworkLoadIncoming = (inMax > 1 ? 1 : inMax);
            CurrentNetworkLoadOutgoing = (outMax > 1 ? 1 : outMax);

            currentNetworkLoadValuesIncoming.AddValue(CurrentNetworkLoadIncoming);
            currentNetworkLoadValuesOutgoing.AddValue(CurrentNetworkLoadOutgoing);

            //We can only have upto 255 seconds worth of data in the average list
            int maxListSize = (int)(255000.0 / NetworkLoadUpdateWindowMS);
            currentNetworkLoadValuesIncoming.TrimList(maxListSize);
            currentNetworkLoadValuesOutgoing.TrimList(maxListSize);
        }
        catch (Exception ex)
        {
            LogError(ex, "NetworkLoadWorker");

            //It may be the interfaces available to the OS have changed so we will reset them here
            interfacesToUse = NetworkInterface.GetAllNetworkInterfaces();
            //If an error has happened we dont want to thrash the problem, we wait for 5 seconds and hope whatever was wrong goes away
            Thread.Sleep(5000);
        }
    }
}
#endif
#endregion

#region Established Connections
/// <summary>
/// Locker for connection dictionaries
/// </summary>
internal static object globalDictAndDelegateLocker = new object();

/// <summary>
/// Primary connection dictionary stored by network identifier
/// </summary>
internal static Dictionary<ShortGuid, Dictionary<ConnectionType, List<Connection>>> allConnectionsById = new Dictionary<ShortGuid, Dictionary<ConnectionType, List<Connection>>>();

/// <summary>
/// Secondary connection dictionary stored by ip end point. Allows for quick cross referencing.
/// </summary>
internal static Dictionary<IPEndPoint, Dictionary<ConnectionType, Connection>> allConnectionsByEndPoint = new Dictionary<IPEndPoint, Dictionary<ConnectionType, Connection>>();

/// <summary>
/// Old connection cache so that requests for connectionInfo can be returned even after a connection has been closed.
/// </summary>
internal static Dictionary<ShortGuid, Dictionary<ConnectionType, List<ConnectionInfo>>> oldNetworkIdentifierToConnectionInfo = new Dictionary<ShortGuid, Dictionary<ConnectionType, List<ConnectionInfo>>>();
#endregion

#region Incoming Data and Connection Config
/// <summary>
/// Used for switching between async and sync connectionListen modes. Default is false. No noticeable performance difference between the two modes.
/// </summary>
public static bool ConnectionListenModeUseSync { get; set; }

/// <summary>
/// Used for switching between listening on a single interface or multiple interfaces. Default is true. See <see cref="AllowedIPPrefixes"/> and <see cref="AllowedAdaptorNames"/>
/// </summary>
public static bool ListenOnAllAllowedInterfaces { get; set; }

/// <summary>
/// Receive data buffer size. The static constructor sets 80000 bytes on native .Net builds and 8000 bytes on Mono, Silverlight/Windows Phone, iOS and Android builds. CAUTION: Changing the default value can lead to performance degradation.
/// </summary>
public static int ReceiveBufferSizeBytes { get; set; }

/// <summary>
/// Send data buffer size. The static constructor sets 80000 bytes on native .Net builds and 8000 bytes on Mono, Silverlight/Windows Phone, iOS and Android builds. CAUTION: Changing the default value can lead to performance degradation.
/// </summary>
public static int SendBufferSizeBytes { get; set; }

/// <summary>
/// The threadpool used by networkComms.Net to execute incoming packet handlers.
/// </summary>
public static CommsThreadPool CommsThreadPool { get; set; }

/// <summary>
/// Once we have received all incoming data we handle it further. This is performed at the global level to help support different priorities.
/// </summary>
/// <param name="itemAsObj">Possible PriorityQueueItem.
/// Must not be null, an <see cref="ArgumentNullException"/> is thrown if null is provided.</param>
internal static void CompleteIncomingItemTask(object itemAsObj)
{
    if (itemAsObj == null) throw new ArgumentNullException("itemAsObj", "Provided parameter itemAsObj cannot be null.");

    PriorityQueueItem item = null;
    try
    {
        //The item arrives as an object, so we cast it back to a PriorityQueueItem before use
        item = itemAsObj as PriorityQueueItem;

        if (item == null)
            throw new InvalidCastException("Cast from object to PriorityQueueItem resulted in null reference, unable to continue.");

        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Handling a " + item.PacketHeader.PacketType + " packet from " + item.Connection.ConnectionInfo + " with a priority of " + item.Priority.ToString() + ".");

#if !WINDOWS_PHONE
        //Run the handlers at the priority of the item, the priority is reset in the finally block
        if (Thread.CurrentThread.Priority != (ThreadPriority)item.Priority) Thread.CurrentThread.Priority = (ThreadPriority)item.Priority;
#endif

        //Check for a shutdown connection
        if (item.Connection.ConnectionInfo.ConnectionState == ConnectionState.Shutdown) return;

        //We only look at the check sum if we want to and if it has been set by the remote end
        if (NetworkComms.EnablePacketCheckSumValidation && item.PacketHeader.ContainsOption(PacketHeaderStringItems.CheckSumHash))
        {
            var packetHeaderHash = item.PacketHeader.GetOption(PacketHeaderStringItems.CheckSumHash);

            //Validate the checkSumhash of the data
            string packetDataSectionMD5 = NetworkComms.MD5Bytes(item.DataStream);
            if (packetHeaderHash != packetDataSectionMD5)
            {
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Warn(" ... corrupted packet detected, expected " + packetHeaderHash + " but received " + packetDataSectionMD5 + ".");

                //We have corruption on a resend request, something is very wrong so we throw an exception.
                if (item.PacketHeader.PacketType == Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.CheckSumFailResend)) throw new CheckSumException("Corrupted md5CheckFailResend packet received.");

                if (item.PacketHeader.PayloadPacketSize < NetworkComms.CheckSumMismatchSentPacketCacheMaxByteLimit)
                {
                    //Instead of throwing an exception we can request the packet to be resent
                    Packet returnPacket = new Packet(Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.CheckSumFailResend), packetHeaderHash, NetworkComms.InternalFixedSendReceiveOptions);
                    item.Connection.SendPacket(returnPacket);
                    //We need to wait for the packet to be resent before going further
                    return;
                }
                else
                    throw new CheckSumException("Corrupted packet detected from " + item.Connection.ConnectionInfo + ", expected " + packetHeaderHash + " but received " + packetDataSectionMD5 + ".");
            }
        }

        //Remote end may have requested packet receive confirmation so we send that now
        if (item.PacketHeader.ContainsOption(PacketHeaderStringItems.ReceiveConfirmationRequired))
        {
            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... sending requested receive confirmation packet.");

            var hash = item.PacketHeader.ContainsOption(PacketHeaderStringItems.CheckSumHash) ? item.PacketHeader.GetOption(PacketHeaderStringItems.CheckSumHash) : "";

            Packet returnPacket = new Packet(Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.Confirmation), hash, NetworkComms.InternalFixedSendReceiveOptions);
            item.Connection.SendPacket(returnPacket);
        }

        //We can now pass the data onto the correct delegate
        //First we have to check for our reserved packet types
        //The following large sections have been factored out to make reading and debugging a little easier
        if (item.PacketHeader.PacketType == Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.CheckSumFailResend))
            item.Connection.CheckSumFailResendHandler(item.DataStream);
        else if (item.PacketHeader.PacketType == Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.ConnectionSetup))
            item.Connection.ConnectionSetupHandler(item.DataStream);
        else if (item.PacketHeader.PacketType == Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.AliveTestPacket) &&
            (NetworkComms.InternalFixedSendReceiveOptions.DataSerializer.DeserialiseDataObject<byte[]>(item.DataStream,
                NetworkComms.InternalFixedSendReceiveOptions.DataProcessors,
                NetworkComms.InternalFixedSendReceiveOptions.Options))[0] == 0)
        {
            //If we have received a ping packet from the originating source we reply with true
            Packet returnPacket = new Packet(Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.AliveTestPacket), new byte[1] { 1 }, NetworkComms.InternalFixedSendReceiveOptions);
            item.Connection.SendPacket(returnPacket);
        }

        //We allow users to add their own custom handlers for reserved packet types here
        //The 'else' is intentionally commented out, so the handlers below run for every packet type
        //else
        if (true)
        {
            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Triggering handlers for packet of type ‘" + item.PacketHeader.PacketType + "‘ from " + item.Connection.ConnectionInfo);

            //We trigger connection specific handlers first
            bool connectionSpecificHandlersTriggered = item.Connection.TriggerSpecificPacketHandlers(item.PacketHeader, item.DataStream, item.SendReceiveOptions);

            //We trigger global handlers second
            NetworkComms.TriggerGlobalPacketHandlers(item.PacketHeader, item.Connection, item.DataStream, item.SendReceiveOptions, connectionSpecificHandlersTriggered);

            //This is a really bad place to put a garbage collection, comment left in so that it doesn't get added again at some later date
            //We don't want the CPU to JUST be trying to garbage collect the WHOLE TIME
            //GC.Collect();
        }
    }
    catch (CommunicationException)
    {
        if (item != null)
        {
            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("A communcation exception occured in CompleteIncomingPacketWorker(), connection with " + item.Connection.ConnectionInfo + " be closed.");
            item.Connection.CloseConnection(true, 2);
        }
    }
    catch (DuplicateConnectionException ex)
    {
        if (item != null)
        {
            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Warn(ex.Message != null ? ex.Message : "A possible duplicate connection was detected with " + item.Connection + ". Closing connection.");
            item.Connection.CloseConnection(true, 42);
        }
    }
    catch (Exception ex)
    {
        NetworkComms.LogError(ex, "CompleteIncomingItemTaskError");

        if (item != null)
        {
            //If anything goes wrong here all we can really do is log the exception
            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("An unhandled exception occured in CompleteIncomingPacketWorker(), connection with " + item.Connection.ConnectionInfo + " be closed. See log file for more information.");
            item.Connection.CloseConnection(true, 3);
        }
    }
    finally
    {
        //We need to dispose the data stream correctly
        //The stream is null checked so that an exception here can not skip the priority reset below
        if (item != null && item.DataStream != null) item.DataStream.Close();

#if !WINDOWS_PHONE
        //Ensure the thread returns to the pool with a normal priority
        if (Thread.CurrentThread.Priority != ThreadPriority.Normal) Thread.CurrentThread.Priority = ThreadPriority.Normal;
#endif
    }
}
#endregion

#if !WINDOWS_PHONE
#region High CPU Usage Tuning
/// <summary>
/// In times of high CPU usage we need to ensure that certain time critical functions, like connection handshaking do not timeout.
/// This sets the thread priority for those processes. /// </summary> internal static ThreadPriority timeCriticalThreadPriority = ThreadPriority.AboveNormal; #endregion #endif #region Checksum Config /// <summary> /// When enabled uses an MD5 checksum to validate all received packets. Default is false, relying on any possible connection checksum alone. /// Also when enabled any packets sent less than CheckSumMismatchSentPacketCacheMaxByteLimit will be cached for a duration to ensure successful delivery. /// Default false. /// </summary> public static bool EnablePacketCheckSumValidation { get; set; } /// <summary> /// When checksum validation is enabled sets the limit below which sent packets are cached to ensure successful delivery. Default 75KB. /// </summary> public static int CheckSumMismatchSentPacketCacheMaxByteLimit { get; set; } /// <summary> /// When a sent packet has been cached for a possible resend this is the minimum length of time it will be retained. Default is 1.0 minutes. /// </summary> public static double MinimumSentPacketCacheTimeMinutes { get; set; } /// <summary> /// Records the last sent packet cache cleanup time. Prevents the sent packet cache from being checked too frequently. /// </summary> internal static DateTime LastSentPacketCacheCleanup { get; set; } #endregion #region PacketType Config and Global Handlers /// <summary> /// An internal reference copy of all reservedPacketTypeNames. /// </summary> internal static string[] reservedPacketTypeNames = Enum.GetNames(typeof(ReservedPacketType)); /// <summary> /// Dictionary of all custom packetHandlers. Key is packetType. /// </summary> static Dictionary<string, List<IPacketTypeHandlerDelegateWrapper>> globalIncomingPacketHandlers = new Dictionary<string, List<IPacketTypeHandlerDelegateWrapper>>(); /// <summary> /// Dictionary of any non default custom packet unwrappers. Key is packetType. 
/// </summary>
static Dictionary<string, PacketTypeUnwrapper> globalIncomingPacketUnwrappers = new Dictionary<string, PacketTypeUnwrapper>();

/// <summary>
/// Delegate signature for global incoming packet handlers. See the AppendGlobalIncomingPacketHandler overloads.
/// </summary>
/// <typeparam name="T">The type of object which is expected by this handler</typeparam>
/// <param name="packetHeader">The <see cref="PacketHeader"/> of the incoming packet</param>
/// <param name="connection">The connection on which this packet was received</param>
/// <param name="incomingObject">The incoming object, already converted to type T</param>
public delegate void PacketHandlerCallBackDelegate<T>(PacketHeader packetHeader, Connection connection, T incomingObject);

/// <summary>
/// If true any unknown incoming packet types are ignored. When false, receiving a packet type with no
/// handler results in a warning and an error log file (see TriggerGlobalPacketHandlers).
/// Documented default is false.
/// </summary>
public static bool IgnoreUnknownPacketTypes { get; set; }

/// <summary>
/// Add an incoming packet handler using <see cref="DefaultSendReceiveOptions"/>. Multiple handlers for the
/// same packet type will be executed in the order they are added.
/// </summary>
/// <typeparam name="T">The type of incoming object</typeparam>
/// <param name="packetTypeStr">The packet type for which this handler will be executed</param>
/// <param name="packetHandlerDelgatePointer">The delegate to be executed when a packet of packetTypeStr is received</param>
public static void AppendGlobalIncomingPacketHandler<T>(string packetTypeStr, PacketHandlerCallBackDelegate<T> packetHandlerDelgatePointer)
{
    // Simply forwards to the three argument overload with the default options.
    AppendGlobalIncomingPacketHandler<T>(packetTypeStr, packetHandlerDelgatePointer, DefaultSendReceiveOptions);
}

/// <summary>
/// Add an incoming packet handler using the provided SendReceiveOptions. Multiple handlers for the same packet type will be executed in the order they are added.
/// </summary>
/// <typeparam name="T">The type of incoming object</typeparam>
/// <param name="packetTypeStr">The packet type for which this handler will be executed</param>
/// <param name="packetHandlerDelgatePointer">The delegate to be executed when a packet of packetTypeStr is received</param>
/// <param name="sendReceiveOptions">The SendReceiveOptions to be used for the provided packet type</param>
/// <exception cref="ArgumentNullException">Any of the arguments is null</exception>
/// <exception cref="PacketHandlerException">
/// The options are not compatible with those already registered for this packet type, or the same
/// delegate has already been added for this packet type.
/// </exception>
public static void AppendGlobalIncomingPacketHandler<T>(string packetTypeStr, PacketHandlerCallBackDelegate<T> packetHandlerDelgatePointer, SendReceiveOptions sendReceiveOptions)
{
    if (packetTypeStr == null) throw new ArgumentNullException("packetTypeStr", "Provided packetType string cannot be null.");
    if (packetHandlerDelgatePointer == null) throw new ArgumentNullException("packetHandlerDelgatePointer", "Provided PacketHandlerCallBackDelegate<T> cannot be null.");
    if (sendReceiveOptions == null) throw new ArgumentNullException("sendReceiveOptions", "Provided SendReceiveOptions cannot be null.");

    lock (globalDictAndDelegateLocker)
    {
        // One unwrapper (i.e. one set of options) exists per packet type; a second registration
        // for the same type must agree with it.
        PacketTypeUnwrapper existingUnwrapper;
        if (globalIncomingPacketUnwrappers.TryGetValue(packetTypeStr, out existingUnwrapper))
        {
            if (!existingUnwrapper.Options.OptionsCompatible(sendReceiveOptions))
                throw new PacketHandlerException("The proivded SendReceiveOptions are not compatible with existing SendReceiveOptions already specified for this packetTypeStr.");
        }
        else
            globalIncomingPacketUnwrappers.Add(packetTypeStr, new PacketTypeUnwrapper(packetTypeStr, sendReceiveOptions));

        List<IPacketTypeHandlerDelegateWrapper> existingHandlers;
        if (globalIncomingPacketHandlers.TryGetValue(packetTypeStr, out existingHandlers))
        {
            // Duplicate detection. The wrappers are compared through EqualsDelegate, exactly as the
            // Remove/Exists methods do. Comparing a stored wrapper against a freshly constructed one
            // with '==' only compares references and therefore never detects a duplicate.
            foreach (IPacketTypeHandlerDelegateWrapper handler in existingHandlers)
            {
                if (handler.EqualsDelegate(packetHandlerDelgatePointer))
                    throw new PacketHandlerException("This specific packet handler delegate already exists for the provided packetTypeStr.");
            }

            existingHandlers.Add(new PacketTypeHandlerDelegateWrapper<T>(packetHandlerDelgatePointer));
        }
        else
            globalIncomingPacketHandlers.Add(packetTypeStr, new List<IPacketTypeHandlerDelegateWrapper>() { new PacketTypeHandlerDelegateWrapper<T>(packetHandlerDelgatePointer) });

        if (LoggingEnabled) logger.Info("Added incoming packetHandler for ‘" + packetTypeStr + "‘ packetType.");
    }
}

/// <summary>
/// Removes the provided delegate for the specified packet type. If the provided delegate does not exist
/// for this packet type just returns. When the last handler for a packet type is removed the packet
/// type's unwrapper (options) entry is removed as well.
/// </summary>
/// <param name="packetTypeStr">The packet type for which the delegate will be removed</param>
/// <param name="packetHandlerDelgatePointer">The delegate to be removed</param>
public static void RemoveGlobalIncomingPacketHandler(string packetTypeStr, Delegate packetHandlerDelgatePointer)
{
    lock (globalDictAndDelegateLocker)
    {
        List<IPacketTypeHandlerDelegateWrapper> handlers;
        if (!globalIncomingPacketHandlers.TryGetValue(packetTypeStr, out handlers))
            return;

        if (handlers != null)
        {
            // Find the first wrapper around this delegate. If the delegate was never added we
            // simply fall through without removing anything.
            IPacketTypeHandlerDelegateWrapper toRemove = null;
            foreach (IPacketTypeHandlerDelegateWrapper handler in handlers)
            {
                if (handler.EqualsDelegate(packetHandlerDelgatePointer))
                {
                    toRemove = handler;
                    break;
                }
            }

            if (toRemove != null) handlers.Remove(toRemove);
        }

        if (handlers == null || handlers.Count == 0)
        {
            // No handlers left, so drop the packet type from both tables.
            globalIncomingPacketHandlers.Remove(packetTypeStr);
            globalIncomingPacketUnwrappers.Remove(packetTypeStr);

            if (LoggingEnabled) logger.Info("Removed a packetHandler for ‘" + packetTypeStr + "‘ packetType. No handlers remain.");
        }
        else if (LoggingEnabled)
            logger.Info("Removed a packetHandler for ‘" + packetTypeStr + "‘ packetType. Handlers remain.");
    }
}

/// <summary>
/// Removes all delegates for the provided packet type, together with its unwrapper (options) entry.
/// </summary>
/// <param name="packetTypeStr">Packet type for which all delegates should be removed</param>
public static void RemoveGlobalIncomingPacketHandler(string packetTypeStr)
{
    lock (globalDictAndDelegateLocker)
    {
        // Original author's note: there is no need to check for removing a critical reserved
        // packet handler here because those cannot be removed.
        // Dictionary.Remove returns false when the key is absent, in which case nothing else happens.
        if (globalIncomingPacketHandlers.Remove(packetTypeStr))
        {
            globalIncomingPacketUnwrappers.Remove(packetTypeStr);

            if (LoggingEnabled) logger.Info("Removed all incoming packetHandlers for ‘" + packetTypeStr + "‘ packetType.");
        }
    }
}

/// <summary>
/// Removes all delegates for all packet types by replacing both handler tables with empty ones.
/// </summary>
public static void RemoveGlobalIncomingPacketHandler()
{
    lock (globalDictAndDelegateLocker)
    {
        globalIncomingPacketHandlers = new Dictionary<string, List<IPacketTypeHandlerDelegateWrapper>>();
        globalIncomingPacketUnwrappers = new Dictionary<string, PacketTypeUnwrapper>();

        if (LoggingEnabled) logger.Info("Removed all incoming packetHandlers for all packetTypes");
    }
}

/// <summary>
/// Trigger incoming packet delegates for the provided parameters.
/// </summary>
/// <param name="packetHeader">The packet header</param>
/// <param name="connection">The incoming connection</param>
/// <param name="incomingDataStream">The stream holding the bytes of the incoming object</param>
/// <param name="options">The SendReceiveOptions used to convert the incoming bytes back to the desired object</param>
public static void TriggerGlobalPacketHandlers(PacketHeader packetHeader, Connection connection, MemoryStream incomingDataStream, SendReceiveOptions options)
{
    // The public entry point always applies the globally configured unknown-packet policy.
    TriggerGlobalPacketHandlers(packetHeader, connection, incomingDataStream, options, IgnoreUnknownPacketTypes);
}

/// <summary>
/// Trigger incoming packet delegates for the provided parameters. Any exception raised here, or by
/// an individual handler, is logged rather than propagated to the caller.
/// </summary>
/// <param name="packetHeader">The packet header</param>
/// <param name="connection">The incoming connection</param>
/// <param name="incomingDataStream">The stream holding the bytes of the incoming object</param>
/// <param name="options">The SendReceiveOptions used to convert the incoming bytes back to the desired object</param>
/// <param name="ignoreUnknownPacketTypeOverride">When true unknown packet types are ignored regardless of NetworkComms.IgnoreUnknownPacketTypes</param>
internal static void TriggerGlobalPacketHandlers(PacketHeader packetHeader, Connection connection, MemoryStream incomingDataStream, SendReceiveOptions options, bool ignoreUnknownPacketTypeOverride = false)
{
    try
    {
        if (options == null)
            throw new PacketHandlerException("Provided sendReceiveOptions should not be null for packetType " + packetHeader.PacketType);

        // Work on a snapshot so handlers may be added or removed while we are dispatching.
        List<IPacketTypeHandlerDelegateWrapper> snapshot = null;
        lock (globalDictAndDelegateLocker)
        {
            List<IPacketTypeHandlerDelegateWrapper> registered;
            if (globalIncomingPacketHandlers.TryGetValue(packetHeader.PacketType, out registered))
                snapshot = new List<IPacketTypeHandlerDelegateWrapper>(registered);
        }

        if (snapshot == null)
        {
            // Unknown packet type and we have been told to ignore those: nothing more to do.
            if (IgnoreUnknownPacketTypes || ignoreUnknownPacketTypeOverride)
                return;

            // Reserved packet types legitimately arrive here when no custom delegate was added for them.
            bool isReservedType = false;
            foreach (string reservedName in reservedPacketTypeNames)
            {
                if (reservedName == packetHeader.PacketType)
                {
                    isReservedType = true;
                    break;
                }
            }

            if (!isReservedType)
            {
                // A packet of an unexpected type is generally nothing to worry about, so it is
                // only logged (and written to an error file), never thrown.
                string unknownTypeMessage = "The received packet type ‘" + packetHeader.PacketType + "‘ has no configured handler and network comms is not set to ignore unknown packet types. Set NetworkComms.IgnoreUnknownPacketTypes=true to prevent this error.";

                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Warn(unknownTypeMessage);
                LogError(new UnexpectedPacketTypeException(unknownTypeMessage), "PacketHandlerErrorGlobal_" + packetHeader.PacketType);
            }

            return;
        }

        // Sanity check: a registered packet type must always have at least one handler.
        if (snapshot.Count == 0)
            throw new PacketHandlerException("An entry exists in the packetHandlers list but it contains no elements. This should not be possible.");

        // Deserialise once, using the first wrapper, and hand the same object to every handler.
        object incomingObject = snapshot[0].DeSerialize(incomingDataStream, options);

        if (LoggingEnabled) logger.Trace(" ... passing completed data packet of type ‘" + packetHeader.PacketType + "‘ to " + snapshot.Count.ToString() + " selected global handlers.");

        for (int i = 0; i < snapshot.Count; i++)
        {
            try
            {
                snapshot[i].Process(packetHeader, connection, incomingObject);
            }
            catch (Exception ex)
            {
                // A failing handler must not prevent the remaining handlers from running.
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("An unhandled exception was caught while processing a packet handler for a packet type ‘" + packetHeader.PacketType + "‘. Make sure to catch errors in packet handlers. See error log file for more information.");
                NetworkComms.LogError(ex, "PacketHandlerErrorGlobal_" + packetHeader.PacketType);
            }
        }

        if (LoggingEnabled) logger.Trace(" ... all handlers for packet of type ‘" + packetHeader.PacketType + "‘ completed.");
    }
    catch (Exception ex)
    {
        // If anything goes wrong here all we can really do is log the exception.
        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("An exception occured in TriggerPacketHandler() for a packet type ‘" + packetHeader.PacketType + "‘. See error log file for more information.");
        NetworkComms.LogError(ex, "PacketHandlerErrorGlobal_" + packetHeader.PacketType);
    }
}

/// <summary>
/// Returns the unwrapper <see cref="SendReceiveOptions"/> for the provided packet type. If no specific options are registered returns null.
/// </summary>
/// <param name="packetTypeStr">The packet type for which the <see cref="SendReceiveOptions"/> are required</param>
/// <returns>The requested <see cref="SendReceiveOptions"/> otherwise null</returns>
public static SendReceiveOptions GlobalPacketTypeUnwrapperOptions(string packetTypeStr)
{
    lock (globalDictAndDelegateLocker)
    {
        // A registered global unwrapper supplies the options for this packet type.
        PacketTypeUnwrapper unwrapper;
        if (globalIncomingPacketUnwrappers.TryGetValue(packetTypeStr, out unwrapper))
            return unwrapper.Options;
    }

    return null;
}

/// <summary>
/// Returns true if at least one global packet handler is registered for the provided packet type.
/// </summary>
/// <param name="packetTypeStr">The packet type for which to check incoming packet handlers</param>
/// <returns>True if a global packet handler exists</returns>
public static bool GlobalIncomingPacketHandlerExists(string packetTypeStr)
{
    bool typeRegistered;

    lock (globalDictAndDelegateLocker)
    {
        typeRegistered = globalIncomingPacketHandlers.ContainsKey(packetTypeStr);
    }

    return typeRegistered;
}

/// <summary>
/// Returns true if the provided global packet handler has been added for the provided packet type.
/// </summary>
/// <param name="packetTypeStr">The packet type within which to check packet handlers</param>
/// <param name="packetHandlerDelgatePointer">The packet handler to look for</param>
/// <returns>True if the handler is registered for the provided packetType</returns>
public static bool GlobalIncomingPacketHandlerExists(string packetTypeStr, Delegate packetHandlerDelgatePointer)
{
    lock (globalDictAndDelegateLocker)
    {
        List<IPacketTypeHandlerDelegateWrapper> registered;
        if (!globalIncomingPacketHandlers.TryGetValue(packetTypeStr, out registered))
            return false;

        foreach (IPacketTypeHandlerDelegateWrapper candidate in registered)
        {
            if (candidate.EqualsDelegate(packetHandlerDelgatePointer))
                return true;
        }

        return false;
    }
}
#endregion

#region Connection Establish and Shutdown
/// <summary>
/// Delegate which is executed when a connection is established or shutdown.
/// See <see cref="AppendGlobalConnectionEstablishHandler"/> and <see cref="AppendGlobalConnectionCloseHandler"/>.
/// </summary>
/// <param name="connection">The connection which has been established or shutdown.</param>
public delegate void ConnectionEstablishShutdownDelegate(Connection connection);

/// <summary>
/// Multicast delegate pointer for connection shutdowns.
/// </summary>
internal static ConnectionEstablishShutdownDelegate globalConnectionShutdownDelegates;

/// <summary>
/// Number of delegates currently held by <see cref="globalConnectionShutdownDelegates"/>. For debugging.
/// </summary>
internal static int globalConnectionShutdownDelegateCount = 0;

/// <summary>
/// Multicast delegate pointer for connection establishments, run asynchronously.
/// </summary>
internal static ConnectionEstablishShutdownDelegate globalConnectionEstablishDelegatesAsync;

/// <summary>
/// Multicast delegate pointer for connection establishments, run synchronously.
/// </summary>
internal static ConnectionEstablishShutdownDelegate globalConnectionEstablishDelegatesSync;

/// <summary>
/// Combined number of delegates currently held by the sync and async establish delegates. For debugging.
/// </summary>
internal static int globalConnectionEstablishDelegateCount = 0;

/// <summary>
/// Comms shutdown event. This will be triggered when calling NetworkComms.Shutdown
/// </summary>
public static event EventHandler<EventArgs> OnCommsShutdown;

/// <summary>
/// Returns the number of entries in the invocation list of the provided (possibly null) multicast delegate.
/// </summary>
private static int CountDelegateInvocations(Delegate multicastDelegate)
{
    return multicastDelegate == null ? 0 : multicastDelegate.GetInvocationList().Length;
}

/// <summary>
/// Add a new connection shutdown delegate which will be called for every connection as it closes.
/// </summary>
/// <param name="connectionShutdownDelegate">The delegate to call on all connection shutdowns</param>
public static void AppendGlobalConnectionCloseHandler(ConnectionEstablishShutdownDelegate connectionShutdownDelegate)
{
    lock (globalDictAndDelegateLocker)
    {
        // '+=' on a null multicast delegate simply yields the right hand side.
        globalConnectionShutdownDelegates += connectionShutdownDelegate;

        // Derive the debug counter from the real invocation list so it can never drift.
        globalConnectionShutdownDelegateCount = CountDelegateInvocations(globalConnectionShutdownDelegates);

        if (LoggingEnabled) logger.Info("Added globalConnectionShutdownDelegates. " + globalConnectionShutdownDelegateCount.ToString());
    }
}

/// <summary>
/// Remove a connection shutdown delegate. Removing a delegate which was never added is a no-op.
/// </summary>
/// <param name="connectionShutdownDelegate">The delegate to remove from connection shutdown events</param>
public static void RemoveGlobalConnectionCloseHandler(ConnectionEstablishShutdownDelegate connectionShutdownDelegate)
{
    lock (globalDictAndDelegateLocker)
    {
        globalConnectionShutdownDelegates -= connectionShutdownDelegate;

        // Previously the counter was decremented unconditionally, so removing an unknown
        // delegate pushed it out of step (and eventually negative).
        globalConnectionShutdownDelegateCount = CountDelegateInvocations(globalConnectionShutdownDelegates);

        if (LoggingEnabled) logger.Info("Removed globalConnectionShutdownDelegates. " + globalConnectionShutdownDelegateCount.ToString());
    }
}

/// <summary>
/// Add a new connection establish delegate which will be called for every connection once it has been succesfully established.
/// </summary>
/// <param name="connectionEstablishDelegate">The delegate to call after all connection establishments.</param>
/// <param name="runSynchronously">If true this ConnectionEstablishShutdownDelegate will be called synchronously during the connection establish. The connection will not be considered established until the ConnectionEstablishShutdownDelegate has completed.</param>
public static void AppendGlobalConnectionEstablishHandler(ConnectionEstablishShutdownDelegate connectionEstablishDelegate, bool runSynchronously = false)
{
    lock (globalDictAndDelegateLocker)
    {
        if (runSynchronously)
            globalConnectionEstablishDelegatesSync += connectionEstablishDelegate;
        else
            globalConnectionEstablishDelegatesAsync += connectionEstablishDelegate;

        globalConnectionEstablishDelegateCount = CountDelegateInvocations(globalConnectionEstablishDelegatesSync) + CountDelegateInvocations(globalConnectionEstablishDelegatesAsync);

        if (LoggingEnabled) logger.Info("Added globalConnectionEstablishDelegates. " + globalConnectionEstablishDelegateCount.ToString());
    }
}

/// <summary>
/// Remove a connection establish delegate from both the synchronous and asynchronous delegate lists.
/// </summary>
/// <param name="connectionEstablishDelegate">The delegate to remove from connection establish events</param>
public static void RemoveGlobalConnectionEstablishHandler(ConnectionEstablishShutdownDelegate connectionEstablishDelegate)
{
    lock (globalDictAndDelegateLocker)
    {
        // The caller does not say which list the delegate was added to, so try both.
        globalConnectionEstablishDelegatesAsync -= connectionEstablishDelegate;
        globalConnectionEstablishDelegatesSync -= connectionEstablishDelegate;

        // Recount rather than decrement: zero, one or two entries may just have been removed.
        globalConnectionEstablishDelegateCount = CountDelegateInvocations(globalConnectionEstablishDelegatesSync) + CountDelegateInvocations(globalConnectionEstablishDelegatesAsync);

        if (LoggingEnabled) logger.Info("Removed globalConnectionEstablishDelegates. " + globalConnectionEstablishDelegateCount.ToString());
    }
}

/// <summary>
/// Shutdown all connections, comms threads and execute OnCommsShutdown event. Any packet handlers are left unchanged. If any comms activity has taken place this should be called on application close.
/// </summary>
/// <param name="threadShutdownTimeoutMS">The time to wait for worker threads to close before attempting a thread abort.</param>
public static void Shutdown(int threadShutdownTimeoutMS = 1000)
{
    if (LoggingEnabled) logger.Trace("NetworkCommsDotNet shutdown initiated.");

    commsShutdown = true;

    CommsThreadPool.BeginShutdown();
    Connection.ShutdownBase(threadShutdownTimeoutMS);
    TCPConnection.Shutdown(threadShutdownTimeoutMS);
    UDPConnection.Shutdown();

    try
    {
        CloseAllConnections();
    }
    catch (CommsException)
    {
        // Comms exceptions while closing connections during shutdown are deliberately ignored.
    }
    catch (Exception ex)
    {
        LogError(ex, "CommsShutdownError");
    }

#if !WINDOWS_PHONE && !ANDROID
    try
    {
        if (NetworkLoadThread != null)
        {
            // Wake the network load thread so that it can observe the shutdown and exit.
            NetworkLoadThreadWait.Set();
            if (!NetworkLoadThread.Join(threadShutdownTimeoutMS))
            {
                NetworkLoadThread.Abort();
                // Thrown only so that the timeout is recorded by the catch block below.
                throw new CommsSetupShutdownException("Timeout waiting for NetworkLoadThread thread to shutdown after " + threadShutdownTimeoutMS.ToString() + " ms. ");
            }
        }
    }
    catch (Exception ex)
    {
        LogError(ex, "CommsShutdownError");
    }
#endif

    try
    {
        // Take a local copy first: a subscriber removing itself between the null check and the
        // invocation would otherwise cause a NullReferenceException.
        EventHandler<EventArgs> shutdownSubscribers = OnCommsShutdown;
        if (shutdownSubscribers != null) shutdownSubscribers(null, new EventArgs());
    }
    catch (Exception ex)
    {
        LogError(ex, "CommsShutdownError");
    }

    CommsThreadPool.EndShutdown(threadShutdownTimeoutMS);

    commsShutdown = false;
    if (LoggingEnabled) logger.Info("NetworkCommsDotNet has shutdown");

#if !WINDOWS_PHONE && !NO_LOGGING
    // Mono bug fix
    // Sometimes NLog ends up in a deadlock on close, workaround provided on NLog website
    if (Logger != null)
    {
        LogManager.Flush();
        Logger.Factory.Flush();

        if (NetworkComms.CurrentRuntimeEnvironment == RuntimeEnvironment.Mono_Net2 ||
            NetworkComms.CurrentRuntimeEnvironment == RuntimeEnvironment.Mono_Net35 ||
            NetworkComms.CurrentRuntimeEnvironment == RuntimeEnvironment.Mono_Net4)
            LogManager.Configuration = null;
    }
#endif
}
#endregion

#region Timeouts
/// <summary>
/// Time to wait in milliseconds before throwing an exception when waiting for a connection to be established. Documented default is 30000.
/// </summary>
public static int ConnectionEstablishTimeoutMS { get; set; }

/// <summary>
/// Time to wait in milliseconds before throwing an exception when waiting for confirmation of packet receipt. Documented default is 5000.
/// </summary>
public static int PacketConfirmationTimeoutMS { get; set; }

/// <summary>
/// Time to wait in milliseconds before assuming a remote connection is dead when doing a connection test. Documented default is 1000.
/// </summary>
public static int ConnectionAliveTestTimeoutMS { get; set; }

/// <summary>
/// By default NetworkComms.Net closes connections for which sends take a long time. The timeout is calculated based on previous connection send performances. Set this to true to disable this feature.
/// </summary>
public static bool DisableConnectionSendTimeouts { get; set; }
#endregion

#region Logging
/// <summary>
/// Returns true if comms logging has been enabled.
/// </summary>
public static bool LoggingEnabled { get; private set; }

/// <summary>
/// The logger used throughout NetworkCommsDotNet. Null until logging has been enabled.
/// </summary>
private static Logger logger = null;

/// <summary>
/// Access the NetworkCommsDotNet logger externally.
/// </summary>
public static Logger Logger
{
    get { return logger; }
}

#if NO_LOGGING
/// <summary>
/// Enable basic logging using the provided logFileLocation
/// </summary>
/// <param name="logFileLocation">The location assigned to the logger's LogFileLocation</param>
public static void EnableLogging(string logFileLocation)
{
    lock (globalDictAndDelegateLocker)
    {
        // Fully configure the logger before publishing it, and only then raise the flag.
        // Other code tests LoggingEnabled and then dereferences logger without taking this lock.
        Logger configuredLogger = new Logger();
        configuredLogger.LogFileLocation = logFileLocation;

        logger = configuredLogger;
        LoggingEnabled = true;
    }
}

/// <summary>
/// Disable all logging in NetworkCommsDotNet
/// </summary>
public static void DisableLogging()
{
    lock (globalDictAndDelegateLocker)
    {
        LoggingEnabled = false;
        logger = null;
    }
}
#else
/// <summary>
/// Enable logging using a default config. All log output is written directly to the local console.
/// </summary>
public static void EnableLogging()
{
    LoggingConfiguration logConfig = new LoggingConfiguration();
    NLog.Targets.ConsoleTarget consoleTarget = new NLog.Targets.ConsoleTarget();

    // Hours:minutes:seconds. Note the lower case 'mm'; upper case 'MM' is the month in a
    // .NET date format string and previously produced e.g. 09:02:15 for 09:41:15 in February.
    consoleTarget.Layout = "${date:format=HH\\:mm\\:ss} [${level}] - ${message}";

    logConfig.AddTarget("console", consoleTarget);

    // Send everything from Trace upwards, for all loggers, to the console.
    logConfig.LoggingRules.Add(new LoggingRule("*", LogLevel.Trace, consoleTarget));
    EnableLogging(logConfig);
}

/// <summary>
/// Enable logging using the provided config. See examples for usage.
/// </summary>
/// <param name="loggingConfiguration">The NLog configuration to apply</param>
public static void EnableLogging(LoggingConfiguration loggingConfiguration)
{
    lock (globalDictAndDelegateLocker)
    {
        LogManager.Configuration = loggingConfiguration;
        logger = LogManager.GetCurrentClassLogger();
        LogManager.EnableLogging();

        // Raise the flag last. Other code tests LoggingEnabled and then dereferences logger
        // without taking this lock, so the logger must already be assigned when the flag flips.
        LoggingEnabled = true;
    }
}

/// <summary>
/// Disable all logging in NetworkCommsDotNet
/// </summary>
public static void DisableLogging()
{
    lock (globalDictAndDelegateLocker)
    {
        LoggingEnabled = false;
        LogManager.DisableLogging();
    }
}
#endif

/// <summary>
/// Locker shared by AppendStringToLogFile() and LogError() which ensures thread safe saves.
/// </summary>
static object errorLocker = new object();

/// <summary>
/// Appends the provided logString to end of fileName.txt. If the file does not exist it will be created.
/// This is a best-effort operation: any failure to write is silently ignored.
/// </summary>
/// <param name="fileName">The filename to use. The extension .txt will be appended automatically</param>
/// <param name="logString">The string to append.</param>
public static void AppendStringToLogFile(string fileName, string logString)
{
    try
    {
        lock (errorLocker)
        {
            // 'true' opens the file in append mode.
            using (StreamWriter logWriter = new StreamWriter(fileName + ".txt", true))
            {
                logWriter.WriteLine(logString);
            }
        }
    }
    catch (Exception)
    {
        // If an error happens here, such as the file being locked, there is nothing useful
        // we can do, so the entry is dropped.
    }
}

/// <summary>
/// Logs the provided exception to a file to assist troubleshooting.
/// </summary>
/// <param name="ex">The exception to be logged</param>
/// <param name="fileName">The filename to use.
/// A timestamp and extension .txt will be appended automatically</param>
/// <param name="optionalCommentStr">An optional string which will appear at the top of the error file</param>
/// <returns>The entire fileName used (without the .txt extension).</returns>
public static string LogError(Exception ex, string fileName, string optionalCommentStr = "")
{
    string entireFileName;

    lock (errorLocker)
    {
        // Read the clock once so that every component of the timestamp belongs to the same instant.
        DateTime now = DateTime.Now;
        string timeStamp = now.Hour.ToString() + "." + now.Minute.ToString() + "." + now.Second.ToString() + "." + now.Millisecond.ToString() + " " + now.ToString("dd-MM-yyyy");

        // The bracketed identifier is appended after formatting the date, rather than being
        // embedded in the date format string where it would be parsed as format characters.
#if iOS
        //We need to ensure we add the correct document path for iOS
        entireFileName = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments), fileName + " " + timeStamp + " [" + Thread.CurrentThread.ManagedThreadId.ToString() + "]");
#elif ANDROID
        entireFileName = Path.Combine(global::Android.OS.Environment.ExternalStorageDirectory.AbsolutePath, fileName + " " + timeStamp + " [" + Thread.CurrentThread.ManagedThreadId.ToString() + "]");
#elif WINDOWS_PHONE
        entireFileName = fileName + " " + timeStamp + " [" + Thread.CurrentThread.ManagedThreadId.ToString() + "]";
#else
        using (Process currentProcess = System.Diagnostics.Process.GetCurrentProcess())
            entireFileName = fileName + " " + timeStamp + " [" + currentProcess.Id.ToString() + "-" + Thread.CurrentContext.ContextID.ToString() + "]";
#endif

        if (LoggingEnabled) logger.Fatal(entireFileName, ex);

        try
        {
            // 'false' overwrites any existing file of the same name.
            using (System.IO.StreamWriter sw = new System.IO.StreamWriter(entireFileName + ".txt", false))
            {
                if (!string.IsNullOrEmpty(optionalCommentStr))
                {
                    sw.WriteLine("Comment: " + optionalCommentStr);
                    sw.WriteLine("");
                }

                // With no exception there are no details to write; the file then holds only the comment.
                if (ex != null)
                {
                    if (ex.GetBaseException() != null)
                        sw.WriteLine("Base Exception Type: " + ex.GetBaseException().ToString());

                    if (ex.InnerException != null)
                        sw.WriteLine("Inner Exception Type: " + ex.InnerException.ToString());

                    if (ex.StackTrace != null)
                    {
                        sw.WriteLine("");
                        sw.WriteLine("Stack Trace: " + ex.StackTrace.ToString());
                    }
                }
            }
        }
        catch (Exception)
        {
            // Error logging is best-effort: a failure to write the error file must never
            // raise a second exception from the error path itself.
        }
    }

    return entireFileName;
}
#endregion

#region Serializers and Compressors
/// <summary>
/// The following are used for internal comms objects, packet headers, connection establishment etc.
/// We generally seem to increase the size of our data if compressing small objects (~50 bytes)
/// Given the typical header size is 40 bytes we might as well not compress these objects.
/// </summary>
internal static SendReceiveOptions InternalFixedSendReceiveOptions { get; set; }

/// <summary>
/// Default options for sending and receiving in the absence of specific values
/// </summary>
public static SendReceiveOptions DefaultSendReceiveOptions { get; set; }
#endregion

#region Connection Access
/// <summary>
/// Send the provided object to the specified destination using TCP. Uses default sendReceiveOptions. For more control over options see connection specific methods.
/// </summary>
/// <param name="packetTypeStr">Packet type to use for send</param>
/// <param name="destinationIPAddress">The destination ip address</param>
/// <param name="destinationPort">The destination listen port</param>
/// <param name="sendObject">The object to send</param>
public static void SendObject(string packetTypeStr, string destinationIPAddress, int destinationPort, object sendObject)
{
    //Obtain the TCP connection for the destination and hand the send over to it
    ConnectionInfo destination = new ConnectionInfo(destinationIPAddress, destinationPort);
    TCPConnection.GetConnection(destination).SendObject(packetTypeStr, sendObject);
}

/// <summary>
/// Send the provided object to the specified destination over TCP and wait for a reply object. Uses default sendReceiveOptions. For more control over options see connection specific methods.
/// </summary>
/// <typeparam name="returnObjectType">The expected return object type, i.e. string, int[], etc</typeparam>
/// <param name="sendingPacketTypeStr">Packet type to use during send</param>
/// <param name="destinationIPAddress">The destination ip address</param>
/// <param name="destinationPort">The destination listen port</param>
/// <param name="expectedReturnPacketTypeStr">Expected packet type used for return object</param>
/// <param name="returnPacketTimeOutMilliSeconds">Time to wait in milliseconds for return object</param>
/// <param name="sendObject">Object to send</param>
/// <returns>The expected return object</returns>
public static returnObjectType SendReceiveObject<returnObjectType>(string sendingPacketTypeStr, string destinationIPAddress, int destinationPort, string expectedReturnPacketTypeStr, int returnPacketTimeOutMilliSeconds, object sendObject)
{
    ConnectionInfo destination = new ConnectionInfo(destinationIPAddress, destinationPort);
    TCPConnection tcpConnection = TCPConnection.GetConnection(destination);

    return tcpConnection.SendReceiveObject<returnObjectType>(
        sendingPacketTypeStr,
        expectedReturnPacketTypeStr,
        returnPacketTimeOutMilliSeconds,
        sendObject);
}

/// <summary>
/// Return the MD5 hash of the provided memory stream as a string.
/// Stream position will be equal to the length of stream on return, this ensures the MD5 is consistent.
/// </summary>
/// <param name="streamToMD5">The bytes which will be checksummed</param>
/// <returns>The MD5 checksum as a string</returns>
public static string MD5Bytes(Stream streamToMD5)
{
    if (streamToMD5 == null) throw new ArgumentNullException("streamToMD5", "Provided Stream cannot be null.");

#if WINDOWS_PHONE
    System.Security.Cryptography.HashAlgorithm hasher = new DPSBase.MD5Managed();
#else
    System.Security.Cryptography.HashAlgorithm hasher = System.Security.Cryptography.MD5.Create();
#endif

    using (hasher)
    {
        //Always hash from the start of the stream, otherwise the result depends on the incoming position
        streamToMD5.Seek(0, SeekOrigin.Begin);
        byte[] digest = hasher.ComputeHash(streamToMD5);

        //BitConverter yields "AA-BB-..."; the separators are stripped
        return BitConverter.ToString(digest).Replace("-", "");
    }
}

/// <summary>
/// Return the MD5 hash of a section of the provided stream as a string. The section is first copied
/// into a temporary <see cref="MemoryStream"/> which is then hashed.
/// </summary>
/// <param name="streamToMD5">The bytes which will be checksummed</param>
/// <param name="start">The start position in the stream</param>
/// <param name="length">The length in the stream to MD5</param>
/// <returns>The MD5 checksum as a string</returns>
public static string MD5Bytes(Stream streamToMD5, long start, int length)
{
    if (streamToMD5 == null) throw new ArgumentNullException("streamToMD5", "Provided Stream cannot be null.");

    using (MemoryStream section = new MemoryStream(length))
    {
        StreamWriteWithTimeout.Write(streamToMD5, start, length, section, 8000, 100, 2000);
        string checksum = MD5Bytes(section);
        return checksum;
    }
}

/// <summary>
/// Return the MD5 hash of the provided byte array as a string
/// </summary>
/// <param name="bytesToMd5">The bytes which will be checksummed</param>
/// <returns>The MD5 checksum as a string</returns>
public static string MD5Bytes(byte[] bytesToMd5)
{
    if (bytesToMd5 == null) throw new ArgumentNullException("bytesToMd5", "Provided byte[] cannot be null.");

    //Wrap the array in a read-only stream (writable: false, publiclyVisible: true) rather than copying it
    using (MemoryStream wrapped = new MemoryStream(bytesToMd5, 0, bytesToMd5.Length, false, true))
    {
        return MD5Bytes(wrapped);
    }
}

/// <summary>
/// Returns the ConnectionInfo of every connection currently referenced by endPoint, optionally followed by
/// the recorded ConnectionInfo of closed connections. Duplicates are removed, keeping the first occurrence.
/// </summary>
/// <param name="includeClosedConnections">If true information for closed connections will also be included</param>
/// <returns>List of ConnectionInfo containing information for all requested connections</returns>
public static List<ConnectionInfo> AllConnectionInfo(bool includeClosedConnections = false)
{
    List<ConnectionInfo> collected = new List<ConnectionInfo>();

    lock (globalDictAndDelegateLocker)
    {
        //Active connections
        foreach (var endPointEntry in allConnectionsByEndPoint)
            foreach (var activeConnection in endPointEntry.Value.Values)
                if (activeConnection.ConnectionInfo != null)
                    collected.Add(activeConnection.ConnectionInfo);

        //History of closed connections
        if (includeClosedConnections)
            foreach (var identifierEntry in oldNetworkIdentifierToConnectionInfo)
                foreach (var closedInfos in identifierEntry.Value.Values)
                    collected.AddRange(closedInfos);
    }

    //Order preserving de-duplication
    List<ConnectionInfo> unique = new List<ConnectionInfo>();
    foreach (var candidate in collected)
    {
        if (unique.Contains(candidate)) continue;
        unique.Add(candidate);
    }

    return unique;
}

/// <summary>
/// Returns a ConnectionInfo array containing information for all connections which have the provided networkIdentifier. It is also possible to include information for closed connections.
/// </summary>
/// <param name="networkIdentifier">The networkIdentifier corresponding to the desired connectionInfo information</param>
/// <param name="includeClosedConnections">If true will include information for connections which are closed.
/// Otherwise only active connections will be included.</param>
/// <returns>List of ConnectionInfo containing information for matching connections</returns>
public static List<ConnectionInfo> AllConnectionInfo(ShortGuid networkIdentifier, bool includeClosedConnections = false)
{
    List<ConnectionInfo> returnList = new List<ConnectionInfo>();

    lock (globalDictAndDelegateLocker)
    {
        //Active connections whose identifier matches
        foreach (var pair in allConnectionsByEndPoint)
        {
            foreach (var connection in pair.Value.Values)
            {
                if (connection.ConnectionInfo != null && connection.ConnectionInfo.NetworkIdentifier == networkIdentifier)
                    returnList.Add(connection.ConnectionInfo);
            }
        }

        //Recorded info of closed connections for this identifier
        if (includeClosedConnections)
        {
            foreach (var pair in oldNetworkIdentifierToConnectionInfo)
            {
                if (pair.Key == networkIdentifier)
                {
                    foreach (var infoList in pair.Value.Values)
                        returnList.AddRange(infoList);

                    break;
                }
            }
        }
    }

    //Remove duplicates while preserving the order of first occurrence
    List<ConnectionInfo> distinctList = new List<ConnectionInfo>();
    foreach (var info in returnList)
        if (!distinctList.Contains(info))
            distinctList.Add(info);

    return distinctList;
}

/// <summary>
/// Returns the total number of connections
/// </summary>
/// <returns>Total number of connections</returns>
public static int TotalNumConnections()
{
    lock (globalDictAndDelegateLocker)
    {
        int sum = 0;

        foreach (var current in allConnectionsByEndPoint)
            sum += current.Value.Count;

        return sum;
    }
}

/// <summary>
/// Returns the total number of connections where the <see cref="ConnectionInfo.RemoteEndPoint"/> matches the provided <see cref="IPAddress"/>
/// </summary>
/// <param name="matchIP">The <see cref="IPAddress"/> to match</param>
/// <returns>Total number of connections where the <see cref="ConnectionInfo.RemoteEndPoint "/> matches the provided <see cref="IPAddress"/></returns>
public static int TotalNumConnections(IPAddress matchIP)
{
    lock (globalDictAndDelegateLocker)
    {
        int sum = 0;

        foreach (var current in allConnectionsByEndPoint)
            foreach (var connection in current.Value)
                if (connection.Value.ConnectionInfo.RemoteEndPoint.Address.Equals(matchIP))
                    sum++;

        return sum;
    }
}

/// <summary>
/// Close all connections
/// </summary>
public static void CloseAllConnections()
{
    CloseAllConnections(ConnectionType.Undefined, new IPEndPoint[0]);
}

/// <summary>
/// Close all connections of the provided <see cref="ConnectionType"/>
/// </summary>
/// <param name="connectionType">The type of connections to be closed</param>
public static void CloseAllConnections(ConnectionType connectionType)
{
    CloseAllConnections(connectionType, new IPEndPoint[0]);
}

/// <summary>
/// Close all connections of the provided <see cref="ConnectionType"/> except to provided <see cref="IPEndPoint"/> array.
/// The connections to close are selected while holding the global lock and closed after it has been released.
/// </summary>
/// <param name="connectionTypeToClose">The type of connections to be closed. ConnectionType.<see cref="ConnectionType.Undefined"/> matches all types.</param>
/// <param name="closeAllExceptTheseEndPoints">Close all except those with provided <see cref="IPEndPoint"/> array. A null array is treated as empty.</param>
public static void CloseAllConnections(ConnectionType connectionTypeToClose, IPEndPoint[] closeAllExceptTheseEndPoints)
{
    //A missing exclusion list simply means nothing is excluded
    if (closeAllExceptTheseEndPoints == null)
        closeAllExceptTheseEndPoints = new IPEndPoint[0];

    List<Connection> connectionsToClose = new List<Connection>();

    lock (globalDictAndDelegateLocker)
    {
        foreach (var pair in allConnectionsByEndPoint)
        {
            foreach (var innerPair in pair.Value)
            {
                if (innerPair.Value == null)
                    continue;

                if (connectionTypeToClose != ConnectionType.Undefined && innerPair.Key != connectionTypeToClose)
                    continue;

                bool dontClose = false;
                foreach (var endPointToNotClose in closeAllExceptTheseEndPoints)
                {
                    //IPEndPoint does not overload ==, so that operator compares references and an equal
                    //address/port held in a different instance would never match. Compare by value instead.
                    if (object.Equals(endPointToNotClose, innerPair.Value.ConnectionInfo.RemoteEndPoint))
                    {
                        dontClose = true;
                        break;
                    }
                }

                if (!dontClose)
                    connectionsToClose.Add(innerPair.Value);
            }
        }
    }

    if (LoggingEnabled) logger.Trace("Closing " + connectionsToClose.Count.ToString() + " connections.");

    foreach (Connection connection in connectionsToClose)
        connection.CloseConnection(false, -6);
}

/// <summary>
/// Returns a list of all connections
/// </summary>
/// <returns>A list of requested
/// connections. If no matching connections exist returns empty list.</returns>
public static List<Connection> GetExistingConnection()
{
    //ConnectionType.Undefined matches every type in the typed overload
    return GetExistingConnection(ConnectionType.Undefined);
}

/// <summary>
/// Returns a list of all connections matching the provided <see cref="ConnectionType"/>
/// </summary>
/// <param name="connectionType">The type of connections to return. ConnectionType.<see cref="ConnectionType.Undefined"/> matches all types.</param>
/// <returns>A list of requested connections. If no matching connections exist returns empty list.</returns>
public static List<Connection> GetExistingConnection(ConnectionType connectionType)
{
    bool matchAnyType = connectionType == ConnectionType.Undefined;
    List<Connection> matches = new List<Connection>();

    lock (globalDictAndDelegateLocker)
    {
        foreach (var endPointEntry in allConnectionsByEndPoint)
            foreach (var typeEntry in endPointEntry.Value)
                if (matchAnyType || typeEntry.Key == connectionType)
                    matches.Add(typeEntry.Value);
    }

    if (LoggingEnabled)
    {
        string traceMessage = "RetrieveConnection by connectionType=‘" + connectionType.ToString() + "‘. Returning list of " + matches.Count.ToString() + " connections.";
        logger.Trace(traceMessage);
    }

    return matches;
}

/// <summary>
/// Retrieve a list of connections with the provided <see cref="ShortGuid"/> networkIdentifier of the provided <see cref="ConnectionType"/>.
/// </summary>
/// <param name="networkIdentifier">The <see cref="ShortGuid"/> corresponding with the desired peer networkIdentifier</param>
/// <param name="connectionType">The <see cref="ConnectionType"/> desired</param>
/// <returns>A list of connections to the desired peer.
/// If no matching connections exist returns empty list.</returns>
public static List<Connection> GetExistingConnection(ShortGuid networkIdentifier, ConnectionType connectionType)
{
    List<Connection> matches = new List<Connection>();

    lock (globalDictAndDelegateLocker)
    {
        foreach (var identifierEntry in allConnectionsById)
        {
            bool isRequestedEntry = identifierEntry.Key == networkIdentifier && identifierEntry.Value.ContainsKey(connectionType);
            if (!isRequestedEntry)
                continue;

            //Copy the references out so the caller never holds the internal list
            matches.AddRange(identifierEntry.Value[connectionType]);
            break;
        }
    }

    if (LoggingEnabled)
    {
        string traceMessage = "RetrieveConnection by networkIdentifier=‘" + networkIdentifier + "‘ and connectionType=‘" + connectionType.ToString() + "‘. Returning list of " + matches.Count.ToString() + " connections.";
        logger.Trace(traceMessage);
    }

    return matches;
}

/// <summary>
/// Retrieve the existing connection whose endPoint and type match the RemoteEndPoint and ConnectionType of the provided ConnectionInfo.
/// </summary>
/// <param name="connectionInfo">ConnectionInfo corresponding with the desired connection</param>
/// <returns>The desired connection. If no matching connection exists returns null.</returns>
public static Connection GetExistingConnection(ConnectionInfo connectionInfo)
{
    if (connectionInfo == null) throw new ArgumentNullException("connectionInfo", "Provided ConnectionInfo cannot be null.");

    Connection match = null;

    lock (globalDictAndDelegateLocker)
    {
        foreach (var endPointEntry in allConnectionsByEndPoint)
        {
            if (endPointEntry.Key.Equals(connectionInfo.RemoteEndPoint) && endPointEntry.Value.ContainsKey(connectionInfo.ConnectionType))
            {
                match = endPointEntry.Value[connectionInfo.ConnectionType];
                break;
            }
        }
    }

    if (LoggingEnabled)
    {
        string outcome = (match == null ? "No matching connection was found." : "Matching connection was found.");
        logger.Trace("RetrieveConnection by connectionInfo=‘" + connectionInfo + "‘. " + outcome);
    }

    return match;
}

/// <summary>
/// Retrieve an existing connection with the provided <see cref="IPEndPoint"/> of the provided <see cref="ConnectionType"/>.
/// </summary>
/// <param name="remoteEndPoint">IPEndPoint corresponding with the desired connection</param>
/// <param name="connectionType">The <see cref="ConnectionType"/> desired</param>
/// <returns>The desired connection. If no matching connection exists returns null.</returns>
public static Connection GetExistingConnection(IPEndPoint remoteEndPoint, ConnectionType connectionType)
{
    Connection match = null;

    lock (globalDictAndDelegateLocker)
    {
        if (allConnectionsByEndPoint.ContainsKey(remoteEndPoint))
        {
            var connectionsByType = allConnectionsByEndPoint[remoteEndPoint];
            if (connectionsByType.ContainsKey(connectionType))
                match = connectionsByType[connectionType];
        }
    }

    if (LoggingEnabled)
    {
        string connectionTypeStr = connectionType.ToString();
        string outcome = (match == null ? "No matching connection was found." : "Matching connection was found.");
        logger.Trace("RetrieveConnection by remoteEndPoint=‘" + remoteEndPoint.Address + ":" + remoteEndPoint.Port.ToString() + "‘ and connectionType=‘" + connectionTypeStr + "‘. " + outcome);
    }

    return match;
}

/// <summary>
/// Check if a connection is referenced under the RemoteEndPoint and ConnectionType of the provided ConnectionInfo
/// </summary>
/// <param name="connectionInfo">ConnectionInfo corresponding with the desired connection</param>
/// <returns>True if a matching connection exists, otherwise false</returns>
public static bool ConnectionExists(ConnectionInfo connectionInfo)
{
    if (connectionInfo == null) throw new ArgumentNullException("connectionInfo", "Provided ConnectionInfo cannot be null.");

    bool exists = false;

    lock (globalDictAndDelegateLocker)
    {
        if (allConnectionsByEndPoint.ContainsKey(connectionInfo.RemoteEndPoint))
        {
            var connectionsByType = allConnectionsByEndPoint[connectionInfo.RemoteEndPoint];
            exists = connectionsByType.ContainsKey(connectionInfo.ConnectionType);
        }
    }

    if (LoggingEnabled) logger.Trace("Checking for existing connection by connectionInfo=‘" + connectionInfo +"‘");

    return exists;
}

/// <summary>
/// Check if a connection exists with the provided networkIdentifier and ConnectionType
/// </summary>
/// <param name="networkIdentifier">The <see cref="ShortGuid"/> corresponding with the desired peer networkIdentifier</param>
/// <param name="connectionType">The <see cref="ConnectionType"/> desired</param>
/// <returns>True if a matching connection exists, otherwise false</returns>
public static bool ConnectionExists(ShortGuid networkIdentifier, ConnectionType connectionType)
{
    bool exists = false;

    lock (globalDictAndDelegateLocker)
    {
        if (allConnectionsById.ContainsKey(networkIdentifier))
        {
            var connectionsByType = allConnectionsById[networkIdentifier];

            //An entry with an empty list does not count as an existing connection
            if (connectionsByType.ContainsKey(connectionType))
                exists = connectionsByType[connectionType].Count > 0;
        }
    }

    if (LoggingEnabled)
    {
        string connectionTypeStr = connectionType.ToString();
        logger.Trace("Checking for existing connection by identifier=‘" + networkIdentifier + "‘ and connectionType=‘" + connectionTypeStr + "‘");
    }

    return exists;
}

/// <summary>
/// Check if a connection exists with the provided IPEndPoint and ConnectionType
/// </summary>
/// <param name="remoteEndPoint">IPEndPoint corresponding with the desired connection</param>
/// <param name="connectionType">The <see cref="ConnectionType"/> desired</param>
/// <returns>True if a matching connection exists, otherwise false</returns>
public static bool ConnectionExists(IPEndPoint remoteEndPoint, ConnectionType connectionType)
{
    bool result = false;

    lock (globalDictAndDelegateLocker)
    {
        if (allConnectionsByEndPoint.ContainsKey(remoteEndPoint))
            result = allConnectionsByEndPoint[remoteEndPoint].ContainsKey(connectionType);
    }

    if (LoggingEnabled)
    {
        string connectionTypeStr = connectionType.ToString();
        logger.Trace("Checking for existing connection by endPoint=‘" + remoteEndPoint.Address + ":" + remoteEndPoint.Port.ToString() + "‘ and connectionType=‘" + connectionTypeStr + "‘");
    }

    return result;
}

/// <summary>
/// Removes the reference to the provided connection from within networkComms. DOES NOT CLOSE THE CONNECTION. Returns true if the provided connection reference existed and was removed, false otherwise.
/// </summary>
/// <param name="connection">The connection whose references (by identifier and by endPoint) should be removed</param>
/// <param name="maintainConnectionInfoHistory">If true the ConnectionInfo is recorded in the closed connection history before removal</param>
/// <returns>True if a reference by identifier to this connection, or a reference by its endPoint and type, existed</returns>
internal static bool RemoveConnectionReference(Connection connection, bool maintainConnectionInfoHistory = true)
{
    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Entering RemoveConnectionReference for " + connection.ConnectionInfo);

    //An established connection must be shut down before its references are removed.
    //(The original additionally tested "not Shutdown", which is implied by the state being Established.)
    if (connection.ConnectionInfo.ConnectionState == ConnectionState.Established)
        throw new ConnectionShutdownException("A connection can only be removed once correctly shutdown.");

    bool returnValue = false;

    //Ensure connection references are removed from networkComms
    //Once we think we have closed the connection it's time to get rid of our other references
    lock (globalDictAndDelegateLocker)
    {
        #region Update NetworkComms Connection Dictionaries
        ShortGuid currentNetworkIdentifier = connection.ConnectionInfo.NetworkIdentifier;

        //We establish whether we have already done this step
        if ((allConnectionsById.ContainsKey(currentNetworkIdentifier) &&
                allConnectionsById[currentNetworkIdentifier].ContainsKey(connection.ConnectionInfo.ConnectionType) &&
                allConnectionsById[currentNetworkIdentifier][connection.ConnectionInfo.ConnectionType].Contains(connection))
            ||
            (allConnectionsByEndPoint.ContainsKey(connection.ConnectionInfo.RemoteEndPoint) &&
                allConnectionsByEndPoint[connection.ConnectionInfo.RemoteEndPoint].ContainsKey(connection.ConnectionInfo.ConnectionType)))
        {
            //Maintain a reference if this is our first connection close
            returnValue = true;
        }

        //Keep a reference of the connection for possible debugging later
        if (maintainConnectionInfoHistory)
        {
            if (oldNetworkIdentifierToConnectionInfo.ContainsKey(currentNetworkIdentifier))
            {
                if (oldNetworkIdentifierToConnectionInfo[currentNetworkIdentifier].ContainsKey(connection.ConnectionInfo.ConnectionType))
                    oldNetworkIdentifierToConnectionInfo[currentNetworkIdentifier][connection.ConnectionInfo.ConnectionType].Add(connection.ConnectionInfo);
                else
                    oldNetworkIdentifierToConnectionInfo[currentNetworkIdentifier].Add(connection.ConnectionInfo.ConnectionType, new List<ConnectionInfo>() { connection.ConnectionInfo });
            }
            else
                oldNetworkIdentifierToConnectionInfo.Add(currentNetworkIdentifier, new Dictionary<ConnectionType, List<ConnectionInfo>>() { { connection.ConnectionInfo.ConnectionType, new List<ConnectionInfo>() { connection.ConnectionInfo } } });
        }

        if (allConnectionsById.ContainsKey(currentNetworkIdentifier) &&
            allConnectionsById[currentNetworkIdentifier].ContainsKey(connection.ConnectionInfo.ConnectionType))
        {
            if (allConnectionsById[currentNetworkIdentifier][connection.ConnectionInfo.ConnectionType].Contains(connection))
                allConnectionsById[currentNetworkIdentifier][connection.ConnectionInfo.ConnectionType].Remove(connection);

            //Remove the connection type reference if it is empty
            if (allConnectionsById[currentNetworkIdentifier][connection.ConnectionInfo.ConnectionType].Count == 0)
                allConnectionsById[currentNetworkIdentifier].Remove(connection.ConnectionInfo.ConnectionType);

            //Remove the identifier reference
            if (allConnectionsById[currentNetworkIdentifier].Count == 0)
                allConnectionsById.Remove(currentNetworkIdentifier);

            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Removed connection reference by ID for " + connection.ConnectionInfo);
        }

        //We can now remove this connection by end point as well
        if (allConnectionsByEndPoint.ContainsKey(connection.ConnectionInfo.RemoteEndPoint))
        {
            if (allConnectionsByEndPoint[connection.ConnectionInfo.RemoteEndPoint].ContainsKey(connection.ConnectionInfo.ConnectionType))
                allConnectionsByEndPoint[connection.ConnectionInfo.RemoteEndPoint].Remove(connection.ConnectionInfo.ConnectionType);

            //If this was the last connection type for this endpoint we can remove the endpoint reference as well
            if (allConnectionsByEndPoint[connection.ConnectionInfo.RemoteEndPoint].Count == 0)
                allConnectionsByEndPoint.Remove(connection.ConnectionInfo.RemoteEndPoint);

            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Removed connection reference by endPoint for " + connection.ConnectionInfo);
        }
        #endregion
    }

    return returnValue;
}

/// <summary>
/// Adds a reference by IPEndPoint to the provided connection within networkComms.
/// Nothing is recorded when the connection's remote endPoint, or the override endPoint, has the address IPAddress.Any.
/// </summary>
/// <param name="connection">The connection to reference. Must not yet be Established or Shutdown.</param>
/// <param name="endPointToUse">An optional override which forces a specific IPEndPoint</param>
internal static void AddConnectionByReferenceEndPoint(Connection connection, IPEndPoint endPointToUse = null)
{
    if (NetworkComms.LoggingEnabled)
        NetworkComms.Logger.Trace("Adding connection reference by endPoint. Connection=‘"+connection.ConnectionInfo+"‘." +
            (endPointToUse != null ? " Provided override endPoint of " + endPointToUse.Address + ":" + endPointToUse.Port.ToString() : ""));

    //If the remoteEndPoint is IPAddress.Any we don't record it by endPoint
    if (connection.ConnectionInfo.RemoteEndPoint.Address.Equals(IPAddress.Any) || (endPointToUse != null && endPointToUse.Address.Equals(IPAddress.Any)))
        return;

    if (connection.ConnectionInfo.ConnectionState == ConnectionState.Established || connection.ConnectionInfo.ConnectionState == ConnectionState.Shutdown)
        throw new ConnectionSetupException("Connection reference by endPoint should only be added before a connection is established. This is to prevent duplicate connections.");

    if (endPointToUse == null) endPointToUse = connection.ConnectionInfo.RemoteEndPoint;

    //We can double check for an existing connection here first so that it occurs outside the lock
    Connection existingConnection = GetExistingConnection(endPointToUse, connection.ConnectionInfo.ConnectionType);
    if (existingConnection != null && existingConnection.ConnectionInfo.ConnectionState == ConnectionState.Established && connection != existingConnection)
        existingConnection.ConnectionAlive();

    //How do we prevent multiple threads from trying to create a duplicate connection??
    lock (globalDictAndDelegateLocker)
    {
        //We now check for an existing connection again from within the lock
        if (ConnectionExists(endPointToUse, connection.ConnectionInfo.ConnectionType))
        {
            //If a connection still exist we don't assume it is the same as above
            existingConnection = GetExistingConnection(endPointToUse, connection.ConnectionInfo.ConnectionType);
            if (existingConnection != connection)
            {
                //The "New connection" part of the message must describe the connection being added;
                //it previously reported the server/client side of the existing connection instead.
                throw new DuplicateConnectionException("A different connection already exists with the desired endPoint (" + endPointToUse.Address + ":" + endPointToUse.Port.ToString() + "). This can occasionaly occur if two peers try to connect to each other simultaneously. New connection is " +
                    (connection.ConnectionInfo.ServerSide ? "server side" : "client side") + " - " + connection.ConnectionInfo +
                    ". Existing connection is " + (existingConnection.ConnectionInfo.ServerSide ? "server side" : "client side") + ", " + existingConnection.ConnectionInfo.ConnectionState.ToString() + " - " +
                    (existingConnection.ConnectionInfo.ConnectionState == ConnectionState.Establishing
                        ? "creationTime:" + existingConnection.ConnectionInfo.ConnectionCreationTime.ToString()
                        : "establishedTime:" + existingConnection.ConnectionInfo.ConnectionEstablishedTime.ToString()) +
                    " - " + " details - " + existingConnection.ConnectionInfo);
            }
            else
            {
                //We have just tried to add the same reference twice, no need to do anything this time around
            }
        }
        else
        {
#if FREETRIAL
            //If this is a free trial we only allow a single connection. We will throw an exception if any connections already exist
            if (TotalNumConnections() != 0)
                throw new NotSupportedException("Unable to create connection as this version of NetworkComms.Net is limited to only one connection. Please purchase a commerical license from www.networkcomms.net which supports an unlimited number of connections.");
#endif

            //Add reference to the endPoint dictionary
            if (allConnectionsByEndPoint.ContainsKey(endPointToUse))
            {
                if (allConnectionsByEndPoint[endPointToUse].ContainsKey(connection.ConnectionInfo.ConnectionType))
                    throw new Exception("Idiot check fail. The method ConnectionExists should have prevented execution getting here!!");
                else
                    allConnectionsByEndPoint[endPointToUse].Add(connection.ConnectionInfo.ConnectionType, connection);
            }
            else
                allConnectionsByEndPoint.Add(endPointToUse, new Dictionary<ConnectionType, Connection>() { { connection.ConnectionInfo.ConnectionType, connection } });
        }
    }
}

/// <summary>
/// Update the endPoint reference for the provided connection with the newEndPoint. If there is no change just returns
/// </summary>
/// <param name="connection">The connection whose endPoint reference should be moved</param>
/// <param name="newRemoteEndPoint">The endPoint under which the connection should be referenced from now on</param>
internal static void UpdateConnectionReferenceByEndPoint(Connection connection, IPEndPoint newRemoteEndPoint)
{
    if (NetworkComms.LoggingEnabled)
        NetworkComms.Logger.Trace("Updating connection reference by endPoint. Connection=‘" + connection.ConnectionInfo + "‘." +
            (newRemoteEndPoint != null ? " Provided new endPoint of " + newRemoteEndPoint.Address + ":" + newRemoteEndPoint.Port.ToString() : ""));

    if (!connection.ConnectionInfo.RemoteEndPoint.Equals(newRemoteEndPoint))
    {
        //Remove and re-add under a single lock so that no other thread can observe the connection unreferenced.
        //The removal is not recorded in the closed connection history as the connection is not being closed.
        lock (globalDictAndDelegateLocker)
        {
            RemoveConnectionReference(connection, false);
            AddConnectionByReferenceEndPoint(connection, newRemoteEndPoint);
        }
    }
}

/// <summary>
/// Add a reference by networkIdentifier to the provided connection within NetworkComms. Requires a reference by IPEndPoint to already exist.
/// </summary>
/// <param name="connection">The established connection to reference by its remote networkIdentifier</param>
internal static void AddConnectionReferenceByIdentifier(Connection connection)
{
    //Only an established connection may be referenced by identifier.
    //(The original additionally tested for Shutdown, which is already covered by "not Established".)
    if (!(connection.ConnectionInfo.ConnectionState == ConnectionState.Established))
        throw new ConnectionSetupException("Connection reference by identifier should only be added once a connection is established. This is to prevent duplicate connections.");

    if (connection.ConnectionInfo.NetworkIdentifier == ShortGuid.Empty)
        throw new ConnectionSetupException("Should not be calling AddConnectionByIdentifierReference unless the connection remote identifier has been set.");

    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Adding connection reference by identifier. Connection=" + connection.ConnectionInfo + ".");

    lock (globalDictAndDelegateLocker)
    {
        //There should already be a reference to this connection in the endPoint dictionary
        if (!ConnectionExists(connection.ConnectionInfo.RemoteEndPoint, connection.ConnectionInfo.ConnectionType))
            throw new ConnectionSetupException("A reference by identifier should only be added if a reference by endPoint already exists.");

        //Check for an existing reference first, if there is one and it matches this connection then no worries
        if (allConnectionsById.ContainsKey(connection.ConnectionInfo.NetworkIdentifier))
        {
            if (allConnectionsById[connection.ConnectionInfo.NetworkIdentifier].ContainsKey(connection.ConnectionInfo.ConnectionType))
            {
                var existingConnections = allConnectionsById[connection.ConnectionInfo.NetworkIdentifier][connection.ConnectionInfo.ConnectionType];

                if (!existingConnections.Contains(connection))
                {
                    foreach (var current in existingConnections)
                    {
                        if (current.ConnectionInfo.RemoteEndPoint.Equals(connection.ConnectionInfo.RemoteEndPoint))
                            throw new ConnectionSetupException("A different connection to the same remoteEndPoint already exists. Duplicate connections should be prevented elsewhere. Existing connection " + current.ConnectionInfo + ", new connection " + connection.ConnectionInfo);
                    }

                    //Only if we make it this far do we add the reference. Previously the connection was validated
                    //but never added, so further connections sharing an identifier and type were not retrievable by identifier.
                    existingConnections.Add(connection);
                }
                else
                {
                    //We are trying to add the same connection twice, so just do nothing here.
                }
            }
            else
                allConnectionsById[connection.ConnectionInfo.NetworkIdentifier].Add(connection.ConnectionInfo.ConnectionType, new List<Connection>() { connection });
        }
        else
            allConnectionsById.Add(connection.ConnectionInfo.NetworkIdentifier, new Dictionary<ConnectionType, List<Connection>>() { { connection.ConnectionInfo.ConnectionType, new List<Connection>() {connection}} });
    }
}
#endregion
}

#if NO_LOGGING
/// <summary>
/// On some platforms NLog has issues so this class provides the most basic logging features.
/// Every message is appended as a single line to the file at <see cref="LogFileLocation"/>; nothing is written while that is null.
/// </summary>
public class Logger
{
    internal object locker = new object();
    internal string LogFileLocation { get; set; }

    /// <summary>Write a message at level Trace</summary>
    public void Trace(string message) { log("Trace", message); }
    /// <summary>Write a message at level Debug</summary>
    public void Debug(string message) { log("Debug", message); }
    /// <summary>Write a message at level Fatal. The optional exception is accepted for signature compatibility but is not written.</summary>
    public void Fatal(string message, Exception e = null) { log("Fatal", message); }
    /// <summary>Write a message at level Info</summary>
    public void Info(string message) { log("Info", message); }
    /// <summary>Write a message at level Warn</summary>
    public void Warn(string message) { log("Warn", message); }
    /// <summary>Write a message at level Error</summary>
    public void Error(string message) { log("Error", message); }

    /// <summary>
    /// Append a single line of the form "H.M.S.ms [threadId - level] - message" to the log file.
    /// All failures are swallowed so that logging can never throw into the caller.
    /// </summary>
    private void log(string level, string message)
    {
        if (LogFileLocation != null)
        {
            //Try to get the threadId which is very useful when debugging
            string threadId = null;
            try
            {
                threadId = Thread.CurrentThread.ManagedThreadId.ToString();
            }
            catch (Exception) { }

            try
            {
                lock (locker)
                {
                    using (var sw = new StreamWriter(LogFileLocation, true))
                    {
                        //Read the clock once so the components of the stamp cannot straddle a rollover
                        DateTime now = DateTime.Now;
                        string timeStamp = now.Hour.ToString() + "." + now.Minute.ToString() + "." + now.Second.ToString() + "." + now.Millisecond.ToString();

                        if (threadId != null)
                            sw.WriteLine(timeStamp + " [" + threadId + " - " + level + "] - " + message);
                        else
                            sw.WriteLine(timeStamp + " [" + level + "] - " + message);
                    }
                }
            }
            catch (Exception) { }
        }
    }

    /// <summary>Creates a logger which writes nothing until LogFileLocation has been set</summary>
    public Logger() { }

    /// <summary>No-op: this logger holds no resources between writes</summary>
    public void Shutdown() { }
}
#endif
}
通过阅读代码,您会发现NetworkComms像一首诗一样优美
新年快乐
www.cnblogs.com/networkComms
www.networkcomms.cn
评论(0)