Skip to content

Commit 4489ae0

Browse files
committed
Introduce EventQueueNotifierGuard type
Previously, when enqueuing new events to the `EventQueue`, we'd directly attempt to wake any notifiers/notify any threads waiting on the `Condvar` about the newly available events. This could of course mean we'd notify them while ourselves still holding some locks, e.g., on the peer state. Here, we instead introduce a `EventQueueNotifierGuard` type that will notify about pending events if necesssary, which mitigates any potential lock contention: we now simply have to ensure that any method calling `enqueue` holds the notifier before retrieving any locks.
1 parent 01906f5 commit 4489ae0

File tree

6 files changed

+91
-47
lines changed

6 files changed

+91
-47
lines changed

lightning-liquidity/src/events/event_queue.rs

+52-30
Original file line numberDiff line numberDiff line change
@@ -14,37 +14,19 @@ pub(crate) struct EventQueue {
1414
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
1515
waker: Arc<Mutex<Option<Waker>>>,
1616
#[cfg(feature = "std")]
17-
condvar: crate::sync::Condvar,
17+
condvar: Arc<crate::sync::Condvar>,
1818
}
1919

2020
impl EventQueue {
2121
pub fn new() -> Self {
2222
let queue = Arc::new(Mutex::new(VecDeque::new()));
2323
let waker = Arc::new(Mutex::new(None));
24-
#[cfg(feature = "std")]
25-
{
26-
let condvar = crate::sync::Condvar::new();
27-
Self { queue, waker, condvar }
24+
Self {
25+
queue,
26+
waker,
27+
#[cfg(feature = "std")]
28+
condvar: Arc::new(crate::sync::Condvar::new()),
2829
}
29-
#[cfg(not(feature = "std"))]
30-
Self { queue, waker }
31-
}
32-
33-
pub fn enqueue<E: Into<LiquidityEvent>>(&self, event: E) {
34-
{
35-
let mut queue = self.queue.lock().unwrap();
36-
if queue.len() < MAX_EVENT_QUEUE_SIZE {
37-
queue.push_back(event.into());
38-
} else {
39-
return;
40-
}
41-
}
42-
43-
if let Some(waker) = self.waker.lock().unwrap().take() {
44-
waker.wake();
45-
}
46-
#[cfg(feature = "std")]
47-
self.condvar.notify_one();
4830
}
4931

5032
pub fn next_event(&self) -> Option<LiquidityEvent> {
@@ -83,6 +65,41 @@ impl EventQueue {
8365
pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
8466
self.queue.lock().unwrap().split_off(0).into()
8567
}
68+
69+
// Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
70+
pub fn notifier(&self) -> EventQueueNotifierGuard {
71+
EventQueueNotifierGuard(self)
72+
}
73+
}
74+
75+
// A guard type that will notify about new events when dropped.
76+
#[must_use]
77+
pub(crate) struct EventQueueNotifierGuard<'a>(&'a EventQueue);
78+
79+
impl<'a> EventQueueNotifierGuard<'a> {
80+
pub fn enqueue<E: Into<LiquidityEvent>>(&self, event: E) {
81+
let mut queue = self.0.queue.lock().unwrap();
82+
if queue.len() < MAX_EVENT_QUEUE_SIZE {
83+
queue.push_back(event.into());
84+
} else {
85+
return;
86+
}
87+
}
88+
}
89+
90+
impl<'a> Drop for EventQueueNotifierGuard<'a> {
91+
fn drop(&mut self) {
92+
let should_notify = !self.0.queue.lock().unwrap().is_empty();
93+
94+
if should_notify {
95+
if let Some(waker) = self.0.waker.lock().unwrap().take() {
96+
waker.wake();
97+
}
98+
99+
#[cfg(feature = "std")]
100+
self.0.condvar.notify_one();
101+
}
102+
}
86103
}
87104

88105
struct EventFuture {
@@ -129,7 +146,8 @@ mod tests {
129146
});
130147

131148
for _ in 0..3 {
132-
event_queue.enqueue(expected_event.clone());
149+
let guard = event_queue.notifier();
150+
guard.enqueue(expected_event.clone());
133151
}
134152

135153
assert_eq!(event_queue.wait_next_event(), expected_event);
@@ -154,22 +172,25 @@ mod tests {
154172
let mut delayed_enqueue = false;
155173

156174
for _ in 0..25 {
157-
event_queue.enqueue(expected_event.clone());
175+
let guard = event_queue.notifier();
176+
guard.enqueue(expected_event.clone());
158177
enqueued_events.fetch_add(1, Ordering::SeqCst);
159178
}
160179

161180
loop {
162181
tokio::select! {
163182
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
164-
event_queue.enqueue(expected_event.clone());
183+
let guard = event_queue.notifier();
184+
guard.enqueue(expected_event.clone());
165185
enqueued_events.fetch_add(1, Ordering::SeqCst);
166186
delayed_enqueue = true;
167187
}
168188
e = event_queue.next_event_async() => {
169189
assert_eq!(e, expected_event);
170190
received_events.fetch_add(1, Ordering::SeqCst);
171191

172-
event_queue.enqueue(expected_event.clone());
192+
let guard = event_queue.notifier();
193+
guard.enqueue(expected_event.clone());
173194
enqueued_events.fetch_add(1, Ordering::SeqCst);
174195
}
175196
e = event_queue.next_event_async() => {
@@ -201,8 +222,9 @@ mod tests {
201222
std::thread::spawn(move || {
202223
// Sleep a bit before we enqueue the events everybody is waiting for.
203224
std::thread::sleep(Duration::from_millis(20));
204-
thread_queue.enqueue(thread_event.clone());
205-
thread_queue.enqueue(thread_event.clone());
225+
let guard = thread_queue.notifier();
226+
guard.enqueue(thread_event.clone());
227+
guard.enqueue(thread_event.clone());
206228
});
207229

208230
let e = event_queue.next_event_async().await;

lightning-liquidity/src/lsps0/client.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,11 @@ where
6161
fn handle_response(
6262
&self, response: LSPS0Response, counterparty_node_id: &PublicKey,
6363
) -> Result<(), LightningError> {
64+
let event_queue_notifier = self.pending_events.notifier();
65+
6466
match response {
6567
LSPS0Response::ListProtocols(LSPS0ListProtocolsResponse { protocols }) => {
66-
self.pending_events.enqueue(LSPS0ClientEvent::ListProtocolsResponse {
68+
event_queue_notifier.enqueue(LSPS0ClientEvent::ListProtocolsResponse {
6769
counterparty_node_id: *counterparty_node_id,
6870
protocols,
6971
});

lightning-liquidity/src/lsps1/client.rs

+18-7
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,9 @@ where
110110
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
111111
result: LSPS1GetInfoResponse,
112112
) -> Result<(), LightningError> {
113-
let outer_state_lock = self.per_peer_state.write().unwrap();
113+
let event_queue_notifier = self.pending_events.notifier();
114114

115+
let outer_state_lock = self.per_peer_state.write().unwrap();
115116
match outer_state_lock.get(counterparty_node_id) {
116117
Some(inner_state_lock) => {
117118
let mut peer_state_lock = inner_state_lock.lock().unwrap();
@@ -126,7 +127,7 @@ where
126127
});
127128
}
128129

129-
self.pending_events.enqueue(LSPS1ClientEvent::SupportedOptionsReady {
130+
event_queue_notifier.enqueue(LSPS1ClientEvent::SupportedOptionsReady {
130131
counterparty_node_id: *counterparty_node_id,
131132
supported_options: result.options,
132133
request_id,
@@ -147,6 +148,8 @@ where
147148
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
148149
error: LSPSResponseError,
149150
) -> Result<(), LightningError> {
151+
let event_queue_notifier = self.pending_events.notifier();
152+
150153
let outer_state_lock = self.per_peer_state.read().unwrap();
151154
match outer_state_lock.get(counterparty_node_id) {
152155
Some(inner_state_lock) => {
@@ -162,7 +165,7 @@ where
162165
});
163166
}
164167

165-
self.pending_events.enqueue(LSPS1ClientEvent::SupportedOptionsRequestFailed {
168+
event_queue_notifier.enqueue(LSPS1ClientEvent::SupportedOptionsRequestFailed {
166169
request_id: request_id.clone(),
167170
counterparty_node_id: *counterparty_node_id,
168171
error: error.clone(),
@@ -224,6 +227,8 @@ where
224227
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
225228
response: LSPS1CreateOrderResponse,
226229
) -> Result<(), LightningError> {
230+
let event_queue_notifier = self.pending_events.notifier();
231+
227232
let outer_state_lock = self.per_peer_state.read().unwrap();
228233
match outer_state_lock.get(counterparty_node_id) {
229234
Some(inner_state_lock) => {
@@ -239,7 +244,7 @@ where
239244
});
240245
}
241246

242-
self.pending_events.enqueue(LSPS1ClientEvent::OrderCreated {
247+
event_queue_notifier.enqueue(LSPS1ClientEvent::OrderCreated {
243248
request_id,
244249
counterparty_node_id: *counterparty_node_id,
245250
order_id: response.order_id,
@@ -266,6 +271,8 @@ where
266271
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
267272
error: LSPSResponseError,
268273
) -> Result<(), LightningError> {
274+
let event_queue_notifier = self.pending_events.notifier();
275+
269276
let outer_state_lock = self.per_peer_state.read().unwrap();
270277
match outer_state_lock.get(counterparty_node_id) {
271278
Some(inner_state_lock) => {
@@ -281,7 +288,7 @@ where
281288
});
282289
}
283290

284-
self.pending_events.enqueue(LSPS1ClientEvent::OrderRequestFailed {
291+
event_queue_notifier.enqueue(LSPS1ClientEvent::OrderRequestFailed {
285292
request_id: request_id.clone(),
286293
counterparty_node_id: *counterparty_node_id,
287294
error: error.clone(),
@@ -343,6 +350,8 @@ where
343350
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
344351
response: LSPS1CreateOrderResponse,
345352
) -> Result<(), LightningError> {
353+
let event_queue_notifier = self.pending_events.notifier();
354+
346355
let outer_state_lock = self.per_peer_state.read().unwrap();
347356
match outer_state_lock.get(counterparty_node_id) {
348357
Some(inner_state_lock) => {
@@ -358,7 +367,7 @@ where
358367
});
359368
}
360369

361-
self.pending_events.enqueue(LSPS1ClientEvent::OrderStatus {
370+
event_queue_notifier.enqueue(LSPS1ClientEvent::OrderStatus {
362371
request_id,
363372
counterparty_node_id: *counterparty_node_id,
364373
order_id: response.order_id,
@@ -385,6 +394,8 @@ where
385394
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
386395
error: LSPSResponseError,
387396
) -> Result<(), LightningError> {
397+
let event_queue_notifier = self.pending_events.notifier();
398+
388399
let outer_state_lock = self.per_peer_state.read().unwrap();
389400
match outer_state_lock.get(counterparty_node_id) {
390401
Some(inner_state_lock) => {
@@ -400,7 +411,7 @@ where
400411
});
401412
}
402413

403-
self.pending_events.enqueue(LSPS1ClientEvent::OrderRequestFailed {
414+
event_queue_notifier.enqueue(LSPS1ClientEvent::OrderRequestFailed {
404415
request_id: request_id.clone(),
405416
counterparty_node_id: *counterparty_node_id,
406417
error: error.clone(),

lightning-liquidity/src/lsps1/service.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ where
198198
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
199199
params: LSPS1CreateOrderRequest,
200200
) -> Result<(), LightningError> {
201+
let event_queue_notifier = self.pending_events.notifier();
201202
if !is_valid(&params.order, &self.config.supported_options.as_ref().unwrap()) {
202203
let response = LSPS1Response::CreateOrderError(LSPSResponseError {
203204
code: LSPS1_CREATE_ORDER_REQUEST_ORDER_MISMATCH_ERROR_CODE,
@@ -231,7 +232,7 @@ where
231232
.insert(request_id.clone(), LSPS1Request::CreateOrder(params.clone()));
232233
}
233234

234-
self.pending_events.enqueue(LSPS1ServiceEvent::RequestForPaymentDetails {
235+
event_queue_notifier.enqueue(LSPS1ServiceEvent::RequestForPaymentDetails {
235236
request_id,
236237
counterparty_node_id: *counterparty_node_id,
237238
order: params.order,
@@ -315,6 +316,7 @@ where
315316
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
316317
params: LSPS1GetOrderRequest,
317318
) -> Result<(), LightningError> {
319+
let event_queue_notifier = self.pending_events.notifier();
318320
let outer_state_lock = self.per_peer_state.read().unwrap();
319321
match outer_state_lock.get(counterparty_node_id) {
320322
Some(inner_state_lock) => {
@@ -333,7 +335,7 @@ where
333335

334336
if let Err(e) = outbound_channel.awaiting_payment() {
335337
peer_state_lock.outbound_channels_by_order_id.remove(&params.order_id);
336-
self.pending_events.enqueue(LSPS1ServiceEvent::Refund {
338+
event_queue_notifier.enqueue(LSPS1ServiceEvent::Refund {
337339
request_id,
338340
counterparty_node_id: *counterparty_node_id,
339341
order_id: params.order_id,
@@ -345,7 +347,7 @@ where
345347
.pending_requests
346348
.insert(request_id.clone(), LSPS1Request::GetOrder(params.clone()));
347349

348-
self.pending_events.enqueue(LSPS1ServiceEvent::CheckPaymentConfirmation {
350+
event_queue_notifier.enqueue(LSPS1ServiceEvent::CheckPaymentConfirmation {
349351
request_id,
350352
counterparty_node_id: *counterparty_node_id,
351353
order_id: params.order_id,

lightning-liquidity/src/lsps2/client.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ where
191191
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
192192
result: LSPS2GetInfoResponse,
193193
) -> Result<(), LightningError> {
194+
let event_queue_notifier = self.pending_events.notifier();
195+
194196
let outer_state_lock = self.per_peer_state.read().unwrap();
195197
match outer_state_lock.get(counterparty_node_id) {
196198
Some(inner_state_lock) => {
@@ -206,7 +208,7 @@ where
206208
});
207209
}
208210

209-
self.pending_events.enqueue(LSPS2ClientEvent::OpeningParametersReady {
211+
event_queue_notifier.enqueue(LSPS2ClientEvent::OpeningParametersReady {
210212
request_id,
211213
counterparty_node_id: *counterparty_node_id,
212214
opening_fee_params_menu: result.opening_fee_params_menu,
@@ -257,6 +259,8 @@ where
257259
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
258260
result: LSPS2BuyResponse,
259261
) -> Result<(), LightningError> {
262+
let event_queue_notifier = self.pending_events.notifier();
263+
260264
let outer_state_lock = self.per_peer_state.read().unwrap();
261265
match outer_state_lock.get(counterparty_node_id) {
262266
Some(inner_state_lock) => {
@@ -272,7 +276,7 @@ where
272276
})?;
273277

274278
if let Ok(intercept_scid) = result.jit_channel_scid.to_scid() {
275-
self.pending_events.enqueue(LSPS2ClientEvent::InvoiceParametersReady {
279+
event_queue_notifier.enqueue(LSPS2ClientEvent::InvoiceParametersReady {
276280
request_id,
277281
counterparty_node_id: *counterparty_node_id,
278282
intercept_scid,

lightning-liquidity/src/lsps2/service.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,8 @@ where
777777
&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
778778
payment_hash: PaymentHash,
779779
) -> Result<(), APIError> {
780+
let event_queue_notifier = self.pending_events.notifier();
781+
780782
let peer_by_intercept_scid = self.peer_by_intercept_scid.read().unwrap();
781783
if let Some(counterparty_node_id) = peer_by_intercept_scid.get(&intercept_scid) {
782784
let outer_state_lock = self.per_peer_state.read().unwrap();
@@ -800,7 +802,7 @@ where
800802
user_channel_id: jit_channel.user_channel_id,
801803
intercept_scid,
802804
};
803-
self.pending_events.enqueue(event);
805+
event_queue_notifier.enqueue(event);
804806
},
805807
Ok(Some(HTLCInterceptedAction::ForwardHTLC(channel_id))) => {
806808
self.channel_manager.get_cm().forward_intercepted_htlc(
@@ -1066,6 +1068,7 @@ where
10661068
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
10671069
params: LSPS2GetInfoRequest,
10681070
) -> Result<(), LightningError> {
1071+
let event_queue_notifier = self.pending_events.notifier();
10691072
let (result, response) = {
10701073
let mut outer_state_lock = self.per_peer_state.write().unwrap();
10711074
let inner_state_lock =
@@ -1084,8 +1087,7 @@ where
10841087
counterparty_node_id: *counterparty_node_id,
10851088
token: params.token,
10861089
};
1087-
self.pending_events.enqueue(event);
1088-
1090+
event_queue_notifier.enqueue(event);
10891091
(Ok(()), msg)
10901092
},
10911093
(e, msg) => (e, msg),
@@ -1102,6 +1104,7 @@ where
11021104
fn handle_buy_request(
11031105
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest,
11041106
) -> Result<(), LightningError> {
1107+
let event_queue_notifier = self.pending_events.notifier();
11051108
if let Some(payment_size_msat) = params.payment_size_msat {
11061109
if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
11071110
let response = LSPS2Response::BuyError(LSPSResponseError {
@@ -1204,7 +1207,7 @@ where
12041207
opening_fee_params: params.opening_fee_params,
12051208
payment_size_msat: params.payment_size_msat,
12061209
};
1207-
self.pending_events.enqueue(event);
1210+
event_queue_notifier.enqueue(event);
12081211

12091212
(Ok(()), msg)
12101213
},

0 commit comments

Comments
 (0)