-
Notifications
You must be signed in to change notification settings - Fork 2
feat(sim): dedup sim cache items #74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4738c87
d3d0883
0a03b1c
46cb252
0635206
d1f1f02
bcd03b4
60a54cf
6dbf00e
8573f9e
03a7791
d2f4404
cd15f8d
963229c
623e3ca
5368ca0
65665f1
d7741c1
f6aae32
2f9fcef
122b4a1
f9d7a8f
6e3ebf8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,19 @@ | ||
use crate::SimItem; | ||
use crate::{item::SimIdentifier, CacheError, SimItem}; | ||
use alloy::consensus::TxEnvelope; | ||
use core::fmt; | ||
use parking_lot::RwLock; | ||
use signet_bundle::SignetEthBundle; | ||
use std::{ | ||
collections::BTreeMap, | ||
sync::{Arc, RwLock, RwLockWriteGuard}, | ||
collections::{BTreeMap, HashSet}, | ||
sync::Arc, | ||
}; | ||
|
||
/// A cache for the simulator. | ||
/// | ||
/// This cache is used to store the items that are being simulated. | ||
#[derive(Clone)] | ||
pub struct SimCache { | ||
inner: Arc<RwLock<BTreeMap<u128, SimItem>>>, | ||
inner: Arc<RwLock<CacheInner>>, | ||
capacity: usize, | ||
} | ||
|
||
|
@@ -27,169 +30,273 @@ impl Default for SimCache { | |
} | ||
|
||
impl SimCache { | ||
/// Create a new `SimCache` instance. | ||
/// Create a new `SimCache` instance, with a default capacity of `100`. | ||
pub fn new() -> Self { | ||
Self { inner: Arc::new(RwLock::new(BTreeMap::new())), capacity: 100 } | ||
Self { inner: Arc::new(RwLock::new(CacheInner::new())), capacity: 100 } | ||
} | ||
|
||
/// Create a new `SimCache` instance with a given capacity. | ||
pub fn with_capacity(capacity: usize) -> Self { | ||
Self { inner: Arc::new(RwLock::new(BTreeMap::new())), capacity } | ||
Self { inner: Arc::new(RwLock::new(CacheInner::new())), capacity } | ||
} | ||
|
||
/// Get an iterator over the best items in the cache. | ||
pub fn read_best(&self, n: usize) -> Vec<(u128, SimItem)> { | ||
self.inner.read().unwrap().iter().rev().take(n).map(|(k, v)| (*k, v.clone())).collect() | ||
self.inner.read().items.iter().rev().take(n).map(|(k, item)| (*k, item.clone())).collect() | ||
} | ||
|
||
/// Get the number of items in the cache. | ||
pub fn len(&self) -> usize { | ||
self.inner.read().unwrap().len() | ||
self.inner.read().items.len() | ||
} | ||
|
||
/// True if the cache is empty. | ||
pub fn is_empty(&self) -> bool { | ||
self.inner.read().unwrap().is_empty() | ||
self.inner.read().items.is_empty() | ||
} | ||
|
||
/// Get an item by key. | ||
pub fn get(&self, key: u128) -> Option<SimItem> { | ||
self.inner.read().unwrap().get(&key).cloned() | ||
self.inner.read().items.get(&key).cloned() | ||
} | ||
|
||
/// Remove an item by key. | ||
pub fn remove(&self, key: u128) -> Option<SimItem> { | ||
self.inner.write().unwrap().remove(&key) | ||
let mut inner = self.inner.write(); | ||
if let Some(item) = inner.items.remove(&key) { | ||
inner.seen.remove(item.identifier().as_bytes()); | ||
Some(item) | ||
} else { | ||
None | ||
} | ||
} | ||
|
||
fn add_inner( | ||
guard: &mut RwLockWriteGuard<'_, BTreeMap<u128, SimItem>>, | ||
mut score: u128, | ||
item: SimItem, | ||
capacity: usize, | ||
) { | ||
fn add_inner(inner: &mut CacheInner, mut score: u128, item: SimItem, capacity: usize) { | ||
// Check if we've already seen this item - if so, don't add it | ||
if !inner.seen.insert(item.identifier_owned()) { | ||
return; | ||
} | ||
|
||
// If it has the same score, we decrement (prioritizing earlier items) | ||
while guard.contains_key(&score) && score != 0 { | ||
while inner.items.contains_key(&score) && score != 0 { | ||
score = score.saturating_sub(1); | ||
} | ||
|
||
if guard.len() >= capacity { | ||
if inner.items.len() >= capacity { | ||
// If we are at capacity, we need to remove the lowest score | ||
guard.pop_first(); | ||
if let Some((_, item)) = inner.items.pop_first() { | ||
inner.seen.remove(&item.identifier_owned()); | ||
} | ||
} | ||
|
||
guard.entry(score).or_insert(item); | ||
inner.items.insert(score, item.clone()); | ||
} | ||
|
||
/// Add an item to the cache. | ||
/// | ||
/// The basefee is used to calculate an estimated fee for the item. | ||
pub fn add_item(&self, item: impl Into<SimItem>, basefee: u64) { | ||
let item = item.into(); | ||
/// Add a bundle to the cache. | ||
pub fn add_bundle(&self, bundle: SignetEthBundle, basefee: u64) -> Result<(), CacheError> { | ||
if bundle.replacement_uuid().is_none() { | ||
// If the bundle does not have a replacement UUID, we cannot add it to the cache. | ||
return Err(CacheError::BundleWithoutReplacementUuid); | ||
} | ||
|
||
// Calculate the total fee for the item. | ||
let item = SimItem::try_from(bundle)?; | ||
let score = item.calculate_total_fee(basefee); | ||
|
||
let mut inner = self.inner.write().unwrap(); | ||
|
||
let mut inner = self.inner.write(); | ||
Self::add_inner(&mut inner, score, item, self.capacity); | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Add an iterator of items to the cache. This locks the cache only once | ||
pub fn add_items<I, Item>(&self, item: I, basefee: u64) | ||
/// Add an iterator of bundles to the cache. This locks the cache only once | ||
/// | ||
/// Bundles added should have a valid replacement UUID. Bundles without a replacement UUID will be skipped. | ||
pub fn add_bundles<I, Item>(&self, item: I, basefee: u64) -> Result<(), CacheError> | ||
where | ||
I: IntoIterator<Item = Item>, | ||
Item: Into<SimItem>, | ||
Item: Into<SignetEthBundle>, | ||
{ | ||
let iter = item.into_iter().map(|item| { | ||
let mut inner = self.inner.write(); | ||
|
||
for item in item.into_iter() { | ||
let item = item.into(); | ||
let Ok(item) = SimItem::try_from(item) else { | ||
// Skip invalid bundles | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we log here that a bundle is being discarded as invalid? |
||
continue; | ||
}; | ||
let score = item.calculate_total_fee(basefee); | ||
(score, item) | ||
}); | ||
Self::add_inner(&mut inner, score, item, self.capacity); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
let mut inner = self.inner.write().unwrap(); | ||
/// Add a transaction to the cache. | ||
pub fn add_tx(&self, tx: TxEnvelope, basefee: u64) { | ||
let item = SimItem::from(tx); | ||
let score = item.calculate_total_fee(basefee); | ||
|
||
let mut inner = self.inner.write(); | ||
Self::add_inner(&mut inner, score, item, self.capacity); | ||
} | ||
|
||
for (score, item) in iter { | ||
/// Add an iterator of transactions to the cache. This locks the cache only once | ||
pub fn add_txs<I>(&self, item: I, basefee: u64) | ||
where | ||
I: IntoIterator<Item = TxEnvelope>, | ||
{ | ||
let mut inner = self.inner.write(); | ||
|
||
for item in item.into_iter() { | ||
let item = SimItem::from(item); | ||
let score = item.calculate_total_fee(basefee); | ||
Self::add_inner(&mut inner, score, item, self.capacity); | ||
} | ||
} | ||
|
||
/// Clean the cache by removing bundles that are not valid in the current | ||
/// block. | ||
pub fn clean(&self, block_number: u64, block_timestamp: u64) { | ||
let mut inner = self.inner.write().unwrap(); | ||
let mut inner = self.inner.write(); | ||
|
||
// Trim to capacity by dropping lower fees. | ||
while inner.len() > self.capacity { | ||
inner.pop_first(); | ||
while inner.items.len() > self.capacity { | ||
if let Some((_, item)) = inner.items.pop_first() { | ||
// Drop the identifier from the seen cache as well. | ||
inner.seen.remove(item.identifier().as_bytes()); | ||
} | ||
} | ||
|
||
inner.retain(|_, value| { | ||
let SimItem::Bundle(bundle) = value else { | ||
return true; | ||
}; | ||
if bundle.bundle.block_number != block_number { | ||
return false; | ||
} | ||
if let Some(timestamp) = bundle.min_timestamp() { | ||
if timestamp > block_timestamp { | ||
return false; | ||
} | ||
} | ||
if let Some(timestamp) = bundle.max_timestamp() { | ||
if timestamp < block_timestamp { | ||
return false; | ||
let CacheInner { ref mut items, ref mut seen } = *inner; | ||
|
||
items.retain(|_, item| { | ||
// Retain only items that are not bundles or are valid in the current block. | ||
if let SimItem::Bundle(bundle) = item { | ||
let should_remove = bundle.bundle.block_number == block_number | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should remove bundles if they match the current block number? why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this var looks more like
these are validity conditions |
||
&& bundle.min_timestamp().is_some_and(|ts| ts <= block_timestamp) | ||
&& bundle.max_timestamp().is_some_and(|ts| ts >= block_timestamp); | ||
|
||
let retain = !should_remove; | ||
|
||
if should_remove { | ||
seen.remove(item.identifier().as_bytes()); | ||
} | ||
retain | ||
} else { | ||
true // Non-bundle items are retained | ||
} | ||
true | ||
}) | ||
}); | ||
} | ||
|
||
/// Clear the cache. | ||
pub fn clear(&self) { | ||
let mut inner = self.inner.write().unwrap(); | ||
inner.clear(); | ||
let mut inner = self.inner.write(); | ||
inner.items.clear(); | ||
inner.seen.clear(); | ||
} | ||
} | ||
|
||
/// Internal cache data, meant to be protected by a lock. | ||
struct CacheInner { | ||
items: BTreeMap<u128, SimItem>, | ||
seen: HashSet<SimIdentifier<'static>>, | ||
} | ||
|
||
impl fmt::Debug for CacheInner { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("CacheInner").finish() | ||
} | ||
} | ||
|
||
impl CacheInner { | ||
fn new() -> Self { | ||
Self { items: BTreeMap::new(), seen: HashSet::new() } | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use alloy::primitives::b256; | ||
|
||
use super::*; | ||
use crate::SimItem; | ||
|
||
#[test] | ||
fn test_cache() { | ||
let items = vec![ | ||
SimItem::invalid_item_with_score(100, 1), | ||
SimItem::invalid_item_with_score(100, 2), | ||
SimItem::invalid_item_with_score(100, 3), | ||
invalid_tx_with_score(100, 1), | ||
invalid_tx_with_score(100, 2), | ||
invalid_tx_with_score(100, 3), | ||
]; | ||
|
||
let cache = SimCache::with_capacity(2); | ||
cache.add_items(items, 0); | ||
cache.add_txs(items.clone(), 0); | ||
|
||
assert_eq!(cache.len(), 2); | ||
assert_eq!(cache.get(300), Some(SimItem::invalid_item_with_score(100, 3))); | ||
assert_eq!(cache.get(200), Some(SimItem::invalid_item_with_score(100, 2))); | ||
assert_eq!(cache.get(300), Some(items[2].clone().into())); | ||
assert_eq!(cache.get(200), Some(items[1].clone().into())); | ||
assert_eq!(cache.get(100), None); | ||
} | ||
|
||
#[test] | ||
fn overlap_at_zero() { | ||
let items = vec![ | ||
SimItem::invalid_item_with_score(1, 1), | ||
SimItem::invalid_item_with_score(1, 1), | ||
SimItem::invalid_item_with_score(1, 1), | ||
invalid_tx_with_score_and_hash( | ||
1, | ||
1, | ||
b256!("0xb36a5a0066980e8477d5d5cebf023728d3cfb837c719dc7f3aadb73d1a39f11f"), | ||
), | ||
invalid_tx_with_score_and_hash( | ||
1, | ||
1, | ||
b256!("0x04d3629f341cdcc5f72969af3c7638e106b4b5620594e6831d86f03ea048e68a"), | ||
), | ||
invalid_tx_with_score_and_hash( | ||
1, | ||
1, | ||
b256!("0x0f0b6a85c1ef6811bf86e92a3efc09f61feb1deca9da671119aaca040021598a"), | ||
), | ||
]; | ||
|
||
let cache = SimCache::with_capacity(2); | ||
cache.add_items(items, 0); | ||
cache.add_txs(items.clone(), 0); | ||
|
||
dbg!(&*cache.inner.read().unwrap()); | ||
dbg!(&*cache.inner.read()); | ||
|
||
assert_eq!(cache.len(), 2); | ||
assert_eq!(cache.get(0), Some(SimItem::invalid_item_with_score(1, 1))); | ||
assert_eq!(cache.get(1), Some(SimItem::invalid_item_with_score(1, 1))); | ||
assert_eq!(cache.get(0), Some(items[2].clone().into())); | ||
assert_eq!(cache.get(1), Some(items[0].clone().into())); | ||
assert_eq!(cache.get(2), None); | ||
} | ||
|
||
fn invalid_tx_with_score(gas_limit: u64, mpfpg: u128) -> alloy::consensus::TxEnvelope { | ||
let tx = build_alloy_tx(gas_limit, mpfpg); | ||
|
||
TxEnvelope::Eip1559(alloy::consensus::Signed::new_unhashed( | ||
tx, | ||
alloy::signers::Signature::test_signature(), | ||
)) | ||
} | ||
|
||
fn invalid_tx_with_score_and_hash( | ||
gas_limit: u64, | ||
mpfpg: u128, | ||
hash: alloy::primitives::B256, | ||
) -> alloy::consensus::TxEnvelope { | ||
let tx = build_alloy_tx(gas_limit, mpfpg); | ||
|
||
TxEnvelope::Eip1559(alloy::consensus::Signed::new_unchecked( | ||
tx, | ||
alloy::signers::Signature::test_signature(), | ||
hash, | ||
)) | ||
} | ||
|
||
fn build_alloy_tx(gas_limit: u64, mpfpg: u128) -> alloy::consensus::TxEip1559 { | ||
alloy::consensus::TxEip1559 { | ||
gas_limit, | ||
max_priority_fee_per_gas: mpfpg, | ||
max_fee_per_gas: alloy::consensus::constants::GWEI_TO_WEI as u128, | ||
..Default::default() | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.