Skip to content

Commit 0371231

Browse files
committed
Introduce EventQueueNotifierGuard type
Previously, when enqueuing new events to the `EventQueue`, we'd directly attempt to wake any notifiers/notify any threads waiting on the `Condvar` about the newly available events. This could of course mean we'd notify them while ourselves still holding some locks, e.g., on the peer state. Here, we instead introduce a `EventQueueNotifierGuard` type that will notify about pending events if necesssary, which mitigates any potential lock contention: we now simply have to ensure that any method calling `enqueue` holds the notifier before retrieving any locks.
1 parent 5dbf426 commit 0371231

File tree

5 files changed

+67
-4
lines changed

5 files changed

+67
-4
lines changed

lightning-liquidity/src/events.rs

+45-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub(crate) struct EventQueue {
3131
queue: Arc<Mutex<VecDeque<Event>>>,
3232
waker: Arc<Mutex<Option<Waker>>>,
3333
#[cfg(feature = "std")]
34-
condvar: crate::sync::Condvar,
34+
condvar: Arc<crate::sync::Condvar>,
3535
}
3636

3737
impl EventQueue {
@@ -40,7 +40,7 @@ impl EventQueue {
4040
let waker = Arc::new(Mutex::new(None));
4141
#[cfg(feature = "std")]
4242
{
43-
let condvar = crate::sync::Condvar::new();
43+
let condvar = Arc::new(crate::sync::Condvar::new());
4444
Self { queue, waker, condvar }
4545
}
4646
#[cfg(not(feature = "std"))]
@@ -98,6 +98,49 @@ impl EventQueue {
9898
pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
9999
self.queue.lock().unwrap().split_off(0).into()
100100
}
101+
102+
// Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
103+
pub fn notifier(&self) -> EventQueueNotifierGuard {
104+
#[cfg(feature = "std")]
105+
{
106+
EventQueueNotifierGuard {
107+
queue: Arc::clone(&self.queue),
108+
waker: Arc::clone(&self.waker),
109+
condvar: Arc::clone(&self.condvar),
110+
}
111+
}
112+
#[cfg(not(feature = "std"))]
113+
{
114+
EventQueueNotifierGuard {
115+
queue: Arc::clone(&self.queue),
116+
waker: Arc::clone(&self.waker),
117+
}
118+
}
119+
}
120+
}
121+
122+
// A guard type that will notify about new events when dropped.
123+
#[must_use]
124+
pub(crate) struct EventQueueNotifierGuard {
125+
queue: Arc<Mutex<VecDeque<Event>>>,
126+
waker: Arc<Mutex<Option<Waker>>>,
127+
#[cfg(feature = "std")]
128+
condvar: Arc<crate::sync::Condvar>,
129+
}
130+
131+
impl Drop for EventQueueNotifierGuard {
132+
fn drop(&mut self) {
133+
let should_notify = !self.queue.lock().unwrap().is_empty();
134+
135+
if should_notify {
136+
if let Some(waker) = self.waker.lock().unwrap().take() {
137+
waker.wake();
138+
}
139+
140+
#[cfg(feature = "std")]
141+
self.condvar.notify_one();
142+
}
143+
}
101144
}
102145

103146
/// An event which you should probably take some action in response to.

lightning-liquidity/src/lsps0/client.rs

+2
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ where
6060
fn handle_response(
6161
&self, response: LSPS0Response, counterparty_node_id: &PublicKey,
6262
) -> Result<(), LightningError> {
63+
let _event_queue_notifier = self.pending_events.notifier();
64+
6365
match response {
6466
LSPS0Response::ListProtocols(ListProtocolsResponse { protocols }) => {
6567
self.pending_events.enqueue(Event::LSPS0Client(

lightning-liquidity/src/lsps1/client.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ where
103103
fn handle_get_info_response(
104104
&self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse,
105105
) -> Result<(), LightningError> {
106-
let outer_state_lock = self.per_peer_state.write().unwrap();
106+
let _event_queue_notifier = self.pending_events.notifier();
107107

108+
let outer_state_lock = self.per_peer_state.write().unwrap();
108109
match outer_state_lock.get(counterparty_node_id) {
109110
Some(inner_state_lock) => {
110111
let mut peer_state_lock = inner_state_lock.lock().unwrap();
@@ -141,6 +142,8 @@ where
141142
fn handle_get_info_error(
142143
&self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError,
143144
) -> Result<(), LightningError> {
145+
let _event_queue_notifier = self.pending_events.notifier();
146+
144147
let outer_state_lock = self.per_peer_state.read().unwrap();
145148
match outer_state_lock.get(counterparty_node_id) {
146149
Some(inner_state_lock) => {
@@ -218,6 +221,8 @@ where
218221
&self, request_id: RequestId, counterparty_node_id: &PublicKey,
219222
response: CreateOrderResponse,
220223
) -> Result<(), LightningError> {
224+
let _event_queue_notifier = self.pending_events.notifier();
225+
221226
let outer_state_lock = self.per_peer_state.read().unwrap();
222227
match outer_state_lock.get(counterparty_node_id) {
223228
Some(inner_state_lock) => {
@@ -259,6 +264,8 @@ where
259264
fn handle_create_order_error(
260265
&self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError,
261266
) -> Result<(), LightningError> {
267+
let _event_queue_notifier = self.pending_events.notifier();
268+
262269
let outer_state_lock = self.per_peer_state.read().unwrap();
263270
match outer_state_lock.get(counterparty_node_id) {
264271
Some(inner_state_lock) => {
@@ -337,6 +344,8 @@ where
337344
&self, request_id: RequestId, counterparty_node_id: &PublicKey,
338345
response: CreateOrderResponse,
339346
) -> Result<(), LightningError> {
347+
let _event_queue_notifier = self.pending_events.notifier();
348+
340349
let outer_state_lock = self.per_peer_state.read().unwrap();
341350
match outer_state_lock.get(counterparty_node_id) {
342351
Some(inner_state_lock) => {
@@ -378,6 +387,8 @@ where
378387
fn handle_get_order_error(
379388
&self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError,
380389
) -> Result<(), LightningError> {
390+
let _event_queue_notifier = self.pending_events.notifier();
391+
381392
let outer_state_lock = self.per_peer_state.read().unwrap();
382393
match outer_state_lock.get(counterparty_node_id) {
383394
Some(inner_state_lock) => {

lightning-liquidity/src/lsps2/client.rs

+4
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ where
183183
fn handle_get_info_response(
184184
&self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse,
185185
) -> Result<(), LightningError> {
186+
let _event_queue_notifier = self.pending_events.notifier();
187+
186188
let outer_state_lock = self.per_peer_state.read().unwrap();
187189
match outer_state_lock.get(counterparty_node_id) {
188190
Some(inner_state_lock) => {
@@ -249,6 +251,8 @@ where
249251
fn handle_buy_response(
250252
&self, request_id: RequestId, counterparty_node_id: &PublicKey, result: BuyResponse,
251253
) -> Result<(), LightningError> {
254+
let _event_queue_notifier = self.pending_events.notifier();
255+
252256
let outer_state_lock = self.per_peer_state.read().unwrap();
253257
match outer_state_lock.get(counterparty_node_id) {
254258
Some(inner_state_lock) => {

lightning-liquidity/src/lsps2/service.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,8 @@ where
790790
&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
791791
payment_hash: PaymentHash,
792792
) -> Result<(), APIError> {
793+
let _event_queue_notifier = self.pending_events.notifier();
794+
793795
let peer_by_intercept_scid = self.peer_by_intercept_scid.read().unwrap();
794796
if let Some(counterparty_node_id) = peer_by_intercept_scid.get(&intercept_scid) {
795797
let outer_state_lock = self.per_peer_state.read().unwrap();
@@ -1078,6 +1080,7 @@ where
10781080
fn handle_get_info_request(
10791081
&self, request_id: RequestId, counterparty_node_id: &PublicKey, params: GetInfoRequest,
10801082
) -> Result<(), LightningError> {
1083+
let _event_queue_notifier = self.pending_events.notifier();
10811084
let (result, response) = {
10821085
let mut outer_state_lock = self.per_peer_state.write().unwrap();
10831086
let inner_state_lock =
@@ -1097,7 +1100,6 @@ where
10971100
token: params.token,
10981101
});
10991102
self.pending_events.enqueue(event);
1100-
11011103
(Ok(()), msg)
11021104
},
11031105
(e, msg) => (e, msg),
@@ -1114,6 +1116,7 @@ where
11141116
fn handle_buy_request(
11151117
&self, request_id: RequestId, counterparty_node_id: &PublicKey, params: BuyRequest,
11161118
) -> Result<(), LightningError> {
1119+
let _event_queue_notifier = self.pending_events.notifier();
11171120
if let Some(payment_size_msat) = params.payment_size_msat {
11181121
if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
11191122
let response = LSPS2Response::BuyError(ResponseError {

0 commit comments

Comments
 (0)