Skip to content

Commit 0ffe72a

Browse files
committed
added try_read and try_write
1 parent efaa4b9 commit 0ffe72a

File tree

4 files changed

+124
-38
lines changed

4 files changed

+124
-38
lines changed

futures-util/src/lock/backoff.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use std::cell::Cell;
2+
use std::sync::atomic::spin_loop_hint;
3+
use std::thread::yield_now;
4+
5+
// Taken from crossbeam_utils
6+
const SPIN_LIMIT: u32 = 6;
7+
const YIELD_LIMIT: u32 = 10;
8+
9+
pub(crate) struct Backoff {
10+
step: Cell<u32>,
11+
}
12+
13+
impl Backoff {
14+
pub(crate) fn new() -> Backoff {
15+
Backoff {
16+
step: Cell::new(0u32),
17+
}
18+
}
19+
20+
pub(crate) fn snooze(&self) {
21+
if self.step.get() <= SPIN_LIMIT {
22+
for _ in 0..1 << self.step.get() {
23+
spin_loop_hint();
24+
}
25+
} else {
26+
yield_now();
27+
}
28+
if self.step.get() <= YIELD_LIMIT {
29+
self.step.set(self.step.get() + 1);
30+
}
31+
}
32+
}

futures-util/src/lock/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ pub use self::rwlock::{
1616
};
1717
#[cfg(feature = "std")]
1818
pub(crate) mod waiter;
19+
#[cfg(feature = "std")]
20+
pub(crate) mod backoff;
1921

2022
#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
2123
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]

futures-util/src/lock/rwlock.rs

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,18 @@ use std::cell::UnsafeCell;
55
use std::fmt;
66
use std::ops::{Deref, DerefMut};
77
use std::pin::Pin;
8+
use std::sync::RwLock as StdRwLock;
89
use std::sync::atomic::{AtomicUsize, Ordering};
910

11+
#[allow(clippy::identity_op)]
12+
const PHASE: usize = 1 << 0;
13+
const ONE_WRITER: usize = 1 << 1;
14+
const ONE_READER: usize = 1 << 2;
15+
const WRITE_BITS: usize = ONE_WRITER | PHASE;
16+
17+
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
18+
const WAIT_KEY_NONE: usize = usize::max_value();
19+
1020
struct State {
1121
ins: AtomicUsize,
1222
out: AtomicUsize,
@@ -33,16 +43,26 @@ impl AtomicState {
3343
self.read.out.fetch_add(ONE_READER, Ordering::SeqCst)
3444
}
3545

46+
#[inline]
47+
fn waiting_writers(&self) -> usize {
48+
self.write.ins.load(Ordering::Relaxed)
49+
}
50+
3651
#[inline]
3752
fn reserve_reader(&self) -> usize {
3853
self.read.ins.fetch_add(ONE_READER, Ordering::SeqCst) & WRITE_BITS
3954
}
4055

4156
#[inline]
42-
fn reserve_writer(&self, phase: usize) -> usize {
57+
fn reserve_writer(&self, ticket: usize) -> usize {
4358
self.read
4459
.ins
45-
.fetch_add(ONE_WRITER | phase, Ordering::SeqCst)
60+
.fetch_add(ONE_WRITER | (ticket & PHASE), Ordering::SeqCst)
61+
}
62+
63+
#[inline]
64+
fn reserve_transient_writer(&self) -> usize {
65+
self.read.ins.fetch_add(PHASE, Ordering::SeqCst)
4666
}
4767

4868
#[inline]
@@ -71,6 +91,8 @@ pub struct RwLock<T: ?Sized> {
7191
atomic: AtomicState,
7292
readers: WaiterSet,
7393
writers: WaiterSet,
94+
block_read_tickets: StdRwLock<()>,
95+
block_write_tickets: StdRwLock<()>,
7496
value: UnsafeCell<T>,
7597
}
7698

@@ -80,15 +102,6 @@ impl<T: ?Sized> fmt::Debug for RwLock<T> {
80102
}
81103
}
82104

83-
#[allow(clippy::identity_op)]
84-
const PHASE: usize = 1 << 0;
85-
const ONE_WRITER: usize = 1 << 1;
86-
const ONE_READER: usize = 1 << 2;
87-
const WRITE_BITS: usize = ONE_WRITER | PHASE;
88-
89-
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
90-
const WAIT_KEY_NONE: usize = usize::max_value();
91-
92105
impl<T> RwLock<T> {
93106
/// Creates a new futures-aware read-write lock.
94107
pub fn new(t: T) -> RwLock<T> {
@@ -105,6 +118,8 @@ impl<T> RwLock<T> {
105118
},
106119
readers: WaiterSet::new(),
107120
writers: WaiterSet::new(),
121+
block_read_tickets: StdRwLock::new(()),
122+
block_write_tickets: StdRwLock::new(()),
108123
value: UnsafeCell::new(t),
109124
}
110125
}
@@ -149,6 +164,54 @@ impl<T: ?Sized> RwLock<T> {
149164
}
150165
}
151166

167+
/// Attempt to acquire a read access lock synchronously.
168+
pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
169+
let lock = self.block_read_tickets.write().unwrap();
170+
if self.atomic.phase() == 0 {
171+
self.atomic.reserve_reader();
172+
drop(lock);
173+
self.writers.notify_all();
174+
Some(RwLockReadGuard { rwlock: self })
175+
} else {
176+
drop(lock);
177+
self.writers.notify_all();
178+
None
179+
}
180+
}
181+
182+
/// Attempt to acquire a write access lock synchronously.
183+
pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
184+
let read_lock = self.block_read_tickets.write().unwrap();
185+
if self.atomic.phase() == 0 {
186+
let write_lock = self.block_write_tickets.write().unwrap();
187+
if self.atomic.waiting_writers() == self.atomic.finished_writers()
188+
&& self.atomic.reserve_transient_writer() == self.atomic.finished_readers()
189+
{
190+
self.atomic.insert_writer();
191+
drop(write_lock);
192+
drop(read_lock);
193+
self.writers.notify_all();
194+
Some(RwLockWriteGuard { rwlock: self })
195+
} else if self.atomic.phase() != 0 {
196+
self.atomic.clear_phase();
197+
drop(write_lock);
198+
drop(read_lock);
199+
self.writers.notify_all();
200+
self.readers.notify_all();
201+
None
202+
} else {
203+
drop(write_lock);
204+
drop(read_lock);
205+
self.writers.notify_all();
206+
None
207+
}
208+
} else {
209+
drop(read_lock);
210+
self.writers.notify_all();
211+
None
212+
}
213+
}
214+
152215
/// Returns a mutable reference to the underlying data.
153216
///
154217
/// Since this call borrows the lock mutably, no actual locking needs to
@@ -304,11 +367,13 @@ impl<'a, T: ?Sized> Future for RwLockWriteFuture<'a, T> {
304367

305368
match self.ticket {
306369
None => {
370+
let _write_lock = rwlock.block_write_tickets.read().unwrap();
307371
let ticket = rwlock.atomic.insert_writer();
308372
self.ticket = Some(Ticket::Write(ticket));
309373
if ticket == rwlock.atomic.finished_writers() {
310374
// Note that the WRITE_BITS are always cleared at this point.
311-
let ticket = rwlock.atomic.reserve_writer(ticket & PHASE);
375+
let _read_lock = rwlock.block_read_tickets.read().unwrap();
376+
let ticket = rwlock.atomic.reserve_writer(ticket);
312377
self.ticket = Some(Ticket::Read(ticket));
313378
if ticket == rwlock.atomic.finished_readers() {
314379
self.rwlock = None;
@@ -325,7 +390,8 @@ impl<'a, T: ?Sized> Future for RwLockWriteFuture<'a, T> {
325390
Some(Ticket::Write(ticket)) => {
326391
if ticket == rwlock.atomic.finished_writers() {
327392
// Note that the WRITE_BITS are always cleared at this point.
328-
let ticket = rwlock.atomic.reserve_writer(ticket & PHASE);
393+
let _read_lock = rwlock.block_read_tickets.read().unwrap();
394+
let ticket = rwlock.atomic.reserve_writer(ticket);
329395
self.ticket = Some(Ticket::Read(ticket));
330396
if ticket == rwlock.atomic.finished_readers() {
331397
rwlock.writers.remove(self.wait_key);

futures-util/src/lock/waiter.rs

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
1+
use crate::lock::backoff::Backoff;
12
use slab::Slab;
2-
use std::cell::{Cell, UnsafeCell};
3+
use std::cell::UnsafeCell;
34
use std::mem;
45
use std::ops::{Deref, DerefMut};
5-
use std::sync::atomic::{spin_loop_hint, AtomicUsize, Ordering};
6+
use std::sync::atomic::{AtomicUsize, Ordering};
67
use std::task::Waker;
7-
use std::thread::yield_now;
8-
9-
// Taken from crossbeam_utils
10-
const SPIN_LIMIT: u32 = 6;
11-
const YIELD_LIMIT: u32 = 10;
128

139
#[allow(clippy::identity_op)]
1410
const IS_LOCKED: usize = 1 << 0;
@@ -142,30 +138,20 @@ impl WaiterSet {
142138
notified
143139
}
144140

145-
fn lock(&self) -> Lock<'_> {
146-
let step = Cell::new(0u32);
141+
fn lock(&self) -> WaiterSetGuard<'_> {
142+
let backoff = Backoff::new();
147143
while self.state.fetch_or(IS_LOCKED, Ordering::Acquire) & IS_LOCKED != 0 {
148-
if step.get() <= SPIN_LIMIT {
149-
for _ in 0..1 << step.get() {
150-
spin_loop_hint();
151-
}
152-
} else {
153-
yield_now();
154-
}
155-
if step.get() <= YIELD_LIMIT {
156-
step.set(step.get() + 1);
157-
}
144+
backoff.snooze();
158145
}
159-
160-
Lock { waiter_set: self }
146+
WaiterSetGuard { waiter_set: self }
161147
}
162148
}
163149

164-
struct Lock<'a> {
150+
struct WaiterSetGuard<'a> {
165151
waiter_set: &'a WaiterSet,
166152
}
167153

168-
impl Drop for Lock<'_> {
154+
impl Drop for WaiterSetGuard<'_> {
169155
fn drop(&mut self) {
170156
let mut state = 0;
171157

@@ -181,15 +167,15 @@ impl Drop for Lock<'_> {
181167
}
182168
}
183169

184-
impl Deref for Lock<'_> {
170+
impl Deref for WaiterSetGuard<'_> {
185171
type Target = Inner;
186172

187173
fn deref(&self) -> &Inner {
188174
unsafe { &*self.waiter_set.inner.get() }
189175
}
190176
}
191177

192-
impl DerefMut for Lock<'_> {
178+
impl DerefMut for WaiterSetGuard<'_> {
193179
fn deref_mut(&mut self) -> &mut Inner {
194180
unsafe { &mut *self.waiter_set.inner.get() }
195181
}

0 commit comments

Comments
 (0)