Skip to content

Commit 15d733b

Browse files
committed
wip
1 parent e1509f1 commit 15d733b

File tree

3 files changed

+44
-37
lines changed

3 files changed

+44
-37
lines changed

lightning/src/chain/chainmonitor.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::chain::transaction::{OutPoint, TransactionData};
3434
use crate::ln::types::ChannelId;
3535
use crate::sign::ecdsa::EcdsaChannelSigner;
3636
use crate::events::{self, Event, EventHandler, ReplayEvent};
37+
use crate::util::async_poll::{AsyncResult, AsyncResultNo};
3738
use crate::util::logger::{Logger, WithContext};
3839
use crate::util::errors::APIError;
3940
use crate::util::persist::MonitorName;
@@ -171,7 +172,7 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
171172
/// the archive process. Additionally, because the archive operation could be retried on
172173
/// restart, this method must in that case be idempotent, ensuring it can handle scenarios where
173174
/// the monitor already exists in the archive.
174-
fn archive_persisted_channel(&self, monitor_name: MonitorName);
175+
fn archive_persisted_channel<'a>(&'a self, monitor_name: MonitorName) -> AsyncResultNo<'a>;
175176
}
176177

177178
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -1136,4 +1137,3 @@ mod tests {
11361137
}).is_err());
11371138
}
11381139
}
1139-

lightning/src/util/async_poll.rs

+3
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ pub(crate) fn dummy_waker() -> Waker {
9696
unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
9797
}
9898

99+
pub type AsyncResultError<'a, T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + 'a + Send>>;
100+
pub type AsyncResultNo<'a> = Pin<Box<dyn Future<Output = ()> + 'a + Send>>;
101+
99102
/// A type alias for a future that returns a result of type T.
100103
#[cfg(feature = "std")]
101104
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a + Send>>;

lightning/src/util/persist.rs

+39-35
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
3232
use crate::util::logger::Logger;
3333
use crate::util::ser::{Readable, ReadableArgs, Writeable};
3434

35+
use super::async_poll::{AsyncResult, AsyncResultError, AsyncResultNo};
36+
3537
/// The alphabet of characters allowed for namespaces and keys.
3638
pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
3739
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
@@ -127,9 +129,9 @@ pub trait KVStore {
127129
/// `primary_namespace` and `secondary_namespace`.
128130
///
129131
/// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
130-
fn read(
131-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
132-
) -> Result<Vec<u8>, io::Error>;
132+
fn read<'a>(
133+
&'a self, primary_namespace: &str, secondary_namespace: &str, key: &str,
134+
) -> AsyncResultError<'a, Vec<u8>, io::Error>;
133135
/// Persists the given data under the given `key`.
134136
///
135137
/// Will create the given `primary_namespace` and `secondary_namespace` if not already present
@@ -186,13 +188,13 @@ pub trait MigratableKVStore: KVStore {
186188
///
187189
/// Will abort and return an error if any IO operation fails. Note that in this case the
188190
/// `target_store` might get left in an intermediate state.
189-
pub fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
191+
pub async fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
190192
source_store: &mut S, target_store: &mut T,
191193
) -> Result<(), io::Error> {
192194
let keys_to_migrate = source_store.list_all_keys()?;
193195

194196
for (primary_namespace, secondary_namespace, key) in &keys_to_migrate {
195-
let data = source_store.read(primary_namespace, secondary_namespace, key)?;
197+
let data = source_store.read(primary_namespace, secondary_namespace, key).await?;
196198
target_store.write(primary_namespace, secondary_namespace, key, &data)?;
197199
}
198200

@@ -254,7 +256,7 @@ where
254256
}
255257
}
256258

257-
impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSigner> for K {
259+
impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + Sync + ?Sized> Persist<ChannelSigner> for K {
258260
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
259261
// down once these start returning failure.
260262
// Then we should return InProgress rather than UnrecoverableError, implying we should probably
@@ -289,36 +291,38 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSign
289291
}
290292
}
291293

292-
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
293-
let monitor_key = monitor_name.to_string();
294-
let monitor = match self.read(
295-
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
296-
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
297-
monitor_key.as_str(),
298-
) {
299-
Ok(monitor) => monitor,
300-
Err(_) => return,
301-
};
302-
match self.write(
303-
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
304-
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
305-
monitor_key.as_str(),
306-
&monitor,
307-
) {
308-
Ok(()) => {},
309-
Err(_e) => return,
310-
};
311-
let _ = self.remove(
312-
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
313-
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
314-
monitor_key.as_str(),
315-
true,
316-
);
294+
fn archive_persisted_channel<'a>(&'a self, monitor_name: MonitorName) -> AsyncResultNo<'a>{
295+
Box::pin(async move {
296+
let monitor_key = monitor_name.to_string();
297+
let monitor = match self.read(
298+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
299+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
300+
monitor_key.as_str(),
301+
).await {
302+
Ok(monitor) => monitor,
303+
Err(_) => return,
304+
};
305+
match self.write(
306+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
307+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
308+
monitor_key.as_str(),
309+
&monitor,
310+
) {
311+
Ok(()) => {},
312+
Err(_e) => return,
313+
};
314+
let _ = self.remove(
315+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
316+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
317+
monitor_key.as_str(),
318+
true,
319+
);
320+
})
317321
}
318322
}
319323

320324
/// Read previously persisted [`ChannelMonitor`]s from the store.
321-
pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
325+
pub async fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
322326
kv_store: K, entropy_source: ES, signer_provider: SP,
323327
) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
324328
where
@@ -337,7 +341,7 @@ where
337341
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
338342
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
339343
&stored_key,
340-
)?),
344+
).await?),
341345
(&*entropy_source, &*signer_provider),
342346
) {
343347
Ok((block_hash, channel_monitor)) => {
@@ -586,15 +590,15 @@ where
586590
}
587591

588592
/// Read a channel monitor.
589-
fn read_monitor(
593+
async fn read_monitor(
590594
&self, monitor_name: &MonitorName, monitor_key: &str,
591595
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
592596
{
593597
let mut monitor_cursor = io::Cursor::new(self.kv_store.read(
594598
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
595599
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
596600
monitor_key,
597-
)?);
601+
).await?);
598602
// Discard the sentinel bytes if found.
599603
if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
600604
monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64);

0 commit comments

Comments
 (0)