diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index abf24d28dd..494c97b2d1 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -923,17 +923,16 @@ impl Clone for UnboundedSenderInner { debug_assert!(curr < MAX_BUFFER); let next = curr + 1; - let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst); - - // The ABA problem doesn't matter here. We only care that the - // number of senders never exceeds the maximum. - if actual == curr { - return Self { - inner: self.inner.clone(), - }; + match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => { + // The ABA problem doesn't matter here. We only care that the + // number of senders never exceeds the maximum. + return Self { + inner: self.inner.clone(), + }; + } + Err(actual) => curr = actual, } - - curr = actual; } } } @@ -954,19 +953,18 @@ impl Clone for BoundedSenderInner { debug_assert!(curr < self.inner.max_senders()); let next = curr + 1; - let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst); - - // The ABA problem doesn't matter here. We only care that the - // number of senders never exceeds the maximum. - if actual == curr { - return Self { - inner: self.inner.clone(), - sender_task: Arc::new(Mutex::new(SenderTask::new())), - maybe_parked: false, - }; + match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => { + // The ABA problem doesn't matter here. We only care that the + // number of senders never exceeds the maximum. + return Self { + inner: self.inner.clone(), + sender_task: Arc::new(Mutex::new(SenderTask::new())), + maybe_parked: false, + }; + } + Err(actual) => curr = actual, } - - curr = actual; } } } diff --git a/futures-core/src/task/__internal/atomic_waker.rs b/futures-core/src/task/__internal/atomic_waker.rs index a2f6191368..213355bc6b 100644 --- a/futures-core/src/task/__internal/atomic_waker.rs +++ b/futures-core/src/task/__internal/atomic_waker.rs @@ -259,7 +259,11 @@ impl AtomicWaker { /// } /// ``` pub fn register(&self, waker: &Waker) { - match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { + match self + .state + .compare_exchange(WAITING, REGISTERING, Acquire, Acquire) + .unwrap_or_else(|x| x) + { WAITING => { unsafe { // Locked acquired, update the waker cell diff --git a/futures-util/src/future/future/shared.rs b/futures-util/src/future/future/shared.rs index 66fc5d3406..53635b5582 100644 --- a/futures-util/src/future/future/shared.rs +++ b/futures-util/src/future/future/shared.rs @@ -210,7 +210,12 @@ where inner.record_waker(&mut this.waker_key, cx); - match inner.notifier.state.compare_and_swap(IDLE, POLLING, SeqCst) { + match inner + .notifier + .state + .compare_exchange(IDLE, POLLING, SeqCst, SeqCst) + .unwrap_or_else(|x| x) + { IDLE => { // Lock acquired, fall through } @@ -255,14 +260,18 @@ where match future.poll(&mut cx) { Poll::Pending => { - match inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) { - POLLING => { - // Success - drop(_reset); - this.inner = Some(inner); - return Poll::Pending; - } - _ => unreachable!(), + if inner + .notifier + .state + .compare_exchange(POLLING, IDLE, SeqCst, SeqCst) + .is_ok() + { + // Success + drop(_reset); + this.inner = Some(inner); + return Poll::Pending; + } else { + unreachable!() } } Poll::Ready(output) => output,