Skip to content

Commit 90e7472

Browse files
committed
Func<NatsUri, NatsOpts, CancellationToken, ValueTask<ISocketConnection>>? SocketConnectionFactory (#804)
1 parent 74addaa commit 90e7472

File tree

8 files changed

+215
-197
lines changed

8 files changed

+215
-197
lines changed

Diff for: src/NATS.Client.Core/DefaultSocketConnectionFactory.cs

-110
This file was deleted.

Diff for: src/NATS.Client.Core/INatsConnection.cs

+5
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ public interface INatsConnection : INatsClient
6161
/// </summary>
6262
Func<(string Host, int Port), ValueTask<(string Host, int Port)>>? OnConnectingAsync { get; set; }
6363

64+
/// <summary>
65+
/// Hook when socket is available.
66+
/// </summary>
67+
Func<ISocketConnection, ValueTask<ISocketConnection>>? OnSocketAvailableAsync { get; set; }
68+
6469
/// <summary>
6570
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
6671
/// </summary>

Diff for: src/NATS.Client.Core/ISocketConnectionFactory.cs

-10
This file was deleted.

Diff for: src/NATS.Client.Core/TcpConnection.cs renamed to src/NATS.Client.Core/Internal/TcpConnection.cs

+24-29
Original file line numberDiff line numberDiff line change
@@ -3,39 +3,36 @@
33
using System.Runtime.CompilerServices;
44
using System.Runtime.InteropServices;
55
using Microsoft.Extensions.Logging;
6-
using NATS.Client.Core.Internal;
76

8-
namespace NATS.Client.Core;
7+
namespace NATS.Client.Core.Internal;
98

10-
public sealed class SocketClosedException : Exception
9+
internal sealed class SocketClosedException : Exception
1110
{
1211
public SocketClosedException(Exception? innerException)
1312
: base("Socket has been closed.", innerException)
1413
{
1514
}
1615
}
1716

18-
public class TcpConnection : ISocketConnection
17+
internal sealed class TcpConnection : ISocketConnection
1918
{
19+
private readonly ILogger _logger;
20+
private readonly Socket _socket;
2021
private readonly TaskCompletionSource<Exception> _waitForClosedSource = new();
2122
private int _disposed;
2223

2324
public TcpConnection(ILogger logger)
2425
{
25-
Logger = logger;
26-
Socket = new Socket(Socket.OSSupportsIPv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
26+
_logger = logger;
27+
_socket = new Socket(Socket.OSSupportsIPv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
2728
if (Socket.OSSupportsIPv6)
2829
{
29-
Socket.DualMode = true;
30+
_socket.DualMode = true;
3031
}
3132

32-
Socket.NoDelay = true;
33+
_socket.NoDelay = true;
3334
}
3435

35-
public ILogger Logger { get; }
36-
37-
public Socket Socket { get; }
38-
3936
public Task<Exception> WaitForClosed => _waitForClosedSource.Task;
4037

4138
// CancellationToken is not used, operation lifetime is completely same as socket.
@@ -51,9 +48,9 @@ public TcpConnection(ILogger logger)
5148
public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken)
5249
{
5350
#if NETSTANDARD
54-
return new ValueTask(Socket.ConnectAsync(host, port).WaitAsync(Timeout.InfiniteTimeSpan, cancellationToken));
51+
return new ValueTask(_socket.ConnectAsync(host, port).WaitAsync(Timeout.InfiniteTimeSpan, cancellationToken));
5552
#else
56-
return Socket.ConnectAsync(host, port, cancellationToken);
53+
return _socket.ConnectAsync(host, port, cancellationToken);
5754
#endif
5855
}
5956

@@ -66,9 +63,9 @@ public async ValueTask ConnectAsync(string host, int port, TimeSpan timeout)
6663
try
6764
{
6865
#if NETSTANDARD
69-
await Socket.ConnectAsync(host, port).WaitAsync(timeout, cts.Token).ConfigureAwait(false);
66+
await _socket.ConnectAsync(host, port).WaitAsync(timeout, cts.Token).ConfigureAwait(false);
7067
#else
71-
await Socket.ConnectAsync(host, port, cts.Token).ConfigureAwait(false);
68+
await _socket.ConnectAsync(host, port, cts.Token).ConfigureAwait(false);
7269
#endif
7370
}
7471
catch (Exception ex)
@@ -94,9 +91,9 @@ public ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer)
9491
segment = new ArraySegment<byte>(buffer.ToArray());
9592
}
9693

97-
return new ValueTask<int>(Socket.SendAsync(segment, SocketFlags.None));
94+
return new ValueTask<int>(_socket.SendAsync(segment, SocketFlags.None));
9895
#else
99-
return Socket.SendAsync(buffer, SocketFlags.None, CancellationToken.None);
96+
return _socket.SendAsync(buffer, SocketFlags.None, CancellationToken.None);
10097
#endif
10198
}
10299

@@ -109,19 +106,19 @@ public ValueTask<int> ReceiveAsync(Memory<byte> buffer)
109106
ThrowHelper.ThrowInvalidOperationException("Can't get underlying array");
110107
}
111108

112-
return new ValueTask<int>(Socket.ReceiveAsync(segment, SocketFlags.None));
109+
return new ValueTask<int>(_socket.ReceiveAsync(segment, SocketFlags.None));
113110
#else
114-
return Socket.ReceiveAsync(buffer, SocketFlags.None, CancellationToken.None);
111+
return _socket.ReceiveAsync(buffer, SocketFlags.None, CancellationToken.None);
115112
#endif
116113
}
117114

118115
public ValueTask AbortConnectionAsync(CancellationToken cancellationToken)
119116
{
120117
#if NETSTANDARD
121-
Socket.Disconnect(false);
118+
_socket.Disconnect(false);
122119
return default;
123120
#else
124-
return Socket.DisconnectAsync(false, cancellationToken);
121+
return _socket.DisconnectAsync(false, cancellationToken);
125122
#endif
126123
}
127124

@@ -135,19 +132,17 @@ public ValueTask DisposeAsync()
135132
}
136133
catch
137134
{
138-
// ignored
139135
}
140136

141137
try
142138
{
143-
Socket.Shutdown(SocketShutdown.Both);
139+
_socket.Shutdown(SocketShutdown.Both);
144140
}
145141
catch
146142
{
147-
// ignored
148143
}
149144

150-
Socket.Dispose();
145+
_socket.Dispose();
151146
}
152147

153148
return default;
@@ -161,13 +156,13 @@ public void SignalDisconnected(Exception exception)
161156

162157
// NetworkStream will own the Socket, so mark as disposed
163158
// in order to skip socket.Dispose() in DisposeAsync
164-
internal SslStreamConnection UpgradeToSslStreamConnection(NatsTlsOpts tlsOpts)
159+
public SslStreamConnection UpgradeToSslStreamConnection(NatsTlsOpts tlsOpts)
165160
{
166161
if (Interlocked.Increment(ref _disposed) == 1)
167162
{
168163
return new SslStreamConnection(
169-
Logger,
170-
Socket,
164+
_logger,
165+
_socket,
171166
tlsOpts,
172167
_waitForClosedSource);
173168
}

Diff for: src/NATS.Client.Core/WebSocketConnection.cs renamed to src/NATS.Client.Core/Internal/WebSocketConnection.cs

+17-13
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,19 @@
66
using System.Runtime.InteropServices;
77
#endif
88

9-
namespace NATS.Client.Core;
9+
namespace NATS.Client.Core.Internal;
1010

11-
public class WebSocketConnection : ISocketConnection
11+
internal sealed class WebSocketConnection : ISocketConnection
1212
{
13+
private readonly ClientWebSocket _socket;
1314
private readonly TaskCompletionSource<Exception> _waitForClosedSource = new();
1415
private readonly TimeSpan _socketCloseTimeout = TimeSpan.FromSeconds(5); // matches _socketComponentDisposeTimeout in NatsConnection.cs
1516
private int _disposed;
1617

17-
public ClientWebSocket Socket { get; } = new();
18+
public WebSocketConnection()
19+
{
20+
_socket = new ClientWebSocket();
21+
}
1822

1923
public Task<Exception> WaitForClosed => _waitForClosedSource.Task;
2024

@@ -30,7 +34,7 @@ public class WebSocketConnection : ISocketConnection
3034
[MethodImpl(MethodImplOptions.AggressiveInlining)]
3135
public Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
3236
{
33-
return Socket.ConnectAsync(uri, cancellationToken);
37+
return _socket.ConnectAsync(uri, cancellationToken);
3438
}
3539

3640
/// <summary>
@@ -41,8 +45,8 @@ public async ValueTask ConnectAsync(NatsUri uri, NatsOpts opts)
4145
using var cts = new CancellationTokenSource(opts.ConnectTimeout);
4246
try
4347
{
44-
await opts.WebSocketOpts.ApplyClientWebSocketOptionsAsync(Socket.Options, uri, opts.TlsOpts, cts.Token).ConfigureAwait(false);
45-
await Socket.ConnectAsync(uri.Uri, cts.Token).ConfigureAwait(false);
48+
await opts.WebSocketOpts.ApplyClientWebSocketOptionsAsync(_socket.Options, uri, opts.TlsOpts, cts.Token).ConfigureAwait(false);
49+
await _socket.ConnectAsync(uri.Uri, cts.Token).ConfigureAwait(false);
4650
}
4751
catch (Exception ex)
4852
{
@@ -67,9 +71,9 @@ public async ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer)
6771
segment = new ArraySegment<byte>(buffer.ToArray());
6872
}
6973

70-
await Socket.SendAsync(segment, WebSocketMessageType.Binary, true, CancellationToken.None).ConfigureAwait(false);
74+
await _socket.SendAsync(segment, WebSocketMessageType.Binary, true, CancellationToken.None).ConfigureAwait(false);
7175
#else
72-
await Socket.SendAsync(buffer, WebSocketMessageType.Binary, WebSocketMessageFlags.EndOfMessage, CancellationToken.None).ConfigureAwait(false);
76+
await _socket.SendAsync(buffer, WebSocketMessageType.Binary, WebSocketMessageFlags.EndOfMessage, CancellationToken.None).ConfigureAwait(false);
7377
#endif
7478
return buffer.Length;
7579
}
@@ -83,9 +87,9 @@ public async ValueTask<int> ReceiveAsync(Memory<byte> buffer)
8387
ThrowHelper.ThrowInvalidOperationException("Can't get underlying array");
8488
}
8589

86-
var wsRead = await Socket.ReceiveAsync(segment, CancellationToken.None).ConfigureAwait(false);
90+
var wsRead = await _socket.ReceiveAsync(segment, CancellationToken.None).ConfigureAwait(false);
8791
#else
88-
var wsRead = await Socket.ReceiveAsync(buffer, CancellationToken.None).ConfigureAwait(false);
92+
var wsRead = await _socket.ReceiveAsync(buffer, CancellationToken.None).ConfigureAwait(false);
8993
#endif
9094
return wsRead.Count;
9195
}
@@ -94,7 +98,7 @@ public ValueTask AbortConnectionAsync(CancellationToken cancellationToken)
9498
{
9599
// ClientWebSocket.Abort() doesn't accept a cancellation token, so check at the beginning of this method
96100
cancellationToken.ThrowIfCancellationRequested();
97-
Socket.Abort();
101+
_socket.Abort();
98102
return default;
99103
}
100104

@@ -113,13 +117,13 @@ public async ValueTask DisposeAsync()
113117
try
114118
{
115119
var cts = new CancellationTokenSource(_socketCloseTimeout);
116-
await Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, default, cts.Token).ConfigureAwait(false);
120+
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, default, cts.Token).ConfigureAwait(false);
117121
}
118122
catch
119123
{
120124
}
121125

122-
Socket.Dispose();
126+
_socket.Dispose();
123127
}
124128
}
125129

0 commit comments

Comments
 (0)