Commit 01906f5

Prefactor: Move EventQueue to dedicated module
1 parent 8f1dd4d commit 01906f5

2 files changed: +219 −213 lines
@@ -0,0 +1,214 @@
use super::LiquidityEvent;
use crate::sync::{Arc, Mutex};

use alloc::collections::VecDeque;
use alloc::vec::Vec;

use core::future::Future;
use core::task::{Poll, Waker};

/// The maximum queue size we allow before starting to drop events.
pub const MAX_EVENT_QUEUE_SIZE: usize = 1000;

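/// A simple in-memory queue holding [`LiquidityEvent`]s until the user consumes them.
///
/// Events can be consumed non-blockingly via `next_event`, asynchronously via
/// `next_event_async`, or (with the `std` feature) blockingly via `wait_next_event`.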
pub(crate) struct EventQueue {
	queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
	waker: Arc<Mutex<Option<Waker>>>,
	#[cfg(feature = "std")]
	condvar: crate::sync::Condvar,
}

impl EventQueue {
	pub fn new() -> Self {
		let queue = Arc::new(Mutex::new(VecDeque::new()));
		let waker = Arc::new(Mutex::new(None));
		#[cfg(feature = "std")]
		{
			let condvar = crate::sync::Condvar::new();
			Self { queue, waker, condvar }
		}
		#[cfg(not(feature = "std"))]
		Self { queue, waker }
	}

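	// `enqueue` silently drops new events once the queue already holds
	// `MAX_EVENT_QUEUE_SIZE` entries.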
	pub fn enqueue<E: Into<LiquidityEvent>>(&self, event: E) {
		{
			let mut queue = self.queue.lock().unwrap();
			if queue.len() < MAX_EVENT_QUEUE_SIZE {
				queue.push_back(event.into());
			} else {
				return;
			}
		}

		if let Some(waker) = self.waker.lock().unwrap().take() {
			waker.wake();
		}
		#[cfg(feature = "std")]
		self.condvar.notify_one();
	}

	pub fn next_event(&self) -> Option<LiquidityEvent> {
		self.queue.lock().unwrap().pop_front()
	}

	pub async fn next_event_async(&self) -> LiquidityEvent {
		EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
	}

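	// `wait_next_event` blocks (std only) on a condition variable until an event arrives.
	// After popping one event it re-notifies the waker and condvar if more events remain queued.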
	#[cfg(feature = "std")]
	pub fn wait_next_event(&self) -> LiquidityEvent {
		let mut queue = self
			.condvar
			.wait_while(self.queue.lock().unwrap(), |queue: &mut VecDeque<LiquidityEvent>| {
				queue.is_empty()
			})
			.unwrap();

		let event = queue.pop_front().expect("non-empty queue");
		let should_notify = !queue.is_empty();

		drop(queue);

		if should_notify {
			if let Some(waker) = self.waker.lock().unwrap().take() {
				waker.wake();
			}

			self.condvar.notify_one();
		}

		event
	}

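	// `get_and_clear_pending_events` drains the queue, returning all currently pending events
	// at once.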
	pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
		self.queue.lock().unwrap().split_off(0).into()
	}
}

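/// A [`Future`] that resolves as soon as at least one [`LiquidityEvent`] is queued.
///
/// If the queue is empty when polled, it stores the task's [`Waker`] so that a subsequent
/// `enqueue` call can wake the task.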
struct EventFuture {
	event_queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
	waker: Arc<Mutex<Option<Waker>>>,
}

impl Future for EventFuture {
	type Output = LiquidityEvent;

	fn poll(
		self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
	) -> core::task::Poll<Self::Output> {
		if let Some(event) = self.event_queue.lock().unwrap().pop_front() {
			Poll::Ready(event)
		} else {
			*self.waker.lock().unwrap() = Some(cx.waker().clone());
			Poll::Pending
		}
	}
}

#[cfg(test)]
mod tests {
	#[tokio::test]
	#[cfg(feature = "std")]
	async fn event_queue_works() {
		use super::*;
		use crate::lsps0::event::LSPS0ClientEvent;
		use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
		use core::sync::atomic::{AtomicU16, Ordering};
		use std::sync::Arc;
		use std::time::Duration;

		let event_queue = Arc::new(EventQueue::new());
		assert_eq!(event_queue.next_event(), None);

		let secp_ctx = Secp256k1::new();
		let counterparty_node_id =
			PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap());
		let expected_event = LiquidityEvent::LSPS0Client(LSPS0ClientEvent::ListProtocolsResponse {
			counterparty_node_id,
			protocols: Vec::new(),
		});

		for _ in 0..3 {
			event_queue.enqueue(expected_event.clone());
		}

		assert_eq!(event_queue.wait_next_event(), expected_event);
		assert_eq!(event_queue.next_event_async().await, expected_event);
		assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
		assert_eq!(event_queue.next_event(), None);

		// Check `next_event_async` won't return if the queue is empty; we should always hit the
		// timeout instead.
		tokio::select! {
			_ = tokio::time::sleep(Duration::from_millis(10)) => {
				// Timeout
			}
			_ = event_queue.next_event_async() => {
				panic!();
			}
		}
		assert_eq!(event_queue.next_event(), None);

		// Check we get the expected number of events when polling/enqueuing concurrently.
		let enqueued_events = AtomicU16::new(0);
		let received_events = AtomicU16::new(0);
		let mut delayed_enqueue = false;

		for _ in 0..25 {
			event_queue.enqueue(expected_event.clone());
			enqueued_events.fetch_add(1, Ordering::SeqCst);
		}

		loop {
			tokio::select! {
				_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
					event_queue.enqueue(expected_event.clone());
					enqueued_events.fetch_add(1, Ordering::SeqCst);
					delayed_enqueue = true;
				}
				e = event_queue.next_event_async() => {
					assert_eq!(e, expected_event);
					received_events.fetch_add(1, Ordering::SeqCst);

					event_queue.enqueue(expected_event.clone());
					enqueued_events.fetch_add(1, Ordering::SeqCst);
				}
				e = event_queue.next_event_async() => {
					assert_eq!(e, expected_event);
					received_events.fetch_add(1, Ordering::SeqCst);
				}
			}

			if delayed_enqueue
				&& received_events.load(Ordering::SeqCst) == enqueued_events.load(Ordering::SeqCst)
			{
				break;
			}
		}
		assert_eq!(event_queue.next_event(), None);

		// Check we operate correctly, even when mixing and matching blocking and async API calls.
		let (tx, mut rx) = tokio::sync::watch::channel(());
		let thread_queue = Arc::clone(&event_queue);
		let thread_event = expected_event.clone();
		std::thread::spawn(move || {
			let e = thread_queue.wait_next_event();
			assert_eq!(e, thread_event);
			tx.send(()).unwrap();
		});

		let thread_queue = Arc::clone(&event_queue);
		let thread_event = expected_event.clone();
		std::thread::spawn(move || {
			// Sleep a bit before we enqueue the events everybody is waiting for.
			std::thread::sleep(Duration::from_millis(20));
			thread_queue.enqueue(thread_event.clone());
			thread_queue.enqueue(thread_event.clone());
		});

		let e = event_queue.next_event_async().await;
		assert_eq!(e, expected_event.clone());

		rx.changed().await.unwrap();
		assert_eq!(event_queue.next_event(), None);
	}
}

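For context, a minimal consumer sketch against the API added above; the `process_events` helper and its `handle_event` callback are illustrative only and not part of this commit:

async fn process_events<F: FnMut(LiquidityEvent)>(event_queue: &EventQueue, mut handle_event: F) {
	loop {
		// Resolve as soon as the next `LiquidityEvent` becomes available, then hand it off.
		let event = event_queue.next_event_async().await;
		handle_event(event);
	}
}

With the `std` feature enabled, a blocking caller could use `wait_next_event` in the same way, or periodically drain everything at once via `get_and_clear_pending_events`.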