Skip to content

Commit 8b7c787

Browse files
committed
Add support for timers on web platforms
This commit adds support for async-io on wasm32-unknown-unknown. Not all features of async-io can be ported to WASM; for instance: - Async<T> can't be ported over as WASM doesn't really have a reactor. WASI could eventually be supported here, but that is dependent on smol-rs/polling#102 - block_on() can't be ported over, as blocking isn't allowed on the web. The only thing left is Timer, which can be implemented using setTimeout and setInterval. So that's what's been done: when the WASM target family is enabled, Async<T> and block_on() will be disabled and Timer will switch to an implementation that uses web timeouts. This is not a breaking change, as this crate previously failed to compile on web platforms anyways. This functionality currently does not support Node.js. Signed-off-by: John Nunley <[email protected]>
1 parent a9aabe5 commit 8b7c787

File tree

8 files changed

+331
-20
lines changed

8 files changed

+331
-20
lines changed

.github/workflows/ci.yml

+4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ jobs:
3838
- name: Install Rust
3939
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
4040
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
41+
- run: rustup target add wasm32-unknown-unknown
4142
- run: cargo build --all --all-features --all-targets
4243
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
4344
if: startsWith(matrix.rust, 'nightly')
@@ -50,6 +51,9 @@ jobs:
5051
# if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
5152
# run: cargo check -Z build-std --target=riscv32imc-esp-espidf
5253
- run: cargo test
54+
- uses: taiki-e/install-action@wasm-pack
55+
- run: cargo check --target wasm32-unknown-unknown --all-features --tests
56+
- run: wasm-pack test --node
5357

5458
# Copied from: https://github.com/rust-lang/stacker/pull/19/files
5559
windows_gnu:

Cargo.toml

+17-2
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ name = "timer"
2323
harness = false
2424

2525
[dependencies]
26-
async-lock = "2.6"
2726
cfg-if = "1"
27+
futures-lite = { version = "1.11.0", default-features = false }
28+
29+
[target.'cfg(not(target_family = "wasm"))'.dependencies]
30+
async-lock = "2.6"
2831
concurrent-queue = "2.2.0"
2932
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
30-
futures-lite = { version = "1.11.0", default-features = false }
3133
parking = "2.0.0"
3234
polling = "3.0.0"
3335
rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"] }
@@ -36,15 +38,28 @@ socket2 = { version = "0.5.3", features = ["all"] }
3638
tracing = { version = "0.1.37", default-features = false }
3739
waker-fn = "1.1.0"
3840

41+
[target.'cfg(target_family = "wasm")'.dependencies]
42+
atomic-waker = "1.1.1"
43+
wasm-bindgen = "0.2.87"
44+
web-sys = { version = "0.3.0", features = ["Window"] }
45+
3946
[dev-dependencies]
4047
async-channel = "1"
48+
49+
[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
4150
async-net = "1"
4251
blocking = "1"
4352
criterion = { version = "0.4", default-features = false, features = ["cargo_bench_support"] }
4453
getrandom = "0.2.7"
4554
signal-hook = "0.3"
4655
tempfile = "3"
4756

57+
[target.'cfg(target_family = "wasm")'.dev-dependencies]
58+
console_error_panic_hook = "0.1.7"
59+
wasm-bindgen-futures = "0.4.37"
60+
wasm-bindgen-test = "0.3.37"
61+
web-time = "0.2.0"
62+
4863
[target.'cfg(target_os = "linux")'.dev-dependencies]
4964
inotify = { version = "0.10.1", default-features = false }
5065
timerfd = "1"

src/lib.rs

+24-4
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,31 @@
6464
use std::future::Future;
6565
use std::pin::Pin;
6666
use std::task::{Context, Poll};
67-
use std::time::{Duration, Instant};
67+
use std::time::Duration;
6868

69-
use futures_lite::stream::Stream;
69+
#[cfg(not(target_family = "wasm"))]
70+
use std::time::Instant;
7071

71-
use crate::reactor::Reactor;
72+
use futures_lite::stream::Stream;
7273

74+
#[cfg(not(target_family = "wasm"))]
7375
mod driver;
76+
#[cfg(not(target_family = "wasm"))]
7477
mod io;
78+
#[cfg(not(target_family = "wasm"))]
7579
mod reactor;
7680

77-
#[path = "timer/native.rs"]
81+
#[cfg_attr(not(target_family = "wasm"), path = "timer/native.rs")]
82+
#[cfg_attr(target_family = "wasm", path = "timer/web.rs")]
7883
mod timer;
7984

8085
pub mod os;
8186

87+
#[cfg(not(target_family = "wasm"))]
8288
pub use driver::block_on;
89+
#[cfg(not(target_family = "wasm"))]
8390
pub use io::{Async, IoSafe};
91+
#[cfg(not(target_family = "wasm"))]
8492
pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
8593

8694
/// A future or stream that emits timed events.
@@ -197,6 +205,7 @@ impl Timer {
197205
/// Timer::at(when).await;
198206
/// # });
199207
/// ```
208+
#[cfg(not(target_family = "wasm"))]
200209
#[inline]
201210
pub fn at(instant: Instant) -> Timer {
202211
Timer(timer::Timer::at(instant))
@@ -236,6 +245,7 @@ impl Timer {
236245
/// Timer::interval_at(start, period).next().await;
237246
/// # });
238247
/// ```
248+
#[cfg(not(target_family = "wasm"))]
239249
#[inline]
240250
pub fn interval_at(start: Instant, period: Duration) -> Timer {
241251
Timer(timer::Timer::interval_at(start, period))
@@ -325,6 +335,7 @@ impl Timer {
325335
/// t.set_at(when);
326336
/// # });
327337
/// ```
338+
#[cfg(not(target_family = "wasm"))]
328339
#[inline]
329340
pub fn set_at(&mut self, instant: Instant) {
330341
self.0.set_at(instant)
@@ -376,15 +387,20 @@ impl Timer {
376387
/// t.set_interval_at(start, period);
377388
/// # });
378389
/// ```
390+
#[cfg(not(target_family = "wasm"))]
379391
#[inline]
380392
pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
381393
self.0.set_interval_at(start, period)
382394
}
383395
}
384396

385397
impl Future for Timer {
398+
#[cfg(not(target_family = "wasm"))]
386399
type Output = Instant;
387400

401+
#[cfg(target_family = "wasm")]
402+
type Output = ();
403+
388404
#[inline]
389405
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
390406
match self.poll_next(cx) {
@@ -396,8 +412,12 @@ impl Future for Timer {
396412
}
397413

398414
impl Stream for Timer {
415+
#[cfg(not(target_family = "wasm"))]
399416
type Item = Instant;
400417

418+
#[cfg(target_family = "wasm")]
419+
type Item = ();
420+
401421
#[inline]
402422
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
403423
self.0.poll_next(cx)

src/os/unix.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub fn reactor_fd() -> Option<BorrowedFd<'static>> {
6262
not(polling_test_poll_backend),
6363
))] {
6464
use std::os::unix::io::AsFd;
65-
Some(crate::Reactor::get().poller.as_fd())
65+
Some(crate::reactor::Reactor::get().poller.as_fd())
6666
} else {
6767
None
6868
}

src/timer/web.rs

+212
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
//! Timers for web targets.
2+
//!
3+
//! These use the `setTimeout` function on the web to handle timing.
4+
5+
use std::convert::TryInto;
6+
use std::sync::atomic::{AtomicUsize, Ordering};
7+
use std::sync::Arc;
8+
use std::task::{Context, Poll};
9+
use std::time::Duration;
10+
11+
use atomic_waker::AtomicWaker;
12+
use wasm_bindgen::closure::Closure;
13+
use wasm_bindgen::JsCast;
14+
15+
/// A timer for non-Web platforms.
16+
///
17+
/// self registers a timeout in the global reactor, which in turn sets a timeout in the poll call.
18+
#[derive(Debug)]
19+
pub(super) struct Timer {
20+
/// The waker to wake when the timer fires.
21+
waker: Arc<State>,
22+
23+
/// The ongoing timeout or interval.
24+
ongoing_timeout: TimerId,
25+
26+
/// Keep the closure alive so we don't drop it.
27+
closure: Option<Closure<dyn FnMut()>>,
28+
}
29+
30+
#[derive(Debug)]
31+
struct State {
32+
/// The number of times this timer has been woken.
33+
woken: AtomicUsize,
34+
35+
/// The waker to wake when the timer fires.
36+
waker: AtomicWaker,
37+
}
38+
39+
#[derive(Debug)]
40+
enum TimerId {
41+
NoTimer,
42+
Timeout(i32),
43+
Interval(i32),
44+
}
45+
46+
impl Timer {
47+
/// Create a timer that will never fire.
48+
#[inline]
49+
pub(super) fn never() -> Self {
50+
Self {
51+
waker: Arc::new(State {
52+
woken: AtomicUsize::new(0),
53+
waker: AtomicWaker::new(),
54+
}),
55+
ongoing_timeout: TimerId::NoTimer,
56+
closure: None,
57+
}
58+
}
59+
60+
/// Create a timer that will fire at the given instant.
61+
#[inline]
62+
pub(super) fn after(duration: Duration) -> Timer {
63+
let mut this = Self::never();
64+
this.set_after(duration);
65+
this
66+
}
67+
68+
/// Create a timer that will fire at the given instant.
69+
#[inline]
70+
pub(super) fn interval(period: Duration) -> Timer {
71+
let mut this = Self::never();
72+
this.set_interval(period);
73+
this
74+
}
75+
76+
/// Returns `true` if self timer will fire at some point.
77+
#[inline]
78+
pub(super) fn will_fire(&self) -> bool {
79+
matches!(
80+
self.ongoing_timeout,
81+
TimerId::Timeout(_) | TimerId::Interval(_)
82+
)
83+
}
84+
85+
/// Set the timer to fire after the given duration.
86+
#[inline]
87+
pub(super) fn set_after(&mut self, duration: Duration) {
88+
// Set the timeout.
89+
let id = {
90+
let waker = self.waker.clone();
91+
let closure: Closure<dyn FnMut()> = Closure::wrap(Box::new(move || {
92+
waker.wake();
93+
}));
94+
95+
let result = web_sys::window()
96+
.unwrap()
97+
.set_timeout_with_callback_and_timeout_and_arguments_0(
98+
closure.as_ref().unchecked_ref(),
99+
duration.as_millis().try_into().expect("timeout too long"),
100+
);
101+
102+
// Make sure we don't drop the closure before it's called.
103+
self.closure = Some(closure);
104+
105+
match result {
106+
Ok(id) => id,
107+
Err(_) => {
108+
panic!("failed to set timeout")
109+
}
110+
}
111+
};
112+
113+
// Set our ID.
114+
self.ongoing_timeout = TimerId::Timeout(id);
115+
}
116+
117+
/// Set the timer to emit events periodically.
118+
#[inline]
119+
pub(super) fn set_interval(&mut self, period: Duration) {
120+
// Set the timeout.
121+
let id = {
122+
let waker = self.waker.clone();
123+
let closure: Closure<dyn FnMut()> = Closure::wrap(Box::new(move || {
124+
waker.wake();
125+
}));
126+
127+
let result = web_sys::window()
128+
.unwrap()
129+
.set_interval_with_callback_and_timeout_and_arguments_0(
130+
closure.as_ref().unchecked_ref(),
131+
period.as_millis().try_into().expect("timeout too long"),
132+
);
133+
134+
// Make sure we don't drop the closure before it's called.
135+
self.closure = Some(closure);
136+
137+
match result {
138+
Ok(id) => id,
139+
Err(_) => {
140+
panic!("failed to set interval")
141+
}
142+
}
143+
};
144+
145+
// Set our ID.
146+
self.ongoing_timeout = TimerId::Interval(id);
147+
}
148+
149+
/// Poll for the next timer event.
150+
#[inline]
151+
pub(super) fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
152+
let mut registered = false;
153+
let mut woken = self.waker.woken.load(Ordering::Acquire);
154+
155+
loop {
156+
if woken > 0 {
157+
// Try to decrement the number of woken events.
158+
if let Err(new_woken) = self.waker.woken.compare_exchange(
159+
woken,
160+
woken - 1,
161+
Ordering::SeqCst,
162+
Ordering::Acquire,
163+
) {
164+
woken = new_woken;
165+
continue;
166+
}
167+
168+
// If we are using a one-shot timer, clear it.
169+
if let TimerId::Timeout(_) = self.ongoing_timeout {
170+
self.clear();
171+
}
172+
173+
return Poll::Ready(Some(()));
174+
}
175+
176+
if !registered {
177+
// Register the waker.
178+
self.waker.waker.register(cx.waker());
179+
registered = true;
180+
} else {
181+
// We've already registered, so we can just return pending.
182+
return Poll::Pending;
183+
}
184+
}
185+
}
186+
187+
/// Clear the current timeout.
188+
fn clear(&mut self) {
189+
match self.ongoing_timeout {
190+
TimerId::NoTimer => {}
191+
TimerId::Timeout(id) => {
192+
web_sys::window().unwrap().clear_timeout_with_handle(id);
193+
}
194+
TimerId::Interval(id) => {
195+
web_sys::window().unwrap().clear_interval_with_handle(id);
196+
}
197+
}
198+
}
199+
}
200+
201+
impl State {
202+
fn wake(&self) {
203+
self.woken.fetch_add(1, Ordering::SeqCst);
204+
self.waker.wake();
205+
}
206+
}
207+
208+
impl Drop for Timer {
209+
fn drop(&mut self) {
210+
self.clear();
211+
}
212+
}

tests/async.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![cfg(not(target_family = "wasm"))]
2+
13
use std::future::Future;
24
use std::io;
35
use std::net::{Shutdown, TcpListener, TcpStream, UdpSocket};

tests/block_on.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![cfg(not(target_family = "wasm"))]
2+
13
use async_io::block_on;
24
use std::{
35
future::Future,

0 commit comments

Comments
 (0)