Skip to content

Commit a4fe585

Browse files
committed
introduce region cache codec interface
1 parent 222908e commit a4fe585

File tree

13 files changed

+83
-74
lines changed

13 files changed

+83
-74
lines changed

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
//! # })}
9191
//! ```
9292
93+
#![allow(incomplete_features)]
9394
#![feature(specialization)]
9495
#![feature(explicit_generic_args_with_impl_trait)]
9596
#[macro_use]

src/pd/client.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::{collections::HashMap, marker::PhantomData, sync::Arc, thread};
3+
use std::{collections::HashMap, sync::Arc, thread};
44

55
use async_trait::async_trait;
66
use futures::{prelude::*, stream::BoxStream};
@@ -219,9 +219,8 @@ pub struct PdRpcClient<C, KvC: KvConnect + Send + Sync + 'static = TikvConnect,
219219
pd: Arc<RetryClient<Cl>>,
220220
kv_connect: KvC,
221221
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
222-
region_cache: RegionCache<RetryClient<Cl>>,
222+
region_cache: RegionCache<C, RetryClient<Cl>>,
223223
logger: Logger,
224-
codec: C,
225224
}
226225

227226
#[async_trait]
@@ -237,20 +236,11 @@ impl<C: RequestCodec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpc
237236
}
238237

239238
async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
240-
let enable_codec = self.enable_codec;
241-
let key = if enable_codec {
242-
key.to_encoded()
243-
} else {
244-
key.clone()
245-
};
246-
247-
let region = self.region_cache.get_region_by_key(&key).await?;
248-
Self::decode_region(region, enable_codec)
239+
self.region_cache.get_region_by_key(&key).await
249240
}
250241

251242
async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
252-
let region = self.region_cache.get_region_by_id(id).await?;
253-
Self::decode_region(region, self.enable_codec)
243+
self.region_cache.get_region_by_id(id).await
254244
}
255245

256246
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
@@ -270,7 +260,7 @@ impl<C: RequestCodec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpc
270260
}
271261

272262
fn get_request_codec(&self) -> Self::RequestCodec {
273-
todo!()
263+
self.region_cache.get_request_codec()
274264
}
275265
}
276266

@@ -338,9 +328,8 @@ impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
338328
pd: pd.clone(),
339329
kv_client_cache,
340330
kv_connect: kv_connect(env, security_mgr),
341-
region_cache: RegionCache::new(pd),
331+
region_cache: RegionCache::new(codec, pd),
342332
logger,
343-
codec
344333
})
345334
}
346335

src/raw/client.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ use tikv_client_common::Error;
99
use tikv_client_proto::metapb;
1010

1111
use crate::{
12-
Backoff,
1312
backoff::DEFAULT_REGION_BACKOFF,
14-
BoundRange,
15-
ColumnFamily,
1613
config::Config,
17-
Key, KvPair, pd::{PdClient, PdRpcClient}, raw::lowering::*, request::{Collect, CollectSingle, Plan, request_codec::RequestCodec}, Result, Value,
14+
pd::{PdClient, PdRpcClient},
15+
raw::lowering::*,
16+
request::{request_codec::RequestCodec, Collect, CollectSingle, Plan},
17+
Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value,
1818
};
1919

2020
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
@@ -215,7 +215,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
215215
/// ```
216216
pub async fn batch_get(
217217
&self,
218-
keys: impl IntoIterator<Item=impl Into<Key>>,
218+
keys: impl IntoIterator<Item = impl Into<Key>>,
219219
) -> Result<Vec<KvPair>> {
220220
debug!(self.logger, "invoking raw batch_get request");
221221
let request =
@@ -277,7 +277,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
277277
/// ```
278278
pub async fn batch_put(
279279
&self,
280-
pairs: impl IntoIterator<Item=impl Into<KvPair>>,
280+
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
281281
) -> Result<()> {
282282
debug!(self.logger, "invoking raw batch_put request");
283283
let request = new_raw_batch_put_request::<C>(
@@ -339,7 +339,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
339339
/// let result: () = req.await.unwrap();
340340
/// # });
341341
/// ```
342-
pub async fn batch_delete(&self, keys: impl IntoIterator<Item=impl Into<Key>>) -> Result<()> {
342+
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
343343
debug!(self.logger, "invoking raw batch_delete request");
344344
self.assert_non_atomic()?;
345345
let request =
@@ -465,7 +465,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
465465
/// ```
466466
pub async fn batch_scan(
467467
&self,
468-
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
468+
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
469469
each_limit: u32,
470470
) -> Result<Vec<KvPair>> {
471471
debug!(self.logger, "invoking raw batch_scan request");
@@ -497,7 +497,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
497497
/// ```
498498
pub async fn batch_scan_keys(
499499
&self,
500-
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
500+
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
501501
each_limit: u32,
502502
) -> Result<Vec<Key>> {
503503
debug!(self.logger, "invoking raw batch_scan_keys request");
@@ -547,7 +547,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
547547
&self,
548548
copr_name: impl Into<String>,
549549
copr_version_req: impl Into<String>,
550-
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
550+
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
551551
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
552552
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
553553
let copr_version_req = copr_version_req.into();
@@ -593,7 +593,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
593593

594594
async fn batch_scan_inner(
595595
&self,
596-
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
596+
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
597597
each_limit: u32,
598598
key_only: bool,
599599
) -> Result<Vec<KvPair>> {

src/raw/lowering.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@
44
//! types (i.e., the types from the client crate) and converts these to the types used in the
55
//! generated protobuf code, then calls the low-level ctor functions in the requests module.
66
7-
use std::{iter::Iterator, marker::PhantomData, ops::Range, sync::Arc};
7+
use std::{iter::Iterator, ops::Range, sync::Arc};
88

99
use tikv_client_proto::{kvrpcpb, metapb};
1010

1111
use crate::{
12-
raw::requests,
13-
request::{request_codec::RequestCodec, KvRequest},
14-
BoundRange, ColumnFamily, Key, KvPair, Value,
12+
raw::requests, request::request_codec::RequestCodec, BoundRange, ColumnFamily, Key, KvPair,
13+
Value,
1514
};
1615

1716
pub fn new_raw_get_request<C: RequestCodec>(

src/raw/requests.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@ use crate::{
1313
collect_first,
1414
pd::PdClient,
1515
request::{
16-
plan::ResponseWithShard,
17-
request_codec::{RawApiV1, RequestCodec},
18-
Collect, CollectSingle, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey,
16+
plan::ResponseWithShard, request_codec::RequestCodec, Collect, CollectSingle,
17+
DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey,
1918
},
2019
store::{store_stream_for_keys, store_stream_for_ranges, RegionStore},
2120
transaction::HasLocks,

src/region_cache.rs

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use crate::{
4-
pd::{RetryClient, RetryClientTrait},
5-
region::{RegionId, RegionVerId, RegionWithLeader, StoreId},
6-
Key, Result,
7-
};
83
use std::{
94
collections::{BTreeMap, HashMap, HashSet},
105
sync::Arc,
116
};
7+
8+
use tokio::sync::{Notify, RwLock};
9+
1210
use tikv_client_common::Error;
1311
use tikv_client_pd::Cluster;
1412
use tikv_client_proto::metapb::{self, Store};
15-
use tokio::sync::{Notify, RwLock};
13+
14+
use crate::{
15+
pd::{RetryClient, RetryClientTrait},
16+
region::{RegionId, RegionVerId, RegionWithLeader, StoreId},
17+
request::request_codec::RequestCodec,
18+
Key, Result,
19+
};
1620

1721
const MAX_RETRY_WAITING_CONCURRENT_REQUEST: usize = 4;
1822

@@ -44,23 +48,25 @@ impl RegionCacheMap {
4448
}
4549
}
4650

47-
pub struct RegionCache<Client = RetryClient<Cluster>> {
51+
pub struct RegionCache<C, Client = RetryClient<Cluster>> {
4852
region_cache: RwLock<RegionCacheMap>,
4953
store_cache: RwLock<HashMap<StoreId, Store>>,
5054
inner_client: Arc<Client>,
55+
codec: C,
5156
}
5257

53-
impl<Client> RegionCache<Client> {
54-
pub fn new(inner_client: Arc<Client>) -> RegionCache<Client> {
58+
impl<C, Client> RegionCache<C, Client> {
59+
pub fn new(codec: C, inner_client: Arc<Client>) -> Self {
5560
RegionCache {
5661
region_cache: RwLock::new(RegionCacheMap::new()),
5762
store_cache: RwLock::new(HashMap::new()),
5863
inner_client,
64+
codec,
5965
}
6066
}
6167
}
6268

63-
impl<C: RetryClientTrait> RegionCache<C> {
69+
impl<C: RequestCodec, R: RetryClientTrait> RegionCache<C, R> {
6470
// Retrieve cache entry by key. If there's no entry, query PD and update cache.
6571
pub async fn get_region_by_key(&self, key: &Key) -> Result<RegionWithLeader> {
6672
let region_cache_guard = self.region_cache.read().await;
@@ -126,7 +132,12 @@ impl<C: RetryClientTrait> RegionCache<C> {
126132

127133
/// Force read through (query from PD) and update cache
128134
pub async fn read_through_region_by_key(&self, key: Key) -> Result<RegionWithLeader> {
129-
let region = self.inner_client.clone().get_region(key.into()).await?;
135+
let mut region = self
136+
.inner_client
137+
.clone()
138+
.get_region(self.codec.encode_pd_query(key).into())
139+
.await?;
140+
region.region = self.codec.decode_region(region.region)?;
130141
self.add_region(region.clone()).await;
131142
Ok(region)
132143
}
@@ -140,7 +151,8 @@ impl<C: RetryClientTrait> RegionCache<C> {
140151
region_cache_guard.on_my_way_id.insert(id, notify.clone());
141152
}
142153

143-
let region = self.inner_client.clone().get_region_by_id(id).await?;
154+
let mut region = self.inner_client.clone().get_region_by_id(id).await?;
155+
region.region = self.codec.decode_region(region.region)?;
144156
self.add_region(region.clone()).await;
145157

146158
// notify others
@@ -226,27 +238,35 @@ impl<C: RetryClientTrait> RegionCache<C> {
226238
cache.key_to_ver_id.remove(&start_key);
227239
}
228240
}
241+
242+
pub fn get_request_codec(&self) -> C {
243+
self.codec.clone()
244+
}
229245
}
230246

231247
#[cfg(test)]
232248
mod test {
233-
use super::RegionCache;
234-
use crate::{
235-
pd::RetryClientTrait,
236-
region::{RegionId, RegionWithLeader},
237-
Key, Result,
238-
};
239-
use async_trait::async_trait;
240249
use std::{
241250
collections::{BTreeMap, HashMap, HashSet},
242251
sync::{
243252
atomic::{AtomicU64, Ordering::SeqCst},
244253
Arc,
245254
},
246255
};
256+
257+
use async_trait::async_trait;
258+
use tokio::sync::Mutex;
259+
247260
use tikv_client_common::Error;
248261
use tikv_client_proto::metapb;
249-
use tokio::sync::Mutex;
262+
263+
use crate::{
264+
pd::RetryClientTrait,
265+
region::{RegionId, RegionWithLeader},
266+
Key, Result,
267+
};
268+
269+
use super::RegionCache;
250270

251271
#[derive(Default)]
252272
struct MockRetryClient {

src/request/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use tikv_client_store::{HasKeyErrors, HasRegionError, Request};
99

1010
use crate::{
1111
backoff::{Backoff, DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF, PESSIMISTIC_BACKOFF},
12-
pd::PdClient,
1312
transaction::HasLocks,
1413
};
1514

src/request/plan_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ where
4545
PdC: PdClient<RequestCodec = C>,
4646
{
4747
pub fn new(pd_client: Arc<PdC>, request: Req) -> Self {
48-
let codec = pd_client.get_request_codec().clone();
48+
let codec = pd_client.get_request_codec();
4949
PlanBuilder {
5050
pd_client,
5151
plan: Dispatch::new(request, None, codec),

src/request/request_codec.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
use tikv_client_proto::metapb::Region;
22

3-
use crate::Result;
3+
use crate::{Key, Result};
44

55
pub trait RequestCodec: Sized + Clone + Sync + Send + 'static {
6-
fn encode_key(&self, key: Vec<u8>) -> Vec<u8> {
6+
fn encode_key(&self, key: Key) -> Key {
77
key
88
}
9-
fn decode_key(&self, key: Vec<u8>) -> Result<Vec<u8>> {
9+
fn decode_key(&self, key: Key) -> Result<Key> {
1010
Ok(key)
1111
}
12-
fn encode_range(&self, start: Vec<u8>, end: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
12+
fn encode_range(&self, start: Key, end: Key) -> (Key, Key) {
1313
(start, end)
1414
}
15-
fn encode_pd_query(&self, key: Vec<u8>) -> Vec<u8> {
15+
fn encode_pd_query(&self, key: Key) -> Key {
1616
key
1717
}
1818
fn decode_region(&self, region: Region) -> Result<Region> {

src/transaction/client.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ use crate::{
1010
backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
1111
config::Config,
1212
pd::{PdClient, PdRpcClient},
13-
request::{Plan, request_codec::RequestCodec},
14-
Result,
13+
request::{request_codec::RequestCodec, Plan},
1514
timestamp::TimestampExt,
1615
transaction::{Snapshot, Transaction, TransactionOptions},
16+
Result,
1717
};
1818

1919
use super::{requests::new_scan_lock_request, resolve_locks};
@@ -44,8 +44,8 @@ pub struct Client<C> {
4444
}
4545

4646
impl<C> Client<C>
47-
where
48-
C: RequestCodec,
47+
where
48+
C: RequestCodec,
4949
{
5050
/// Create a transactional [`Client`] and connect to the TiKV cluster.
5151
///
@@ -113,7 +113,8 @@ impl<C> Client<C>
113113
});
114114
debug!(logger, "creating new transactional client");
115115
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
116-
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, codec, logger.clone()).await?);
116+
let pd =
117+
Arc::new(PdRpcClient::connect(&pd_endpoints, config, codec, logger.clone()).await?);
117118
Ok(Client {
118119
pd,
119120
logger,

0 commit comments

Comments
 (0)