Skip to content

Commit ca932e9

Browse files
committed
✨ Initial version of a simple IDuplexPipe adapter for WebSocket
Handles input/output, provides RunAsync and CompleteAsync, and performs clean close when either input/output are completed.
1 parent fbf6fc5 commit ca932e9

15 files changed

+795
-1
lines changed

.editorconfig

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,6 @@ csharp_new_line_before_catch = true
8787
csharp_new_line_before_finally = true
8888
csharp_new_line_before_members_in_object_initializers = true
8989
csharp_new_line_before_members_in_anonymous_types = true
90+
91+
# xUnit1013: Public method should be marked as test
92+
dotnet_diagnostic.xUnit1013.severity = none

.github/workflows/build.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ jobs:
3131
submodules: recursive
3232
fetch-depth: 0
3333

34+
- name: ⚙ dotnet 6.0.x
35+
uses: actions/setup-dotnet@v1
36+
with:
37+
dotnet-version: 6.0.x
38+
include-prerelease: true
39+
3440
- name: ✓ ensure format
3541
run: |
3642
dotnet tool update -g dotnet-format --version 5.0.*
@@ -51,6 +57,12 @@ jobs:
5157
submodules: recursive
5258
fetch-depth: 0
5359

60+
- name: ⚙ dotnet 6.0.x
61+
uses: actions/setup-dotnet@v1
62+
with:
63+
dotnet-version: 6.0.x
64+
include-prerelease: true
65+
5466
- name: 🙏 build
5567
run: dotnet build -m:1 -p:VersionLabel="$GITHUB_REF.$GITHUB_RUN_NUMBER"
5668

WebSocketPipe.sln

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,25 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
99
readme.md = readme.md
1010
EndProjectSection
1111
EndProject
12+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebSocketPipe", "src\WebSocketPipe\WebSocketPipe.csproj", "{36D496E4-50C8-4156-8A9F-D525A3C19746}"
13+
EndProject
14+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests", "src\Tests\Tests.csproj", "{517F1129-4EA6-46FA-827B-42CF5EB0DE09}"
15+
EndProject
1216
Global
17+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
18+
Debug|Any CPU = Debug|Any CPU
19+
Release|Any CPU = Release|Any CPU
20+
EndGlobalSection
21+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
22+
{36D496E4-50C8-4156-8A9F-D525A3C19746}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
23+
{36D496E4-50C8-4156-8A9F-D525A3C19746}.Debug|Any CPU.Build.0 = Debug|Any CPU
24+
{36D496E4-50C8-4156-8A9F-D525A3C19746}.Release|Any CPU.ActiveCfg = Release|Any CPU
25+
{36D496E4-50C8-4156-8A9F-D525A3C19746}.Release|Any CPU.Build.0 = Release|Any CPU
26+
{517F1129-4EA6-46FA-827B-42CF5EB0DE09}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
27+
{517F1129-4EA6-46FA-827B-42CF5EB0DE09}.Debug|Any CPU.Build.0 = Debug|Any CPU
28+
{517F1129-4EA6-46FA-827B-42CF5EB0DE09}.Release|Any CPU.ActiveCfg = Release|Any CPU
29+
{517F1129-4EA6-46FA-827B-42CF5EB0DE09}.Release|Any CPU.Build.0 = Release|Any CPU
30+
EndGlobalSection
1331
GlobalSection(SolutionProperties) = preSolution
1432
HideSolutionNode = FALSE
1533
EndGlobalSection

readme.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,30 @@
66
[![License](https://img.shields.io/github/license/devlooped/WebSocketPipe.svg?color=blue)](https://github.com/devlooped/WebSocketPipe/blob/main/license.txt)
77
[![Build](https://github.com/devlooped/WebSocketPipe/workflows/build/badge.svg?branch=main)](https://github.com/devlooped/WebSocketPipe/actions)
88

9-
A System.IO.Pipelines adapter API over System.Net.WebSockets
9+
A System.IO.Pipelines adapter API over System.Net.WebSockets
10+
11+
# Usage
12+
13+
```csharp
14+
using Devlooped;
15+
16+
var client = new ClientWebSocket();
17+
await client.ConnectAsync(serverUri, CancellationToken.None);
18+
19+
using IWebSocketPipe pipe = WebSocketPipe.Create(client, closeWhenCompleted: true);
20+
21+
var read = Task.Run(async () =>
22+
{
23+
var read = await pipe.Input.ReadAsync();
24+
Output.WriteLine("Client: " + Encoding.UTF8.GetString(read.Buffer));
25+
await pipe.CompleteAsync(WebSocketCloseStatus.NormalClosure, "Client Done");
26+
});
27+
28+
var run = pipe.RunAsync();
29+
30+
await pipe.Output.WriteAsync(Encoding.UTF8.GetBytes("hello").AsMemory());
31+
32+
33+
34+
```
35+

src/Tests/EndToEnd.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
using System.IO.Pipelines;
2+
using System.Net;
3+
using System.Net.WebSockets;
4+
using System.Text;
5+
using Xunit;
6+
using Xunit.Abstractions;
7+
8+
namespace Devlooped
9+
{
10+
public record EndToEnd(ITestOutputHelper Output)
11+
{
12+
[Fact]
13+
public async Task RunAsync()
14+
{
15+
await using var server = WebSocketServer.Create(Echo, null, Output);
16+
17+
var client = new ClientWebSocket();
18+
19+
await client.ConnectAsync(server.Uri, CancellationToken.None);
20+
using var pipe = WebSocketPipe.Create(client, closeWhenCompleted: true);
21+
22+
var read = Task.Run(async () =>
23+
{
24+
var read = await pipe.Input.ReadAsync();
25+
Output.WriteLine("Client: " + Encoding.UTF8.GetString(read.Buffer));
26+
await pipe.CompleteAsync(WebSocketCloseStatus.NormalClosure, "Client Done");
27+
});
28+
29+
var run = pipe.RunAsync();
30+
31+
await pipe.Output.WriteAsync(Encoding.UTF8.GetBytes("hello").AsMemory());
32+
33+
await read;
34+
await run;
35+
36+
Assert.NotEqual(WebSocketState.Open, pipe.State);
37+
Assert.Equal("Client Done", pipe.CloseStatusDescription);
38+
}
39+
40+
async Task Echo(IDuplexPipe pipe)
41+
{
42+
while (await pipe.Input.ReadAsync() is var result && !result.IsCompleted)
43+
{
44+
Output.WriteLine($"Echoing: {Encoding.UTF8.GetString(result.Buffer)}");
45+
// Just assume we get a single-segment entry, for simplicity
46+
await pipe.Output.WriteAsync(result.Buffer.First);
47+
pipe.Input.AdvanceTo(result.Buffer.Start, result.Buffer.End);
48+
}
49+
Output.WriteLine($"Server: Done.");
50+
}
51+
}
52+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"iisSettings": {
3+
"windowsAuthentication": false,
4+
"anonymousAuthentication": true,
5+
"iisExpress": {
6+
"applicationUrl": "http://localhost:52017/",
7+
"sslPort": 44307
8+
}
9+
},
10+
"profiles": {
11+
"IIS Express": {
12+
"commandName": "IISExpress",
13+
"launchBrowser": true,
14+
"environmentVariables": {
15+
"ASPNETCORE_ENVIRONMENT": "Development"
16+
}
17+
},
18+
"Tests": {
19+
"commandName": "Project",
20+
"launchBrowser": true,
21+
"environmentVariables": {
22+
"ASPNETCORE_ENVIRONMENT": "Development"
23+
},
24+
"applicationUrl": "https://localhost:5001;http://localhost:5000"
25+
}
26+
}
27+
}

src/Tests/SimpleWebSocketPipeTests.cs

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
using System.IO.Pipelines;
2+
using System.Net.WebSockets;
3+
using System.Text;
4+
using Xunit;
5+
using Xunit.Abstractions;
6+
7+
namespace Devlooped
8+
{
9+
public record SimpleWebSocketPipeTests(ITestOutputHelper Output)
10+
{
11+
[Fact]
12+
public async Task WhenWebSocketNotOpen_ThenThrowsAsync()
13+
{
14+
IWebSocketPipe pipe = WebSocketPipe.Create(new ClientWebSocket());
15+
await Assert.ThrowsAsync<InvalidOperationException>(() => pipe.RunAsync());
16+
}
17+
18+
[Fact]
19+
public async Task WhenConnected_ThenRuns()
20+
{
21+
await using var server = WebSocketServer.Create(Echo, null, Output);
22+
using var socket = new ClientWebSocket();
23+
24+
await socket.ConnectAsync(server.Uri, default);
25+
26+
using var pipe = WebSocketPipe.Create(socket);
27+
28+
await Task.WhenAll(
29+
pipe.RunAsync(server.Cancellation.Token),
30+
Task.Delay(100).ContinueWith(_ => server.Cancellation.Cancel()));
31+
}
32+
33+
[Fact]
34+
public async Task WhenServerClosesWebSocket_ThenClientCompletesGracefully()
35+
{
36+
await using var server = WebSocketServer.Create(Echo, null, Output);
37+
using var socket = new ClientWebSocket();
38+
await socket.ConnectAsync(server.Uri, default);
39+
using var pipe = WebSocketPipe.Create(socket);
40+
var run = pipe.RunAsync();
41+
42+
await server.DisposeAsync();
43+
44+
Task.WaitAny(run, Task.Delay(100).ContinueWith(_ => throw new TimeoutException()));
45+
}
46+
47+
[Fact]
48+
public async Task WhenClientClompletes_ThenServerCompletesGracefully()
49+
{
50+
IDuplexPipe? serverPipe = default;
51+
await using var server = WebSocketServer.Create(x =>
52+
{
53+
serverPipe = x;
54+
return Task.CompletedTask;
55+
}, null, Output);
56+
57+
using var socket = new ClientWebSocket();
58+
await socket.ConnectAsync(server.Uri, default);
59+
using var pipe = WebSocketPipe.Create(socket, closeWhenCompleted: true);
60+
var run = pipe.RunAsync();
61+
62+
await pipe.CompleteAsync();
63+
64+
Assert.Equal(WebSocketState.Closed, socket.State);
65+
Assert.NotNull(serverPipe);
66+
await Assert.ThrowsAsync<InvalidOperationException>(() => serverPipe!.Input.ReadAsync().AsTask());
67+
}
68+
69+
[Fact]
70+
public async Task WhenSocketClosed_ThenCompletes()
71+
{
72+
await using var server = WebSocketServer.Create(Echo, null, Output);
73+
using var socket = new ClientWebSocket();
74+
await socket.ConnectAsync(server.Uri, default);
75+
using var pipe = WebSocketPipe.Create(socket);
76+
var run = pipe.RunAsync();
77+
78+
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, default);
79+
80+
Assert.Equal(WebSocketState.Closed, socket.State);
81+
await run;
82+
await Assert.ThrowsAsync<InvalidOperationException>(() => pipe.Input.ReadAsync().AsTask());
83+
}
84+
85+
[Fact]
86+
public async Task WhenClientClompletesWithStatus_ThenServerAndClientShareStatus()
87+
{
88+
IWebSocketPipe? serverPipe = default;
89+
await using var server = WebSocketServer.Create(x =>
90+
{
91+
serverPipe = x;
92+
return Task.CompletedTask;
93+
}, null, Output);
94+
95+
using var socket = new ClientWebSocket();
96+
await socket.ConnectAsync(server.Uri, default);
97+
using var pipe = WebSocketPipe.Create(socket);
98+
var run = pipe.RunAsync();
99+
100+
await pipe.CompleteAsync(WebSocketCloseStatus.InvalidMessageType, "Invalid");
101+
102+
Assert.Equal(WebSocketState.Closed, socket.State);
103+
Assert.Equal(WebSocketCloseStatus.InvalidMessageType, socket.CloseStatus);
104+
Assert.Equal("Invalid", socket.CloseStatusDescription);
105+
106+
Assert.NotNull(serverPipe);
107+
Assert.Equal(WebSocketState.Closed, serverPipe!.State);
108+
Assert.Equal(WebSocketCloseStatus.InvalidMessageType, serverPipe!.CloseStatus);
109+
Assert.Equal("Invalid", serverPipe!.CloseStatusDescription);
110+
}
111+
112+
[Fact]
113+
public async Task WhenSubProtocolSpecified_ThenServerAndClientShareSubProtocol()
114+
{
115+
IWebSocketPipe? serverPipe = default;
116+
await using var server = WebSocketServer.Create(x =>
117+
{
118+
serverPipe = x;
119+
return Task.CompletedTask;
120+
}, null, Output);
121+
122+
using var socket = new ClientWebSocket();
123+
socket.Options.AddSubProtocol("protobuf.webpubsub.azure.v1");
124+
await socket.ConnectAsync(server.Uri, default);
125+
using var pipe = WebSocketPipe.Create(socket);
126+
var run = pipe.RunAsync();
127+
128+
Assert.Equal("protobuf.webpubsub.azure.v1", socket.SubProtocol);
129+
Assert.Equal("protobuf.webpubsub.azure.v1", pipe.SubProtocol);
130+
131+
Assert.NotNull(serverPipe);
132+
Assert.Equal("protobuf.webpubsub.azure.v1", serverPipe!.SubProtocol);
133+
}
134+
135+
[Fact]
136+
public async Task WhenClientClompletesWithStatus_ThenCompletesWebSocketEvenIfNotSpecified()
137+
{
138+
await using var server = WebSocketServer.Create(Echo, null, Output);
139+
using var socket = new ClientWebSocket();
140+
await socket.ConnectAsync(server.Uri, default);
141+
using var pipe = WebSocketPipe.Create(socket, closeWhenCompleted: false);
142+
var run = pipe.RunAsync();
143+
144+
await pipe.CompleteAsync(WebSocketCloseStatus.InvalidMessageType, "Invalid");
145+
146+
Assert.Equal(WebSocketState.Closed, socket.State);
147+
Assert.Equal(WebSocketCloseStatus.InvalidMessageType, socket.CloseStatus);
148+
Assert.Equal("Invalid", socket.CloseStatusDescription);
149+
}
150+
151+
[Fact]
152+
public async Task WhenSocketDisposed_ThenStateIsClosed()
153+
{
154+
await using var server = WebSocketServer.Create(Echo, null, Output);
155+
using var socket = new ClientWebSocket();
156+
157+
await socket.ConnectAsync(server.Uri, default);
158+
159+
using var pipe = WebSocketPipe.Create(socket);
160+
161+
socket.Dispose();
162+
163+
Assert.Equal(WebSocketState.Closed, pipe.State);
164+
}
165+
166+
[Fact]
167+
public async Task WhenReceivingChunks_PipeReaderExposesFullMessage()
168+
{
169+
await using var server = WebSocketServer.Create(async s =>
170+
{
171+
var serverPipe = WebSocketPipe.Create(s);
172+
var serverRun = serverPipe.RunAsync();
173+
while (await serverPipe.Input.ReadAsync() is var result && !result.IsCompleted)
174+
{
175+
// Send in "chunks" of 1 byte.
176+
for (var i = 0; i < result.Buffer.Length; i++)
177+
await s.SendAsync(result.Buffer.Slice(i, 1).First, WebSocketMessageType.Binary, i == result.Buffer.Length - 1, default);
178+
179+
serverPipe.Input.AdvanceTo(result.Buffer.Start, result.Buffer.End);
180+
}
181+
await serverRun;
182+
}, null, Output);
183+
184+
using var socket = new ClientWebSocket();
185+
await socket.ConnectAsync(server.Uri, default);
186+
using var pipe = WebSocketPipe.Create(socket);
187+
188+
var run = pipe.RunAsync();
189+
var write = pipe.Output.WriteAsync(new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("hello")));
190+
var read = await pipe.Input.ReadAsync();
191+
var echo = Encoding.UTF8.GetString(read.Buffer.FirstSpan);
192+
193+
Assert.Equal("hello", echo);
194+
}
195+
196+
async Task Echo(IWebSocketPipe pipe)
197+
{
198+
while (await pipe.Input.ReadAsync() is var result && !result.IsCompleted)
199+
{
200+
await pipe.Output.WriteAsync(result.Buffer.First);
201+
pipe.Input.AdvanceTo(result.Buffer.Start, result.Buffer.End);
202+
}
203+
}
204+
}
205+
}

0 commit comments

Comments
 (0)