Skip to content

Commit 101db25

Browse files
hotfix: small fixes for the replay (#304)
## Description This PR introduces optimizations and reliability improvements for ledger replay: - Start from the most recent non-corrupted snapshot and delete corrupted ones - Added hard linking for account files in `accounts_persister.rs` to prevent file removal during ledger access and replay - Increased the RemoteAccountUpdatesWorker connection refresh interval from 5 to 50 minutes in `magic_validator.rs` for better stability (there are also issues if the WS shard disconnects during replay) - Added `FailedToGetSubscriptionSlot` error handling in `account_cloner.rs` for subscription slot retrieval failures - Increased account hydration concurrency from 10 to 30 and fetch retries from 10 to 50 in `remote_account_cloner_worker.rs` for improved reliability
1 parent 6b9bf65 commit 101db25

File tree

5 files changed

+70
-56
lines changed

5 files changed

+70
-56
lines changed

magicblock-account-cloner/src/account_cloner.rs

+3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ pub enum AccountClonerError {
3535

3636
#[error("FailedToFetchSatisfactorySlot")]
3737
FailedToFetchSatisfactorySlot,
38+
39+
#[error("FailedToGetSubscriptionSlot")]
40+
FailedToGetSubscriptionSlot,
3841
}
3942

4043
pub type AccountClonerResult<T> = Result<T, AccountClonerError>;

magicblock-account-cloner/src/remote_account_cloner_worker.rs

+13-6
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ where
134134
) -> Self {
135135
let (clone_request_sender, clone_request_receiver) =
136136
unbounded_channel();
137-
let fetch_retries = 10;
137+
let fetch_retries = 50;
138138
Self {
139139
internal_account_provider,
140140
account_fetcher,
@@ -273,9 +273,10 @@ where
273273
// retry resulting in overall slower hydration.
274274
// If the optimal rate here is desired we might make this configurable in the
275275
// future.
276+
// TODO(GabrielePicco): Make the concurrency configurable
276277
stream
277278
.map(Ok::<_, AccountClonerError>)
278-
.try_for_each_concurrent(10, |(pubkey, owner)| async move {
279+
.try_for_each_concurrent(30, |(pubkey, owner)| async move {
279280
trace!("Hydrating '{}'", pubkey);
280281
let res = self
281282
.do_clone_and_update_cache(
@@ -446,9 +447,15 @@ where
446447
}
447448
// If we failed to fetch too many time, stop here
448449
if fetch_count >= self.fetch_retries {
449-
return Err(
450-
AccountClonerError::FailedToFetchSatisfactorySlot,
451-
);
450+
return if min_context_slot.is_none() {
451+
Err(
452+
AccountClonerError::FailedToGetSubscriptionSlot,
453+
)
454+
} else {
455+
Err(
456+
AccountClonerError::FailedToFetchSatisfactorySlot,
457+
)
458+
};
452459
}
453460
}
454461
Err(error) => {
@@ -459,7 +466,7 @@ where
459466
}
460467
};
461468
// Wait a bit in the hopes of the min_context_slot becoming available (about half a slot)
462-
sleep(Duration::from_millis(300)).await;
469+
sleep(Duration::from_millis(400)).await;
463470
}
464471
} else {
465472
self.fetch_account_chain_snapshot(pubkey, None).await?

magicblock-account-cloner/tests/remote_account_cloner.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ async fn test_clone_fails_stale_undelegated_account_when_ephemeral() {
278278
result,
279279
Err(AccountClonerError::FailedToFetchSatisfactorySlot)
280280
));
281-
assert_eq!(account_fetcher.get_fetch_count(&undelegated_account), 10); // Must have retried
281+
assert_eq!(account_fetcher.get_fetch_count(&undelegated_account), 50); // Must have retried
282282
assert!(account_updates.has_account_monitoring(&undelegated_account));
283283
assert!(account_dumper.was_untouched(&undelegated_account));
284284
// Cleanup everything correctly

magicblock-accounts-db/src/persist/accounts_persister.rs

+51-47
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,6 @@ impl AccountsPersister {
254254
.first()
255255
.ok_or(AccountsDbError::NoStoragePathProvided)?;
256256

257-
// Read all files sorted slot/append_vec_id and return the last one
258257
let files = fs::read_dir(path)?;
259258
let mut files: Vec<_> = files
260259
.filter_map(|entry| {
@@ -285,56 +284,61 @@ impl AccountsPersister {
285284
})
286285
.collect();
287286

288-
files.sort_by(
289-
|(_, slot_a, id_a): &(PathBuf, Slot, AppendVecId),
290-
(_, slot_b, id_b): &(PathBuf, Slot, AppendVecId)| {
291-
// Sorting in reverse order
292-
if slot_a == slot_b {
293-
id_b.cmp(id_a)
294-
} else {
295-
slot_b.cmp(slot_a)
296-
}
297-
},
298-
);
287+
files.sort_by(|(_, slot_a, id_a), (_, slot_b, id_b)| {
288+
if slot_a == slot_b {
289+
id_b.cmp(id_a)
290+
} else {
291+
slot_b.cmp(slot_a)
292+
}
293+
});
299294

300-
let matching_file = {
301-
let mut matching_file = None;
302-
for (file, slot, id) in files {
303-
if slot <= max_slot {
304-
matching_file.replace((file, slot, id));
305-
break;
295+
for (file, slot, id) in files {
296+
if slot <= max_slot {
297+
let link = file.with_extension("link");
298+
if let Err(err) = fs::hard_link(&file, &link) {
299+
warn!("Failed to create hard link for {:?}: {}", file, err);
300+
continue;
301+
}
302+
let file = link;
303+
304+
let file_size = match fs::metadata(&file) {
305+
Ok(metadata) => metadata.len() as usize,
306+
Err(err) => {
307+
warn!("Failed to get metadata for {:?}: {}", file, err);
308+
let _ = fs::remove_file(&file);
309+
continue;
310+
}
311+
};
312+
313+
match AppendVec::new_from_file(
314+
&file,
315+
file_size,
316+
StorageAccess::Mmap,
317+
) {
318+
Ok((append_vec, num_accounts)) => {
319+
let accounts = AccountsFile::AppendVec(append_vec);
320+
let storage = AccountStorageEntry::new_existing(
321+
slot,
322+
id,
323+
accounts,
324+
num_accounts,
325+
);
326+
return Ok(Some((storage, slot)));
327+
}
328+
Err(err) => {
329+
warn!("Failed to load append vec from {:?}: {}. Deleting corrupted file.", file, err);
330+
let _ = fs::remove_file(&file);
331+
}
306332
}
307333
}
308-
matching_file
309-
};
310-
let Some((file, slot, id)) = matching_file else {
311-
warn!(
312-
"No storage found with slot <= {} inside {}",
313-
max_slot,
314-
path.display().to_string(),
315-
);
316-
return Ok(None);
317-
};
318-
319-
// When we drop the AppendVec the underlying file is removed from the
320-
// filesystem. There is no way to configure this via public methods.
321-
// Thus we copy the file before using it for the AppendVec. This way
322-
// we prevent account files being removed when we point a tool at the ledger
323-
// or replay it.
324-
let file = {
325-
let copy = file.with_extension("copy");
326-
fs::copy(&file, &copy)?;
327-
copy
328-
};
334+
}
329335

330-
// Create a AccountStorageEntry from the file
331-
let file_size = fs::metadata(&file)?.len() as usize;
332-
let (append_vec, num_accounts) =
333-
AppendVec::new_from_file(&file, file_size, StorageAccess::Mmap)?;
334-
let accounts = AccountsFile::AppendVec(append_vec);
335-
let storage =
336-
AccountStorageEntry::new_existing(slot, id, accounts, num_accounts);
337-
Ok(Some((storage, slot)))
336+
warn!(
337+
"No valid storage found with slot <= {} inside {}",
338+
max_slot,
339+
path.display().to_string(),
340+
);
341+
Ok(None)
338342
}
339343

340344
// -----------------

magicblock-api/src/magic_validator.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ impl MagicValidator {
242242
remote_rpc_config.clone(),
243243
remote_rpc_config.clone(),
244244
],
245-
// We'll kill/refresh one connection every 5 minutes
246-
Duration::from_secs(60 * 5),
245+
// We'll kill/refresh one connection every 50 minutes
246+
Duration::from_secs(60 * 50),
247247
);
248248

249249
let transaction_status_sender = TransactionStatusSender {

0 commit comments

Comments
 (0)