Skip to content

Commit d7880ef

Browse files
committed
feat(background-processor): add BackgroundProcessorBuilder for optional components
1 parent ebdbee0 commit d7880ef

File tree

1 file changed

+267
-0
lines changed
  • lightning-background-processor/src

1 file changed

+267
-0
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ use std::time::Instant;
5757
#[cfg(not(feature = "std"))]
5858
use alloc::boxed::Box;
5959

60+
#[cfg(feature = "std")]
61+
use std::marker::PhantomData;
62+
6063
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
6164
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
6265
/// responsibilities are:
@@ -1046,6 +1049,171 @@ impl BackgroundProcessor {
10461049
None => Ok(()),
10471050
}
10481051
}
1052+
1053+
/// Creates a new [`BackgroundProcessorBuilder`] to construct a [`BackgroundProcessor`] with optional components.
1054+
pub fn builder<'a, PS, EH, M, CM, PGS, RGS, G, UL, L, PM>(
1055+
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
1056+
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L,
1057+
) -> BackgroundProcessorBuilder<'a, PS, EH, M, CM, (), PGS, RGS, G, UL, L, PM, ()>
1058+
where
1059+
PS: 'static + Deref + Send,
1060+
EH: 'static + EventHandler + Send,
1061+
M: 'static + Deref + Send + Sync,
1062+
CM: 'static + Deref + Send + Sync,
1063+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
1064+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1065+
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
1066+
UL: 'static + Deref + Send + Sync,
1067+
L: 'static + Deref + Send + Sync,
1068+
PM: 'static + Deref + Send + Sync,
1069+
{
1070+
BackgroundProcessorBuilder::new(
1071+
persister,
1072+
event_handler,
1073+
chain_monitor,
1074+
channel_manager,
1075+
gossip_sync,
1076+
peer_manager,
1077+
logger,
1078+
)
1079+
}
1080+
}
1081+
1082+
/// A builder for constructing a [`BackgroundProcessor`] with optional components.
1083+
///
1084+
/// This builder provides a flexible and type-safe way to construct a [`BackgroundProcessor`]
1085+
/// with optional components like `onion_messenger` and `scorer`. It helps avoid specifying
1086+
/// concrete types for components that aren't being used.
1087+
///
1088+
/// Use [`BackgroundProcessor::builder`] to create a new builder instance.
1089+
#[cfg(feature = "std")]
1090+
pub struct BackgroundProcessorBuilder<
1091+
'a,
1092+
UL: 'static + Deref + Send + Sync,
1093+
CF: 'static + Deref + Send + Sync,
1094+
T: 'static + Deref + Send + Sync,
1095+
F: 'static + Deref + Send + Sync,
1096+
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
1097+
L: 'static + Deref + Send + Sync,
1098+
P: 'static + Deref + Send + Sync,
1099+
EH: 'static + EventHandler + Send,
1100+
PS: 'static + Deref + Send,
1101+
M: 'static
1102+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
1103+
+ Send
1104+
+ Sync,
1105+
CM: 'static + Deref + Send + Sync,
1106+
OM: 'static + Deref + Send + Sync,
1107+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
1108+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1109+
PM: 'static + Deref + Send + Sync,
1110+
S: 'static + Deref<Target = SC> + Send + Sync,
1111+
SC: for<'b> WriteableScore<'b>,
1112+
> where
1113+
UL::Target: 'static + UtxoLookup,
1114+
CF::Target: 'static + chain::Filter,
1115+
T::Target: 'static + BroadcasterInterface,
1116+
F::Target: 'static + FeeEstimator,
1117+
L::Target: 'static + Logger,
1118+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1119+
PS::Target: 'static + Persister<'a, CM, L, S>,
1120+
CM::Target: AChannelManager + Send + Sync,
1121+
OM::Target: AOnionMessenger + Send + Sync,
1122+
PM::Target: APeerManager + Send + Sync,
1123+
{
1124+
persister: PS,
1125+
event_handler: EH,
1126+
chain_monitor: M,
1127+
channel_manager: CM,
1128+
onion_messenger: Option<OM>,
1129+
gossip_sync: GossipSync<PGS, RGS, G, UL, L>,
1130+
peer_manager: PM,
1131+
logger: L,
1132+
scorer: Option<S>,
1133+
_phantom: PhantomData<(&'a (), CF, T, F, P)>,
1134+
}
1135+
1136+
#[cfg(feature = "std")]
1137+
impl<
1138+
'a,
1139+
UL: 'static + Deref + Send + Sync,
1140+
CF: 'static + Deref + Send + Sync,
1141+
T: 'static + Deref + Send + Sync,
1142+
F: 'static + Deref + Send + Sync,
1143+
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
1144+
L: 'static + Deref + Send + Sync,
1145+
P: 'static + Deref + Send + Sync,
1146+
EH: 'static + EventHandler + Send,
1147+
PS: 'static + Deref + Send,
1148+
M: 'static
1149+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
1150+
+ Send
1151+
+ Sync,
1152+
CM: 'static + Deref + Send + Sync,
1153+
OM: 'static + Deref + Send + Sync,
1154+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
1155+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1156+
PM: 'static + Deref + Send + Sync,
1157+
S: 'static + Deref<Target = SC> + Send + Sync,
1158+
SC: for<'b> WriteableScore<'b>,
1159+
> BackgroundProcessorBuilder<'a, UL, CF, T, F, G, L, P, EH, PS, M, CM, OM, PGS, RGS, PM, S, SC>
1160+
where
1161+
UL::Target: 'static + UtxoLookup,
1162+
CF::Target: 'static + chain::Filter,
1163+
T::Target: 'static + BroadcasterInterface,
1164+
F::Target: 'static + FeeEstimator,
1165+
L::Target: 'static + Logger,
1166+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1167+
PS::Target: 'static + Persister<'a, CM, L, S>,
1168+
CM::Target: AChannelManager + Send + Sync,
1169+
OM::Target: AOnionMessenger + Send + Sync,
1170+
PM::Target: APeerManager + Send + Sync,
1171+
{
1172+
/// Creates a new builder instance. This is an internal method - use [`BackgroundProcessor::builder`] instead.
1173+
pub(crate) fn new(
1174+
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
1175+
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L,
1176+
) -> Self {
1177+
Self {
1178+
persister,
1179+
event_handler,
1180+
chain_monitor,
1181+
channel_manager,
1182+
onion_messenger: None,
1183+
gossip_sync,
1184+
peer_manager,
1185+
logger,
1186+
scorer: None,
1187+
_phantom: PhantomData,
1188+
}
1189+
}
1190+
1191+
/// Sets the optional onion messenger component.
1192+
pub fn with_onion_messenger(&mut self, onion_messenger: OM) -> &mut Self {
1193+
self.onion_messenger = Some(onion_messenger);
1194+
self
1195+
}
1196+
1197+
/// Sets the optional scorer component.
1198+
pub fn with_scorer(&mut self, scorer: S) -> &mut Self {
1199+
self.scorer = Some(scorer);
1200+
self
1201+
}
1202+
1203+
/// Builds and starts the [`BackgroundProcessor`].
1204+
pub fn build(self) -> BackgroundProcessor {
1205+
BackgroundProcessor::start(
1206+
self.persister,
1207+
self.event_handler,
1208+
self.chain_monitor,
1209+
self.channel_manager,
1210+
self.onion_messenger,
1211+
self.gossip_sync,
1212+
self.peer_manager,
1213+
self.logger,
1214+
self.scorer,
1215+
)
1216+
}
10491217
}
10501218

10511219
#[cfg(feature = "std")]
@@ -2721,4 +2889,103 @@ mod tests {
27212889
r1.unwrap().unwrap();
27222890
r2.unwrap()
27232891
}
2892+
2893+
#[test]
2894+
fn test_background_processor_builder() {
2895+
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
2896+
// updates. Also test that when new updates are available, the manager signals that it needs
2897+
// re-persistence and is successfully re-persisted.
2898+
let (persist_dir, nodes) = create_nodes(2, "test_background_processor_builder");
2899+
2900+
// Go through the channel creation process so that each node has something to persist. Since
2901+
// open_channel consumes events, it must complete before starting BackgroundProcessor to
2902+
// avoid a race with processing events.
2903+
let tx = open_channel!(nodes[0], nodes[1], 100000);
2904+
2905+
// Initiate the background processors to watch each node.
2906+
let data_dir = nodes[0].kv_store.get_data_dir();
2907+
let persister = Arc::new(Persister::new(data_dir));
2908+
let event_handler = |_: _| Ok(());
2909+
let bg_processor = BackgroundProcessor::builder(
2910+
persister,
2911+
event_handler,
2912+
nodes[0].chain_monitor.clone(),
2913+
nodes[0].node.clone(),
2914+
nodes[0].p2p_gossip_sync(),
2915+
nodes[0].peer_manager.clone(),
2916+
nodes[0].logger.clone(),
2917+
)
2918+
.with_onion_messenger(nodes[0].messenger.clone())
2919+
.with_scorer(nodes[0].scorer.clone())
2920+
.build();
2921+
2922+
macro_rules! check_persisted_data {
2923+
($node: expr, $filepath: expr) => {
2924+
let mut expected_bytes = Vec::new();
2925+
loop {
2926+
expected_bytes.clear();
2927+
match $node.write(&mut expected_bytes) {
2928+
Ok(()) => match std::fs::read($filepath) {
2929+
Ok(bytes) => {
2930+
if bytes == expected_bytes {
2931+
break;
2932+
} else {
2933+
continue;
2934+
}
2935+
},
2936+
Err(_) => continue,
2937+
},
2938+
Err(e) => panic!("Unexpected error: {}", e),
2939+
}
2940+
}
2941+
};
2942+
}
2943+
2944+
// Check that the initial data is persisted as expected
2945+
let filepath =
2946+
get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
2947+
check_persisted_data!(nodes[0].node, filepath.clone());
2948+
2949+
loop {
2950+
if !nodes[0].node.get_event_or_persist_condvar_value() {
2951+
break;
2952+
}
2953+
}
2954+
2955+
// Force-close the channel.
2956+
let error_message = "Channel force-closed";
2957+
nodes[0]
2958+
.node
2959+
.force_close_broadcasting_latest_txn(
2960+
&ChannelId::v1_from_funding_outpoint(OutPoint {
2961+
txid: tx.compute_txid(),
2962+
index: 0,
2963+
}),
2964+
&nodes[1].node.get_our_node_id(),
2965+
error_message.to_string(),
2966+
)
2967+
.unwrap();
2968+
2969+
// Check that the force-close updates are persisted
2970+
check_persisted_data!(nodes[0].node, manager_path.clone());
2971+
loop {
2972+
if !nodes[0].node.get_event_or_persist_condvar_value() {
2973+
break;
2974+
}
2975+
}
2976+
2977+
// Check network graph is persisted
2978+
let filepath =
2979+
get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
2980+
check_persisted_data!(nodes[0].network_graph, filepath);
2981+
2982+
// Check scorer is persisted
2983+
let filepath =
2984+
get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
2985+
check_persisted_data!(nodes[0].scorer, filepath);
2986+
2987+
if !std::thread::panicking() {
2988+
bg_processor.stop().unwrap();
2989+
}
2990+
}
27242991
}

0 commit comments

Comments
 (0)