Skip to content

Commit 88845e8

Browse files
committed
feat: purge ephemeral storage
1 parent 4d605ac commit 88845e8

File tree

2 files changed

+237
-3
lines changed

2 files changed

+237
-3
lines changed

crates/storage/src/versioned/ephemeral_v1/sql.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,11 @@ pub fn entry_count_and_size(content_type: &ContentType) -> String {
7676
table_name(content_type)
7777
)
7878
}
79+
80+
pub fn purge_by_slot(content_type: &ContentType) -> String {
81+
format!(
82+
"DELETE FROM {}
83+
WHERE slot < :slot",
84+
table_name(content_type)
85+
)
86+
}

crates/storage/src/versioned/ephemeral_v1/store.rs

Lines changed: 229 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
1-
use std::marker::PhantomData;
1+
use std::{
2+
marker::PhantomData,
3+
sync::Arc,
4+
time::{Duration, SystemTime, UNIX_EPOCH},
5+
};
26

3-
use ethportal_api::{OverlayContentKey, RawContentValue};
7+
use alloy::eips::merge::EPOCH_SLOTS;
8+
use ethportal_api::{jsonrpsee::tokio, OverlayContentKey, RawContentValue};
49
use r2d2::Pool;
510
use r2d2_sqlite::SqliteConnectionManager;
611
use rusqlite::{named_params, types::Type, OptionalExtension};
12+
use tokio::task::JoinHandle;
713
use tracing::{debug, warn};
814
use trin_metrics::storage::StorageMetricsReporter;
915

@@ -15,6 +21,9 @@ use crate::{
1521
ContentId,
1622
};
1723

24+
pub const BEACON_GENESIS_TIME: u64 = 1606824023;
25+
pub const SLOTS_PER_HISTORICAL_ROOT: u64 = 8192;
26+
1827
/// The store for storing ephemeral headers, bodies, and receipts.
1928
#[allow(unused)]
2029
#[derive(Debug)]
@@ -27,6 +36,8 @@ pub struct EphemeralV1Store<TContentKey: OverlayContentKey> {
2736
metrics: StorageMetricsReporter,
2837
/// Phantom Content Key
2938
_phantom_content_key: PhantomData<TContentKey>,
39+
/// Background task handle for periodic purging
40+
background_purge_task: Option<JoinHandle<()>>,
3041
}
3142

3243
impl<TContentKey: OverlayContentKey> VersionedContentStore for EphemeralV1Store<TContentKey> {
@@ -59,6 +70,7 @@ impl<TContentKey: OverlayContentKey> VersionedContentStore for EphemeralV1Store<
5970
metrics: StorageMetricsReporter::new(subnetwork),
6071
_phantom_content_key: PhantomData,
6172
config,
73+
background_purge_task: None,
6274
};
6375
store.init()?;
6476
Ok(store)
@@ -71,13 +83,61 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
7183
fn init(&mut self) -> Result<(), ContentStoreError> {
7284
self.init_usage_stats()?;
7385

74-
// TODO: Prune if necessary.
86+
// Purge content based on the last historical summaries update slot
87+
let rows_deleted =
88+
Self::purge_content_before_last_summary_internal(&Arc::new(self.config.clone()))?;
89+
90+
if rows_deleted > 0 {
91+
debug!(
92+
"Purged {} ephemeral content with during initialization",
93+
rows_deleted
94+
);
95+
}
7596

7697
Ok(())
7798
}
7899

79100
// PUBLIC FUNCTIONS
80101

102+
/// Starts the background task for periodic purging.
103+
/// This can be called explicitly after initialization if needed.
104+
pub fn start_background_purge_task(&mut self) -> Result<(), ContentStoreError> {
105+
let config = Arc::new(self.config.clone());
106+
107+
let handle = tokio::spawn(async move {
108+
// Run purge immediately when task starts
109+
if let Err(e) = Self::purge_content_before_last_summary_internal(&config) {
110+
warn!("Error purging content in background task: {}", e);
111+
}
112+
113+
let mut interval = tokio::time::interval(Duration::from_secs(12 * EPOCH_SLOTS)); // One epoch duration
114+
loop {
115+
interval.tick().await;
116+
117+
// Check if we're at a historical summaries update boundary
118+
let current_epoch = Self::expected_current_epoch();
119+
let next_epoch = current_epoch + 1;
120+
let period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS;
121+
122+
if next_epoch % period == 0 {
123+
if let Err(e) = Self::purge_content_before_last_summary_internal(&config) {
124+
warn!("Error purging content in background task: {}", e);
125+
}
126+
}
127+
}
128+
});
129+
130+
self.background_purge_task = Some(handle);
131+
Ok(())
132+
}
133+
134+
/// Stops the background purge task if it's running.
135+
pub fn stop_background_purge_task(&mut self) {
136+
if let Some(handle) = self.background_purge_task.take() {
137+
handle.abort();
138+
}
139+
}
140+
81141
/// Returns whether data associated with the content id is already stored.
82142
pub fn has_content(&self, content_id: &ContentId) -> Result<bool, ContentStoreError> {
83143
let timer = self.metrics.start_process_timer("has_content");
@@ -225,6 +285,15 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
225285
self.metrics.get_summary()
226286
}
227287

288+
/// Manually triggers a purge of content before the last historical summary.
289+
/// This can be used to manually control when content is purged, independent of the background
290+
/// task.
291+
///
292+
/// Returns the number of rows deleted.
293+
pub fn trigger_content_purge(&self) -> Result<usize, ContentStoreError> {
294+
Self::purge_content_before_last_summary_internal(&Arc::new(self.config.clone()))
295+
}
296+
228297
// INTERNAL FUNCTIONS
229298

230299
/// Lookup and set `usage_stats`.
@@ -263,6 +332,61 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
263332
) -> u64 {
264333
(raw_content_id.len() + raw_content_key.len() + raw_content_value.len()) as u64
265334
}
335+
336+
fn expected_current_epoch() -> u64 {
337+
let now = SystemTime::now();
338+
let now = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
339+
let since_genesis = now - Duration::from_secs(BEACON_GENESIS_TIME);
340+
341+
since_genesis.as_secs() / 12 / EPOCH_SLOTS
342+
}
343+
344+
/// Internal method to purge content, used by both the main thread and background task
345+
fn purge_content_before_last_summary_internal(
346+
config: &Arc<EphemeralV1StoreConfig>,
347+
) -> Result<usize, ContentStoreError> {
348+
let current_epoch = Self::expected_current_epoch();
349+
let cutoff_slot = Self::last_summaries_slot(current_epoch);
350+
351+
let conn = config.sql_connection_pool.get()?;
352+
let query = sql::purge_by_slot(&config.content_type);
353+
354+
let rows_deleted = conn.execute(&query, named_params! { ":slot": cutoff_slot })?;
355+
Ok(rows_deleted)
356+
}
357+
358+
/// Computes the slot at which the last historical summary event occurred.
359+
/// Historical summary events are appended when the next epoch is a multiple
360+
/// of `period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS`.
361+
///
362+
/// If the current_epoch is less than the first event boundary (and assuming a genesis event
363+
/// at epoch 0), then this function returns 0.
364+
fn last_summaries_slot(current_epoch: u64) -> u64 {
365+
// Calculate the period (in epochs) at which events are appended.
366+
let period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS;
367+
// Compute candidate event epoch:
368+
// This candidate is based on (current_epoch + 1) because events are appended
369+
// when transitioning to the next epoch.
370+
let candidate = ((current_epoch + 1) / period) * period;
371+
// If candidate is greater than current_epoch, then that event is in the future,
372+
// so the last event occurred one period earlier.
373+
let last_summaries_epoch = if candidate > current_epoch {
374+
candidate.saturating_sub(period)
375+
} else {
376+
candidate
377+
};
378+
379+
last_summaries_epoch * EPOCH_SLOTS
380+
}
381+
}
382+
383+
impl<TContentKey: OverlayContentKey> Drop for EphemeralV1Store<TContentKey> {
384+
fn drop(&mut self) {
385+
// Cancel the background task when the store is dropped
386+
if let Some(handle) = self.background_purge_task.take() {
387+
handle.abort();
388+
}
389+
}
266390
}
267391

268392
/// Creates table and indexes if they don't already exist.
@@ -280,6 +404,7 @@ mod tests {
280404
use anyhow::Result;
281405
use ethportal_api::{types::network::Subnetwork, IdentityContentKey};
282406
use tempfile::TempDir;
407+
use tokio::time::{sleep, Duration};
283408

284409
use super::*;
285410
use crate::{test_utils::generate_random_bytes, utils::setup_sql};
@@ -451,4 +576,105 @@ mod tests {
451576

452577
Ok(())
453578
}
579+
580+
#[tokio::test]
581+
async fn test_background_purge_task() -> Result<()> {
582+
let temp_dir = TempDir::new()?;
583+
let config = create_config(&temp_dir);
584+
585+
// Create store without starting background task
586+
let mut store = EphemeralV1Store::<IdentityContentKey>::create(
587+
ContentType::HistoryEphemeral,
588+
config.clone(),
589+
)?;
590+
591+
// Verify background task is not running initially
592+
assert!(store.background_purge_task.is_none());
593+
594+
// Insert test data with slots before and after the cutoff
595+
let current_epoch = EphemeralV1Store::<IdentityContentKey>::expected_current_epoch();
596+
let cutoff_slot =
597+
EphemeralV1Store::<IdentityContentKey>::last_summaries_slot(current_epoch);
598+
599+
let (key1, value1) = generate_key_value();
600+
let (key2, value2) = generate_key_value();
601+
let (key3, value3) = generate_key_value();
602+
603+
// Insert data with slots before cutoff
604+
store.insert(&key1, value1, 0, cutoff_slot.saturating_sub(100))?;
605+
store.insert(&key2, value2, 0, cutoff_slot.saturating_sub(50))?;
606+
607+
// Insert data with slot after cutoff
608+
store.insert(&key3, value3, 0, cutoff_slot + 100)?;
609+
610+
// Verify data is present before starting background task
611+
assert!(store.has_content(&ContentId::from(key1.content_id()))?);
612+
assert!(store.has_content(&ContentId::from(key2.content_id()))?);
613+
assert!(store.has_content(&ContentId::from(key3.content_id()))?);
614+
615+
// Start the background task
616+
store.start_background_purge_task()?;
617+
// Wait for the background task to run and purge data
618+
sleep(Duration::from_secs(1)).await;
619+
assert!(store.background_purge_task.is_some());
620+
621+
// Verify that content before cutoff was purged
622+
assert!(
623+
!store.has_content(&ContentId::from(key1.content_id()))?,
624+
"key1 should be purged"
625+
);
626+
assert!(
627+
!store.has_content(&ContentId::from(key2.content_id()))?,
628+
"key2 should be purged"
629+
);
630+
assert!(
631+
store.has_content(&ContentId::from(key3.content_id()))?,
632+
"key3 should not be purged"
633+
);
634+
635+
// Stop the background task
636+
store.stop_background_purge_task();
637+
assert!(store.background_purge_task.is_none());
638+
639+
Ok(())
640+
}
641+
642+
#[test]
643+
fn test_purge_content_during_init() -> Result<()> {
644+
let temp_dir = TempDir::new()?;
645+
let config = create_config(&temp_dir);
646+
647+
// Create and populate store with test data
648+
let mut store = EphemeralV1Store::<IdentityContentKey>::create(
649+
ContentType::HistoryEphemeral,
650+
config.clone(),
651+
)?;
652+
653+
// Insert test data with slots before and after the cutoff
654+
let current_epoch = EphemeralV1Store::<IdentityContentKey>::expected_current_epoch();
655+
let cutoff_slot =
656+
EphemeralV1Store::<IdentityContentKey>::last_summaries_slot(current_epoch);
657+
658+
let (key1, value1) = generate_key_value();
659+
let (key2, value2) = generate_key_value();
660+
let (key3, value3) = generate_key_value();
661+
662+
// Insert data with slots before cutoff
663+
store.insert(&key1, value1, 0, cutoff_slot.saturating_sub(100))?;
664+
store.insert(&key2, value2, 0, cutoff_slot.saturating_sub(50))?;
665+
666+
// Insert data with slot after cutoff
667+
store.insert(&key3, value3, 0, cutoff_slot + 100)?;
668+
669+
// Create a new store instance to trigger init and purge
670+
let new_store =
671+
EphemeralV1Store::<IdentityContentKey>::create(ContentType::HistoryEphemeral, config)?;
672+
673+
// Verify that content before cutoff was purged
674+
assert!(!new_store.has_content(&ContentId::from(key1.content_id()))?);
675+
assert!(!new_store.has_content(&ContentId::from(key2.content_id()))?);
676+
assert!(new_store.has_content(&ContentId::from(key3.content_id()))?);
677+
678+
Ok(())
679+
}
454680
}

0 commit comments

Comments
 (0)