diff --git a/src/FRC.NetworkTables/Dispatcher.cs b/src/FRC.NetworkTables/Dispatcher.cs index 1600dd3..72ec5e1 100644 --- a/src/FRC.NetworkTables/Dispatcher.cs +++ b/src/FRC.NetworkTables/Dispatcher.cs @@ -4,6 +4,7 @@ using NetworkTables.Interfaces; using NetworkTables.TcpSockets; using NetworkTables.Logging; +using System; namespace NetworkTables { @@ -45,22 +46,23 @@ public void StartServer(string persistentFilename, string listenAddress, int por public void SetServer(string serverName, int port) { - SetConnector(() => TcpConnector.Connect(serverName, port, Logger.Instance, 1)); + SetConnector(() => TcpConnector.Connect(new List<(string server, int port)> { (serverName, port) }, Logger.Instance, TimeSpan.FromSeconds(1))); } public void SetServer(IList servers) { - List connectors = new List(); + List<(string server, int port)> addresses = new List<(string server, int port)>(servers.Count); foreach (var server in servers) { - connectors.Add(() => TcpConnector.Connect(server.IpAddress, server.Port, Logger.Instance, 1)); + addresses.Add(server); } - SetConnector(connectors); + + SetConnector(() => TcpConnector.Connect(addresses, Logger.Instance, TimeSpan.FromSeconds(1))); } public void SetServerOverride(IPAddress address, int port) { - SetConnectorOverride(() => TcpConnector.Connect(address.ToString(), port, Logger.Instance, 1)); + SetConnectorOverride(() => TcpConnector.Connect(new List<(string server, int port)> { (address.ToString(), port) }, Logger.Instance, TimeSpan.FromSeconds(1))); } public void ClearServerOverride() diff --git a/src/FRC.NetworkTables/DispatcherBase.cs b/src/FRC.NetworkTables/DispatcherBase.cs index a5d4512..269ccab 100644 --- a/src/FRC.NetworkTables/DispatcherBase.cs +++ b/src/FRC.NetworkTables/DispatcherBase.cs @@ -8,12 +8,13 @@ using System.Threading.Tasks; using Nito.AsyncEx.Synchronous; using NetworkTables.Logging; +using System.Net.Sockets; namespace NetworkTables { internal class DispatcherBase : IDisposable { - public delegate NtTcpClient Connector(); + public delegate TcpClient Connector(); public const double MinimumUpdateTime = 0.01; //100ms public const double MaximumUpdateTime = 1.0; //1 second @@ -38,7 +39,7 @@ internal class DispatcherBase : IDisposable private bool m_doReconnect = true; private string m_identity = ""; - private IList m_clientConnectors = new List(); + private Connector m_clientConnector; private DateTime m_lastFlush; @@ -153,15 +154,10 @@ public void StartClient() } public void SetConnector(Connector connector) - { - SetConnector(new List() { connector }); - } - - public void SetConnector(IList connectors) { lock (m_userMutex) { - m_clientConnectors = connectors; + m_clientConnector = connector; } } @@ -191,7 +187,7 @@ public void Stop() //Wake up client thread with a reconnect lock (m_userMutex) { - m_clientConnectors.Clear(); + m_clientConnector = null; } ClientReconnect(); @@ -360,7 +356,7 @@ private void ServerThreadMain() } if (!m_active) return; - if (stream.RemoteEndPoint is IPEndPoint ipEp) + if (stream.Client.RemoteEndPoint is IPEndPoint ipEp) { Debug(Logger.Instance, $"server: client connection from {ipEp.Address} port {ipEp.Port.ToString()}"); } @@ -397,7 +393,6 @@ private void ServerThreadMain() private void ClientThreadMain() { - int i = 0; while (m_active) { //Sleep between retries @@ -407,16 +402,7 @@ private void ClientThreadMain() lock (m_userMutex) { - if (m_clientConnectorOverride != null) - { - connect = m_clientConnectorOverride; - } - else - { - if (m_clientConnectors.Count == 0) continue; - if (i >= m_clientConnectors.Count) i = 0; - connect = m_clientConnectors[i++]; - } + connect = m_clientConnectorOverride ?? m_clientConnector; } Debug(Logger.Instance, "client trying to connect"); diff --git a/src/FRC.NetworkTables/NetworkConnection.cs b/src/FRC.NetworkTables/NetworkConnection.cs index 27b8279..e971bc1 100644 --- a/src/FRC.NetworkTables/NetworkConnection.cs +++ b/src/FRC.NetworkTables/NetworkConnection.cs @@ -5,7 +5,6 @@ using System.Threading; using NetworkTables.Streams; using NetworkTables.Support; -using NetworkTables.TcpSockets; using NetworkTables.Wire; using static NetworkTables.Logging.Logger; using System.IO; @@ -13,6 +12,7 @@ using System.Threading.Tasks; using Nito.AsyncEx.Synchronous; using NetworkTables.Logging; +using System.Net.Sockets; namespace NetworkTables { @@ -32,7 +32,7 @@ public enum State { Created, Init, Handshake, Synchronized, Active, Dead }; private readonly Stream m_stream; // ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable - private readonly IClient m_client; + private readonly TcpClient m_client; public int PeerPort { get; } public string PeerIP { get; } @@ -64,7 +64,7 @@ public enum State { Created, Init, Handshake, Synchronized, Active, Dead }; private readonly List<(int First, int Second)> m_pendingUpdate = new List<(int First, int Second)>(); - public NetworkConnection(IClient client, Notifier notifier, HandshakeFunc handshake, + public NetworkConnection(TcpClient client, Notifier notifier, HandshakeFunc handshake, Message.GetEntryTypeFunc getEntryType) { Uid = (uint)Interlocked.Increment(ref s_uid) - 1; @@ -79,7 +79,7 @@ public NetworkConnection(IClient client, Notifier notifier, HandshakeFunc handsh m_state = State.Created; LastUpdate = 0; - if (m_client.RemoteEndPoint is IPEndPoint ipEp) + if (m_client.Client.RemoteEndPoint is IPEndPoint ipEp) { PeerIP = ipEp.Address.ToString(); PeerPort = ipEp.Port; diff --git a/src/FRC.NetworkTables/TcpSockets/IClient.cs b/src/FRC.NetworkTables/TcpSockets/IClient.cs deleted file mode 100644 index 110982f..0000000 --- a/src/FRC.NetworkTables/TcpSockets/IClient.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.IO; -using System.Net; - -namespace NetworkTables.TcpSockets -{ - interface IClient : IDisposable - { - Stream GetStream(); - EndPoint RemoteEndPoint { get; } - bool NoDelay { set; } - } -} diff --git a/src/FRC.NetworkTables/TcpSockets/INetworkAcceptor.cs b/src/FRC.NetworkTables/TcpSockets/INetworkAcceptor.cs index 4444f2c..455420b 100644 --- a/src/FRC.NetworkTables/TcpSockets/INetworkAcceptor.cs +++ b/src/FRC.NetworkTables/TcpSockets/INetworkAcceptor.cs @@ -1,4 +1,5 @@ using System; +using System.Net.Sockets; namespace NetworkTables.TcpSockets { @@ -6,6 +7,6 @@ internal interface INetworkAcceptor: IDisposable { int Start(); void Shutdown(); - IClient Accept(); + TcpClient Accept(); } } diff --git a/src/FRC.NetworkTables/TcpSockets/NtTcpClient.cs b/src/FRC.NetworkTables/TcpSockets/NtTcpClient.cs deleted file mode 100644 index 8d3f38c..0000000 --- a/src/FRC.NetworkTables/TcpSockets/NtTcpClient.cs +++ /dev/null @@ -1,208 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -using System; -using System.Net; -using System.Net.Sockets; -using System.IO; -using static NetworkTables.Logging.Logger; -using NetworkTables.Logging; - -namespace NetworkTables.TcpSockets -{ - internal class NtTcpClient : IClient - { - private Socket m_clientSocket; - private NetworkStream m_dataStream; - private bool m_cleanedUp; - private bool m_active; - - public NtTcpClient() : this(AddressFamily.InterNetwork) { } - - public bool NoDelay - { - get { return m_clientSocket.NoDelay; } - set { m_clientSocket.NoDelay = value; } - } - - public EndPoint RemoteEndPoint => m_clientSocket.RemoteEndPoint; - - public NtTcpClient(AddressFamily family) - { - if (family != AddressFamily.InterNetwork && family != AddressFamily.InterNetworkV6) - { - throw new ArgumentException("Invalid TCP Family", nameof(family)); - } - - m_clientSocket = new Socket(family, SocketType.Stream, ProtocolType.Tcp); - } - - internal NtTcpClient(Socket acceptedSocket) - { - m_clientSocket = acceptedSocket; - m_active = true; - } - - public bool Active - { - get { return m_active; } - set { m_active = value; } - } - - public Stream GetStream() - { - if (m_cleanedUp) - { - throw new ObjectDisposedException(GetType().FullName); - } - if (!m_clientSocket.Connected) - { - throw new InvalidOperationException("Not Connected"); - } - return m_dataStream ?? (m_dataStream = new NetworkStream(m_clientSocket, true)); - } - - public void Connect(IPAddress[] ipAddresses, int port) - { - m_clientSocket.Connect(ipAddresses, port); - m_active = true; - } - - public bool ConnectWithTimeout(IPAddress[] ipAddresses, int port, Logger logger, int timeout) - { - if (ipAddresses == null) - throw new ArgumentNullException(nameof(ipAddresses), "IP Addresses cannot be null"); - if (ipAddresses.Length == 0) - throw new ArgumentOutOfRangeException(nameof(ipAddresses), "IP Adresses must have values internally"); - - IPEndPoint ipEp = new IPEndPoint(ipAddresses[0], port); - bool isProperlySupported = RuntimeDetector.GetRuntimeHasProperSockets(); - try - { - m_clientSocket.Blocking = false; - if (isProperlySupported) - { - m_clientSocket.Connect(ipAddresses, port); - } - else - { - m_clientSocket.Connect(ipEp); - } - //We have connected - m_active = true; - return true; - - } - catch (SocketException ex) - { - - if (ex.SocketErrorCode == SocketError.WouldBlock || ex.SocketErrorCode == SocketError.InProgress) - { - DateTime waitUntil = DateTime.UtcNow + TimeSpan.FromSeconds(timeout); - try - { - while (true) - { - if (m_clientSocket.Poll(1000, SelectMode.SelectWrite)) - { - if (!isProperlySupported) - m_clientSocket.Connect(ipEp); - // We have connected - m_active = true; - return true; - } - else - { - if (DateTime.UtcNow >= waitUntil) - { - // We have timed out - Info(logger, $"Connect() to {ipAddresses[0]} port {port.ToString()} timed out"); - break; - } - } - } - } - catch (SocketException ex2) - { - if (!isProperlySupported) - { - if (ex2.SocketErrorCode == SocketError.IsConnected) - { - m_active = true; - return true; - } - } - Error(logger, $"Select() to {ipAddresses[0]} port {port.ToString()} error {ex2.SocketErrorCode}"); - } - } - else - { - if (ex.SocketErrorCode == SocketError.ConnectionRefused) - { - // A connection refused is an unexceptional case - Info(logger, $"Connect() to {ipAddresses[0]} port {port.ToString()} timed out"); - } - Error(logger, $"Connect() to {ipAddresses[0]} port {port.ToString()} error {ex.SocketErrorCode}"); - } - - } - finally - { - m_clientSocket.Blocking = true; - } - return false; - } - - protected virtual void Dispose(bool disposing) - { - if (m_cleanedUp) - { - return; - } - - if (disposing) - { - IDisposable dataStream = m_dataStream; - if (dataStream != null) - { - dataStream.Dispose(); - } - else - { - Socket chkClientSocket = m_clientSocket; - if (chkClientSocket != null) - { - try - { - chkClientSocket.Shutdown(SocketShutdown.Both); - } - catch (SocketException) { } // Ignore any socket exception - finally - { - chkClientSocket.Dispose(); - m_clientSocket = null; - } - } - } - - GC.SuppressFinalize(this); - } - - m_cleanedUp = true; - } - - ~NtTcpClient() - { - Dispose(false); - } - - public void Dispose() - { - Dispose(true); - } - - public bool Connected => m_clientSocket.Connected; - } - -} diff --git a/src/FRC.NetworkTables/TcpSockets/NtTcpListener.cs b/src/FRC.NetworkTables/TcpSockets/NtTcpListener.cs deleted file mode 100644 index 9c45a0d..0000000 --- a/src/FRC.NetworkTables/TcpSockets/NtTcpListener.cs +++ /dev/null @@ -1,93 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -using System; -using System.Net; -using System.Net.Sockets; - -namespace NetworkTables.TcpSockets -{ - internal class NtTcpListener - { - private readonly IPEndPoint m_serverSocketEp; - private Socket m_serverSocket; - private bool m_active; - - - public NtTcpListener(IPAddress localaddr, int port) - { - if (localaddr == null) - { - throw new ArgumentNullException(nameof(localaddr)); - } - m_serverSocketEp = new IPEndPoint(localaddr, port); - m_serverSocket = new Socket(m_serverSocketEp.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - } - - public void Start(int backlog = (int)SocketOptionName.MaxConnections) - { - if (backlog < 0) - { - throw new ArgumentOutOfRangeException(nameof(backlog)); - } - - if (m_serverSocket == null) - { - throw new InvalidOperationException("Invalid Socket Handle"); - } - - if (m_active) - { - return; - } - try - { - m_serverSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1); - } - catch (SocketException) - { - // Raspberry Pi does not support setting the socket option, so must catch. - } - m_serverSocket.Bind(m_serverSocketEp); - try - { - m_serverSocket.Listen(backlog); - } - catch (SocketException) - { - Stop(); - throw; - } - - m_active = true; - } - - public void Stop() - { - if (m_serverSocket != null) - { - m_serverSocket.Dispose(); - m_serverSocket = null; - } - m_active = false; - m_serverSocket = new Socket(m_serverSocketEp.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - } - - public Socket Accept(out SocketError errorCode) - { - Socket socket; - try - { - socket = m_serverSocket.Accept(); - } - catch (SocketException ex) - { - errorCode = ex.SocketErrorCode; - return null; - } - errorCode = 0; - return socket; - } - } -} diff --git a/src/FRC.NetworkTables/TcpSockets/RuntimeDetector.cs b/src/FRC.NetworkTables/TcpSockets/RuntimeDetector.cs deleted file mode 100644 index 0373b66..0000000 --- a/src/FRC.NetworkTables/TcpSockets/RuntimeDetector.cs +++ /dev/null @@ -1,56 +0,0 @@ -using System; -#if NETSTANDARD1_3 -using System.Runtime.InteropServices; -#endif - -namespace NetworkTables.TcpSockets -{ - internal static class RuntimeDetector - { - enum ProperSocketsCacheState - { - NotCached, - Supported, - NotSupported - } - - private static ProperSocketsCacheState s_socketState = ProperSocketsCacheState.NotCached; - - /// - /// Gets if the runtime has sockets that support proper connections - /// - /// - public static bool GetRuntimeHasProperSockets() - { - if (s_socketState == ProperSocketsCacheState.NotCached) - { - Type type = Type.GetType("Mono.Runtime"); - if (type == null) - { -#if NETSTANDARD1_3 - if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - { - // Windows - s_socketState = ProperSocketsCacheState.Supported; - return true; - } - else - { - // Unix - s_socketState = ProperSocketsCacheState.NotSupported; - return false; - } -#else - // Full .net framework works perfectly. - s_socketState = ProperSocketsCacheState.Supported; - return true; -#endif - } - // For now mono does not support, so return false - s_socketState = ProperSocketsCacheState.NotSupported; - return false; - } - return s_socketState == ProperSocketsCacheState.Supported; - } - } -} diff --git a/src/FRC.NetworkTables/TcpSockets/TcpAcceptor.cs b/src/FRC.NetworkTables/TcpSockets/TcpAcceptor.cs index 3a55700..fbf7552 100644 --- a/src/FRC.NetworkTables/TcpSockets/TcpAcceptor.cs +++ b/src/FRC.NetworkTables/TcpSockets/TcpAcceptor.cs @@ -1,4 +1,5 @@ using NetworkTables.Logging; +using Nito.AsyncEx; using System; using System.Net; using System.Net.Sockets; @@ -8,7 +9,7 @@ namespace NetworkTables.TcpSockets { internal class TcpAcceptor : INetworkAcceptor { - private NtTcpListener m_server; + private TcpListener m_server; private readonly int m_port; private readonly string m_address; @@ -35,11 +36,11 @@ public int Start() if (m_listening) return 0; var address = !string.IsNullOrEmpty(m_address) ? IPAddress.Parse(m_address) : IPAddress.Any; - m_server = new NtTcpListener(address, m_port); + m_server = new TcpListener(address, m_port); try { - m_server.Start(5); + m_server.Start(); } catch (ObjectDisposedException) { @@ -94,22 +95,11 @@ public void Shutdown() m_server = null; } - public IClient Accept() + public TcpClient Accept() { if (!m_listening || m_shutdown) return null; - Socket socket = m_server.Accept(out SocketError error); - if (socket == null) - { - if (!m_shutdown) Error(Logger.Instance, $"Accept() failed: {error}"); - return null; - } - if (m_shutdown) - { - socket.Dispose(); - return null; - } - return new NtTcpClient(socket); + return AsyncContext.Run(() => m_server.AcceptTcpClientAsync()); } } } diff --git a/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs b/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs index 4d6f8e6..858ff4d 100644 --- a/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs +++ b/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs @@ -6,59 +6,80 @@ using System.Threading.Tasks; using System.Runtime.ExceptionServices; using static NetworkTables.Logging.Logger; +using Nito.AsyncEx; namespace NetworkTables.TcpSockets { internal class TcpConnector { - private static bool WaitAndUnwrapException(Task task, int timeout) + public static TcpClient Connect(IList<(string server, int port)> servers, Logger logger, TimeSpan timeout) { - try + if (servers.Count == 0) { - return task.Wait(timeout); - } - catch (AggregateException ex) - { - ExceptionDispatchInfo.Capture(ex.InnerException).Throw(); - throw ex.InnerException; + return null; } - } - private static int ResolveHostName(string hostName, out IPAddress[] addr) - { - try - { - var entries = Dns.GetHostAddressesAsync(hostName); - var success = WaitAndUnwrapException(entries, 1000); - if (!success) + return AsyncContext.Run(async () => { + TcpClient toReturn = null; + var clientTcp = new List(); + var clientTask = new List(); + try { - addr = null; - return 1; + for (int i = 0; i < servers.Count; i++) + { + TcpClient client = new TcpClient(); + Task connectTask = client.ConnectAsync(servers[i].server, servers[i].port); + clientTcp.Add(client); + clientTask.Add(connectTask); + } + + // 10 second timeout + var delayTask = Task.Delay(timeout); + + clientTask.Add(delayTask); + + while (clientTcp.Count != 0) + { + var finished = await Task.WhenAny(clientTask); + + var index = clientTask.IndexOf(finished); + if (finished == delayTask) + { + return null; + } + else if (finished.IsCompleted && !finished.IsFaulted && !finished.IsCanceled) + { + toReturn = clientTcp[index]; + return toReturn; + } + var remove = clientTcp[index]; + clientTcp.RemoveAt(index); + remove.Dispose(); + clientTask.RemoveAt(index); + } + return null; } - List addresses = new List(); - foreach (var ipAddress in entries.Result) + finally { - // Only allow IPV4 addresses for now - // Sockets don't all support IPV6 - if (ipAddress.AddressFamily == AddressFamily.InterNetwork) + for (int i = 0; i < clientTcp.Count; i++) { - if (!addresses.Contains(ipAddress)) + if (clientTcp[i] != toReturn) { - addresses.Add(ipAddress); + try + { + clientTcp[i].Dispose(); + } + catch (Exception e) + { + // Ignore exception + } } } } - addr = addresses.ToArray(); - - } - catch (SocketException e) - { - addr = null; - return (int)e.SocketErrorCode; - } - return 0; + }); } + /* public static NtTcpClient Connect(string server, int port, Logger logger, int timeout = 0) { if (ResolveHostName(server, out IPAddress[] addr) != 0) @@ -102,5 +123,6 @@ public static NtTcpClient Connect(string server, int port, Logger logger, int ti } return client; } + */ } } diff --git a/src/Shared/NtIPAddress.cs b/src/Shared/NtIPAddress.cs index 8cbd0da..e6a6bf4 100644 --- a/src/Shared/NtIPAddress.cs +++ b/src/Shared/NtIPAddress.cs @@ -24,5 +24,25 @@ public NtIPAddress(string ip, int port) IpAddress = ip; Port = port; } + +#if !CORE + /// + /// Implicitly converts an NtIPAddress to a tuple + /// + /// The address to convert from + public static implicit operator (string server, int port) (NtIPAddress address) + { + return (address.IpAddress, address.Port); + } + + /// + /// Implicitly converts a tuple to an NtIPAddress + /// + /// The address to convert from + public static implicit operator NtIPAddress((string server, int port) address) + { + return new NtIPAddress(address.server, address.port); + } +#endif } } \ No newline at end of file