Skip to content

Commit c628111

Browse files
authored
refactor: simplify MapApi (#13063)
* refactor: simplify MapApi - Add trait `MapKey` and `MapValue` to define behavior of key values that are used in `MapApi`. - Add Ref and RefMut as container of reference to leveled data. - Collect get() and range() implementation into function. The MapApi trait can not be generalized to adapt arbitrary lifetime, such as using `self` instead of `&self`: `MapApiRO { fn get(self, key) }`. Because there is a known limitation with rust GAT and hihger ranked lifetime: See: rust-lang/rust#114046 ``` error: implementation of `MapApiRO` is not general enough --> src/meta/raft-store/src/sm_v002/sm_v002.rs:80:74 | 80 | async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> { | __________________________________________________________________________^ 81 | | let got = self.sm.get_kv(key).await; 82 | | 83 | | let local_now_ms = SeqV::<()>::now_ms(); 84 | | let got = Self::non_expired(got, local_now_ms); 85 | | Ok(got) 86 | | } | |_____^ implementation of `MapApiRO` is not general enough | = note: `MapApiRO<'1, std::string::String>` would have to be implemented for the type `&'0 LevelData`, for any two lifetimes `'0` and `'1`... = note: ...but `MapApiRO<'2, std::string::String>` is actually implemented for the type `&'2 LevelData`, for some specific lifetime `'2` ``` * refactor: move Ref and RefMut to separate files * refactor: rename is_not_found() to not_found() As @ariesdevil kindly suggested. * refactor: simplify map api names: LevelData to Level * chore: fix clippy
1 parent b17e271 commit c628111

15 files changed

+503
-208
lines changed

src/meta/raft-store/src/applier.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,22 @@ impl<'a> Applier<'a> {
191191
Change::new(prev, result).into()
192192
}
193193

194+
// TODO(1): when get an applier, pass in a now_ms to ensure all expired are cleaned.
195+
/// Update or insert a kv entry.
196+
///
197+
/// If the input entry has expired, it performs a delete operation.
194198
#[minitrace::trace]
195-
async fn upsert_kv(&mut self, upsert_kv: &UpsertKV) -> (Option<SeqV>, Option<SeqV>) {
199+
pub(crate) async fn upsert_kv(&mut self, upsert_kv: &UpsertKV) -> (Option<SeqV>, Option<SeqV>) {
196200
debug!(upsert_kv = as_debug!(upsert_kv); "upsert_kv");
197201

198-
let (prev, result) = self.sm.upsert_kv(upsert_kv.clone()).await;
202+
let (prev, result) = self.sm.upsert_kv_primary_index(upsert_kv).await;
203+
204+
self.sm
205+
.update_expire_index(&upsert_kv.key, &prev, &result)
206+
.await;
207+
208+
let prev = Into::<Option<SeqV>>::into(prev);
209+
let result = Into::<Option<SeqV>>::into(result);
199210

200211
debug!(
201212
"applied UpsertKV: {:?}; prev: {:?}; result: {:?}",
@@ -209,7 +220,6 @@ impl<'a> Applier<'a> {
209220
}
210221

211222
#[minitrace::trace]
212-
213223
async fn apply_txn(&mut self, req: &TxnRequest) -> AppliedState {
214224
debug!(txn = as_display!(req); "apply txn cmd");
215225

@@ -445,9 +455,7 @@ impl<'a> Applier<'a> {
445455
assert_eq!(expire_key.seq, seq_v.seq);
446456
info!("clean expired: {}, {}", key, expire_key);
447457

448-
self.sm.upsert_kv(UpsertKV::delete(key.clone())).await;
449-
// dbg!("clean_expired", &key, &curr);
450-
self.push_change(key, curr, None);
458+
self.upsert_kv(&UpsertKV::delete(key.clone())).await;
451459
} else {
452460
unreachable!(
453461
"trying to remove un-cleanable: {}, {}, kv-entry: {:?}",

src/meta/raft-store/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#![allow(clippy::uninlined_format_args)]
16+
#![feature(impl_trait_in_assoc_type)]
1617
// #![feature(type_alias_impl_trait)]
1718

1819
// #![allow(incomplete_features)]

src/meta/raft-store/src/sm_v002/importer.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use common_meta_types::LogId;
2020
use common_meta_types::StoredMembership;
2121

2222
use crate::key_spaces::RaftStoreEntry;
23-
use crate::sm_v002::leveled_store::level_data::LevelData;
23+
use crate::sm_v002::leveled_store::level::Level;
2424
use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
2525
use crate::sm_v002::marked::Marked;
2626
use crate::state_machine::ExpireKey;
@@ -29,7 +29,7 @@ use crate::state_machine::StateMachineMetaKey;
2929
/// A container of temp data that are imported to a LevelData.
3030
#[derive(Debug, Default)]
3131
pub struct Importer {
32-
level_data: LevelData,
32+
level_data: Level,
3333

3434
kv: BTreeMap<String, Marked>,
3535
expire: BTreeMap<ExpireKey, Marked<String>>,
@@ -109,7 +109,7 @@ impl Importer {
109109
Ok(())
110110
}
111111

112-
pub fn commit(mut self) -> LevelData {
112+
pub fn commit(mut self) -> Level {
113113
let d = &mut self.level_data;
114114

115115
d.replace_kv(self.kv);

src/meta/raft-store/src/sm_v002/leveled_store/level_data.rs renamed to src/meta/raft-store/src/sm_v002/leveled_store/level.rs

+41-32
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,24 @@ use futures_util::StreamExt;
2222

2323
use crate::sm_v002::leveled_store::map_api::MapApi;
2424
use crate::sm_v002::leveled_store::map_api::MapApiRO;
25+
use crate::sm_v002::leveled_store::map_api::MapKey;
2526
use crate::sm_v002::leveled_store::sys_data::SysData;
2627
use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
2728
use crate::sm_v002::marked::Marked;
2829
use crate::state_machine::ExpireKey;
2930

31+
impl MapKey for String {
32+
type V = Vec<u8>;
33+
}
34+
impl MapKey for ExpireKey {
35+
type V = String;
36+
}
37+
3038
/// A single level of state machine data.
3139
///
3240
/// State machine data is composed of multiple levels.
3341
#[derive(Debug, Default)]
34-
pub struct LevelData {
42+
pub struct Level {
3543
/// System data(non-user data).
3644
sys_data: SysData,
3745

@@ -42,7 +50,7 @@ pub struct LevelData {
4250
expire: BTreeMap<ExpireKey, Marked<String>>,
4351
}
4452

45-
impl LevelData {
53+
impl Level {
4654
/// Create a new level that is based on this level.
4755
pub(crate) fn new_level(&self) -> Self {
4856
Self {
@@ -74,35 +82,36 @@ impl LevelData {
7482
}
7583

7684
#[async_trait::async_trait]
77-
impl MapApiRO<String> for LevelData {
78-
type V = Vec<u8>;
79-
80-
async fn get<Q>(&self, key: &Q) -> Marked<Self::V>
85+
impl MapApiRO<String> for Level {
86+
async fn get<Q>(&self, key: &Q) -> Marked<<String as MapKey>::V>
8187
where
8288
String: Borrow<Q>,
8389
Q: Ord + Send + Sync + ?Sized,
8490
{
8591
self.kv.get(key).cloned().unwrap_or(Marked::empty())
8692
}
8793

88-
async fn range<'a, T, R>(&'a self, range: R) -> BoxStream<'a, (String, Marked)>
94+
async fn range<'f, Q, R>(
95+
&'f self,
96+
range: R,
97+
) -> BoxStream<'f, (String, Marked<<String as MapKey>::V>)>
8998
where
90-
String: 'a,
91-
String: Borrow<T>,
92-
T: Ord + ?Sized,
93-
R: RangeBounds<T> + Send,
99+
String: Borrow<Q>,
100+
Q: Ord + Send + Sync + ?Sized,
101+
R: RangeBounds<Q> + Clone + Send + Sync,
94102
{
95-
futures::stream::iter(self.kv.range(range).map(|(k, v)| (k.clone(), v.clone()))).boxed()
103+
let it = self.kv.range(range).map(|(k, v)| (k.clone(), v.clone()));
104+
futures::stream::iter(it).boxed()
96105
}
97106
}
98107

99108
#[async_trait::async_trait]
100-
impl MapApi<String> for LevelData {
109+
impl MapApi<String> for Level {
101110
async fn set(
102111
&mut self,
103112
key: String,
104-
value: Option<(Self::V, Option<KVMeta>)>,
105-
) -> (Marked<Self::V>, Marked<Self::V>) {
113+
value: Option<(<String as MapKey>::V, Option<KVMeta>)>,
114+
) -> (Marked<<String as MapKey>::V>, Marked<<String as MapKey>::V>) {
106115
// The chance it is the bottom level is very low in a loaded system.
107116
// Thus we always tombstone the key if it is None.
108117

@@ -117,33 +126,30 @@ impl MapApi<String> for LevelData {
117126
Marked::new_tomb_stone(seq)
118127
};
119128

120-
let prev = MapApiRO::<String>::get(self, key.as_str()).await;
129+
let prev = MapApiRO::<String>::get(&*self, key.as_str()).await;
121130
self.kv.insert(key, marked.clone());
122131
(prev, marked)
123132
}
124133
}
125134

126135
#[async_trait::async_trait]
127-
impl MapApiRO<ExpireKey> for LevelData {
128-
type V = String;
129-
130-
async fn get<Q>(&self, key: &Q) -> Marked<Self::V>
136+
impl MapApiRO<ExpireKey> for Level {
137+
async fn get<Q>(&self, key: &Q) -> Marked<<ExpireKey as MapKey>::V>
131138
where
132139
ExpireKey: Borrow<Q>,
133140
Q: Ord + Send + Sync + ?Sized,
134141
{
135142
self.expire.get(key).cloned().unwrap_or(Marked::empty())
136143
}
137144

138-
async fn range<'a, T: ?Sized, R>(
139-
&'a self,
145+
async fn range<'f, Q, R>(
146+
&'f self,
140147
range: R,
141-
) -> BoxStream<'a, (ExpireKey, Marked<String>)>
148+
) -> BoxStream<'f, (ExpireKey, Marked<<ExpireKey as MapKey>::V>)>
142149
where
143-
ExpireKey: 'a,
144-
ExpireKey: Borrow<T>,
145-
T: Ord,
146-
R: RangeBounds<T> + Send,
150+
ExpireKey: Borrow<Q>,
151+
Q: Ord + Send + Sync + ?Sized,
152+
R: RangeBounds<Q> + Clone + Send + Sync,
147153
{
148154
let it = self
149155
.expire
@@ -155,12 +161,15 @@ impl MapApiRO<ExpireKey> for LevelData {
155161
}
156162

157163
#[async_trait::async_trait]
158-
impl MapApi<ExpireKey> for LevelData {
164+
impl MapApi<ExpireKey> for Level {
159165
async fn set(
160166
&mut self,
161167
key: ExpireKey,
162-
value: Option<(Self::V, Option<KVMeta>)>,
163-
) -> (Marked<Self::V>, Marked<Self::V>) {
168+
value: Option<(<ExpireKey as MapKey>::V, Option<KVMeta>)>,
169+
) -> (
170+
Marked<<ExpireKey as MapKey>::V>,
171+
Marked<<ExpireKey as MapKey>::V>,
172+
) {
164173
// dbg!("set expire", &key, &value);
165174

166175
let seq = self.curr_seq();
@@ -171,13 +180,13 @@ impl MapApi<ExpireKey> for LevelData {
171180
Marked::TombStone { internal_seq: seq }
172181
};
173182

174-
let prev = MapApiRO::<ExpireKey>::get(self, &key).await;
183+
let prev = MapApiRO::<ExpireKey>::get(&*self, &key).await;
175184
self.expire.insert(key, marked.clone());
176185
(prev, marked)
177186
}
178187
}
179188

180-
impl AsRef<SysData> for LevelData {
189+
impl AsRef<SysData> for Level {
181190
fn as_ref(&self) -> &SysData {
182191
&self.sys_data
183192
}

0 commit comments

Comments
 (0)