Skip to content

Commit a6a9bdf

Browse files
authored
wasi-sockets: Introduce UDP streams (#7243)
* Introduce UDP streams Introduce new `inbound-datagram-stream` and `outbound-datagram-stream` types and moved `receive` and `send` methods to those respectively. These streams are returned by `bind` can be individually subscribed to. This resolves a design issue where a UDP server would end up in a spin loop because `receive` returned EWOULDBLOCK but poll_* always returned immediately because the socket was ready for sending. In this new setup, users can poll each direction separately. Fixes WebAssembly/wasi-sockets#64 Additionally: - Enable send-like behaviour by making `outbound-datagram::remote-address` optional. Fixes WebAssembly/wasi-sockets#57 - Dropped the `network` parameter from the `connect` call, because `bind` is now _required_ to perform IO. * Align names with wasi-http * Revert previous changes to `bind`. Replace `connect` with `stream` Remove the Mutex again. Instead allow `stream` to be called multiple times, but trap if the previous streams are still active. * The code block was treated as Rust code. * Align more closely to wasi-io's input&output-stream * Use `send` instead of `sendto` on connected sockets. prtest:full
1 parent d361cf2 commit a6a9bdf

File tree

11 files changed

+680
-404
lines changed

11 files changed

+680
-404
lines changed

crates/test-programs/src/bin/preview2_udp_sample_application.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use test_programs::wasi::sockets::network::{
2-
IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network,
2+
IpAddress, IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network,
33
};
4-
use test_programs::wasi::sockets::udp::{Datagram, UdpSocket};
4+
use test_programs::wasi::sockets::udp::{OutgoingDatagram, UdpSocket};
55

66
fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddress) {
7+
let unspecified_addr = IpSocketAddress::new(IpAddress::new_unspecified(family), 0);
8+
79
let first_message = &[];
810
let second_message = b"Hello, world!";
911
let third_message = b"Greetings, planet!";
@@ -13,31 +15,32 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres
1315
let server = UdpSocket::new(family).unwrap();
1416

1517
server.blocking_bind(&net, bind_address).unwrap();
18+
let (server_incoming, _) = server.stream(None).unwrap();
1619
let addr = server.local_address().unwrap();
1720

1821
let client_addr = {
1922
let client = UdpSocket::new(family).unwrap();
20-
client.blocking_connect(&net, addr).unwrap();
23+
client.blocking_bind(&net, unspecified_addr).unwrap();
24+
let (_, client_outgoing) = client.stream(Some(addr)).unwrap();
2125

2226
let datagrams = [
23-
Datagram {
27+
OutgoingDatagram {
2428
data: first_message.to_vec(),
25-
remote_address: addr,
29+
remote_address: None,
2630
},
27-
Datagram {
31+
OutgoingDatagram {
2832
data: second_message.to_vec(),
29-
remote_address: addr,
33+
remote_address: Some(addr),
3034
},
3135
];
32-
client.blocking_send(&datagrams).unwrap();
36+
client_outgoing.blocking_send(&datagrams).unwrap();
3337

3438
client.local_address().unwrap()
3539
};
3640

3741
{
3842
// Check that we've received our sent messages.
39-
// Not guaranteed to work but should work in practice.
40-
let datagrams = server.blocking_receive(2..100).unwrap();
43+
let datagrams = server_incoming.blocking_receive(2..100).unwrap();
4144
assert_eq!(datagrams.len(), 2);
4245

4346
assert_eq!(datagrams[0].data, first_message);
@@ -50,21 +53,22 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres
5053
// Another client
5154
{
5255
let client = UdpSocket::new(family).unwrap();
53-
client.blocking_connect(&net, addr).unwrap();
56+
client.blocking_bind(&net, unspecified_addr).unwrap();
57+
let (_, client_outgoing) = client.stream(None).unwrap();
5458

55-
let datagrams = [Datagram {
59+
let datagrams = [OutgoingDatagram {
5660
data: third_message.to_vec(),
57-
remote_address: addr,
61+
remote_address: Some(addr),
5862
}];
59-
client.blocking_send(&datagrams).unwrap();
63+
client_outgoing.blocking_send(&datagrams).unwrap();
6064
}
6165

6266
{
6367
// Check that we sent and received our message!
64-
let datagrams = server.blocking_receive(1..100).unwrap();
68+
let datagrams = server_incoming.blocking_receive(1..100).unwrap();
6569
assert_eq!(datagrams.len(), 1);
6670

67-
assert_eq!(datagrams[0].data, third_message); // Not guaranteed to work but should work in practice.
71+
assert_eq!(datagrams[0].data, third_message);
6872
}
6973
}
7074

crates/test-programs/src/sockets.rs

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use crate::wasi::sockets::network::{
77
Network,
88
};
99
use crate::wasi::sockets::tcp::TcpSocket;
10-
use crate::wasi::sockets::udp::{Datagram, UdpSocket};
10+
use crate::wasi::sockets::udp::{
11+
IncomingDatagram, IncomingDatagramStream, OutgoingDatagram, OutgoingDatagramStream, UdpSocket,
12+
};
1113
use crate::wasi::sockets::{tcp_create_socket, udp_create_socket};
1214
use std::ops::Range;
1315

@@ -142,42 +144,42 @@ impl UdpSocket {
142144
}
143145
}
144146
}
147+
}
145148

146-
pub fn blocking_connect(
147-
&self,
148-
network: &Network,
149-
remote_address: IpSocketAddress,
150-
) -> Result<(), ErrorCode> {
149+
impl OutgoingDatagramStream {
150+
fn blocking_check_send(&self, timeout: &Pollable) -> Result<u64, ErrorCode> {
151151
let sub = self.subscribe();
152152

153-
self.start_connect(&network, remote_address)?;
154-
155153
loop {
156-
match self.finish_connect() {
157-
Err(ErrorCode::WouldBlock) => sub.wait(),
154+
match self.check_send() {
155+
Ok(0) => sub.wait_until(timeout)?,
158156
result => return result,
159157
}
160158
}
161159
}
162160

163-
pub fn blocking_send(&self, mut datagrams: &[Datagram]) -> Result<(), ErrorCode> {
161+
pub fn blocking_send(&self, mut datagrams: &[OutgoingDatagram]) -> Result<(), ErrorCode> {
164162
let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false);
165-
let pollable = self.subscribe();
166163

167164
while !datagrams.is_empty() {
168-
match self.send(datagrams) {
165+
let permit = self.blocking_check_send(&timeout)?;
166+
let chunk_len = datagrams.len().min(permit as usize);
167+
match self.send(&datagrams[..chunk_len]) {
168+
Ok(0) => {}
169169
Ok(packets_sent) => {
170-
datagrams = &datagrams[(packets_sent as usize)..];
170+
let packets_sent = packets_sent as usize;
171+
datagrams = &datagrams[packets_sent..];
171172
}
172-
Err(ErrorCode::WouldBlock) => pollable.wait_until(&timeout)?,
173173
Err(err) => return Err(err),
174174
}
175175
}
176176

177177
Ok(())
178178
}
179+
}
179180

180-
pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<Datagram>, ErrorCode> {
181+
impl IncomingDatagramStream {
182+
pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<IncomingDatagram>, ErrorCode> {
181183
let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false);
182184
let pollable = self.subscribe();
183185
let mut datagrams = vec![];
@@ -187,11 +189,6 @@ impl UdpSocket {
187189
Ok(mut chunk) => {
188190
datagrams.append(&mut chunk);
189191

190-
if datagrams.len() >= count.start as usize {
191-
return Ok(datagrams);
192-
}
193-
}
194-
Err(ErrorCode::WouldBlock) => {
195192
if datagrams.len() >= count.start as usize {
196193
return Ok(datagrams);
197194
} else {

crates/wasi-http/wit/deps/sockets/udp-create-socket.wit

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ interface udp-create-socket {
88
/// Similar to `socket(AF_INET or AF_INET6, SOCK_DGRAM, IPPROTO_UDP)` in POSIX.
99
///
1010
/// This function does not require a network capability handle. This is considered to be safe because
11-
/// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind`/`connect` is called,
11+
/// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind` is called,
1212
/// the socket is effectively an in-memory configuration object, unable to communicate with the outside world.
1313
///
1414
/// All sockets are non-blocking. Use the wasi-poll interface to block on asynchronous operations.

0 commit comments

Comments
 (0)