Skip to content

Commit 62f99c6

Browse files
author
Xuhui Lu
authored
Merge branch 'master' into xlu/sync-api
2 parents e426c5b + c14f23a commit 62f99c6

29 files changed

+1328
-478
lines changed

.github/workflows/ci.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ jobs:
2020
- uses: actions-rs/cargo@v1
2121
with:
2222
command: check
23+
args: --all-targets --all-features
2324

2425
fmt:
2526
name: rustfmt
@@ -49,7 +50,7 @@ jobs:
4950
- uses: actions-rs/clippy-check@v1
5051
with:
5152
token: ${{ secrets.GITHUB_TOKEN }}
52-
args: --all-features
53+
args: --all-targets --all-features -- -D clippy::all
5354
name: Clippy Output
5455
unit-test:
5556
name: unit test
@@ -98,7 +99,6 @@ jobs:
9899
path: |
99100
~/.cargo/.crates.toml
100101
~/.cargo/.crates2.json
101-
~/.cargo/bin
102102
~/.cargo/registry/index
103103
~/.cargo/registry/cache
104104
key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('Cargo.lock') }}

Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ serde_derive = "1.0"
3434
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
3535
slog-term = { version = "2.4" }
3636
thiserror = "1"
37-
tokio = { version = "1.0", features = [ "sync", "time" ] }
37+
tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] }
38+
async-recursion = "0.3"
3839

3940
tikv-client-common = { version = "0.1.0", path = "tikv-client-common" }
4041
tikv-client-pd = { version = "0.1.0", path = "tikv-client-pd" }
@@ -50,7 +51,7 @@ proptest = "1"
5051
proptest-derive = "0.3"
5152
serial_test = "0.5.0"
5253
simple_logger = "1"
53-
tokio = { version = "1.0", features = [ "sync", "rt-multi-thread", "macros" ] }
54+
tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] }
5455
reqwest = {version = "0.11", default-features = false, features = ["native-tls-vendored"]}
5556
serde_json = "1"
5657

Makefile

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
default: check
44

55
check:
6-
cargo check --all
6+
cargo check --all --all-targets --all-features
77
cargo fmt -- --check
8-
cargo clippy -- -D clippy::all
8+
cargo clippy --all-targets --all-features -- -D clippy::all
99

1010
unit-test:
1111
cargo test --all

src/backoff.rs

+5
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ impl Backoff {
7676
self.kind == BackoffKind::None
7777
}
7878

79+
/// Returns the number of attempts
80+
pub fn current_attempts(&self) -> u32 {
81+
self.current_attempts
82+
}
83+
7984
/// Don't wait. Usually indicates that we should not retry a request.
8085
pub const fn no_backoff() -> Backoff {
8186
Backoff {

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ mod pd;
104104
#[doc(hidden)]
105105
pub mod raw;
106106
mod region;
107+
mod region_cache;
107108
mod stats;
108109
mod store;
109110
mod timestamp;

src/mock.rs

+44-19
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77
88
use crate::{
99
pd::{PdClient, PdRpcClient, RetryClient},
10-
region::{Region, RegionId},
11-
store::Store,
10+
region::{RegionId, RegionWithLeader},
11+
store::RegionStore,
1212
Config, Error, Key, Result, Timestamp,
1313
};
1414
use async_trait::async_trait;
1515
use derive_new::new;
16+
use slog::{Drain, Logger};
1617
use std::{any::Any, sync::Arc};
1718
use tikv_client_proto::metapb;
1819
use tikv_client_store::{KvClient, KvConnect, Request};
@@ -21,8 +22,16 @@ use tikv_client_store::{KvClient, KvConnect, Request};
2122
/// client can be tested without doing any RPC calls.
2223
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
2324
let config = Config::default();
25+
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
26+
let logger = Logger::root(
27+
slog_term::FullFormat::new(plain)
28+
.build()
29+
.filter_level(slog::Level::Info)
30+
.fuse(),
31+
o!(),
32+
);
2433
PdRpcClient::new(
25-
&config,
34+
config.clone(),
2635
|_, _| MockKvConnect,
2736
|e, sm| {
2837
futures::future::ok(RetryClient::new_with_cluster(
@@ -33,11 +42,13 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
3342
))
3443
},
3544
false,
45+
logger,
3646
)
3747
.await
3848
.unwrap()
3949
}
4050

51+
#[allow(clippy::type_complexity)]
4152
#[derive(new, Default, Clone)]
4253
pub struct MockKvClient {
4354
pub addr: String,
@@ -93,27 +104,31 @@ impl MockPdClient {
93104
}
94105
}
95106

96-
pub fn region1() -> Region {
97-
let mut region = Region::default();
107+
pub fn region1() -> RegionWithLeader {
108+
let mut region = RegionWithLeader::default();
98109
region.region.id = 1;
99110
region.region.set_start_key(vec![0]);
100111
region.region.set_end_key(vec![10]);
101112

102-
let mut leader = metapb::Peer::default();
103-
leader.store_id = 41;
113+
let leader = metapb::Peer {
114+
store_id: 41,
115+
..Default::default()
116+
};
104117
region.leader = Some(leader);
105118

106119
region
107120
}
108121

109-
pub fn region2() -> Region {
110-
let mut region = Region::default();
122+
pub fn region2() -> RegionWithLeader {
123+
let mut region = RegionWithLeader::default();
111124
region.region.id = 2;
112125
region.region.set_start_key(vec![10]);
113126
region.region.set_end_key(vec![250, 250]);
114127

115-
let mut leader = metapb::Peer::default();
116-
leader.store_id = 42;
128+
let leader = metapb::Peer {
129+
store_id: 42,
130+
..Default::default()
131+
};
117132
region.leader = Some(leader);
118133

119134
region
@@ -124,11 +139,11 @@ impl MockPdClient {
124139
impl PdClient for MockPdClient {
125140
type KvClient = MockKvClient;
126141

127-
async fn map_region_to_store(self: Arc<Self>, region: Region) -> Result<Store> {
128-
Ok(Store::new(region, Arc::new(self.client.clone())))
142+
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
143+
Ok(RegionStore::new(region, Arc::new(self.client.clone())))
129144
}
130145

131-
async fn region_for_key(&self, key: &Key) -> Result<Region> {
146+
async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
132147
let bytes: &[_] = key.into();
133148
let region = if bytes.is_empty() || bytes[0] < 10 {
134149
Self::region1()
@@ -139,11 +154,11 @@ impl PdClient for MockPdClient {
139154
Ok(region)
140155
}
141156

142-
async fn region_for_id(&self, id: RegionId) -> Result<Region> {
157+
async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
143158
match id {
144159
1 => Ok(Self::region1()),
145160
2 => Ok(Self::region2()),
146-
_ => Err(Error::RegionNotFound { region_id: id }),
161+
_ => Err(Error::RegionNotFoundInResponse { region_id: id }),
147162
}
148163
}
149164

@@ -154,11 +169,21 @@ impl PdClient for MockPdClient {
154169
async fn update_safepoint(self: Arc<Self>, _safepoint: u64) -> Result<bool> {
155170
unimplemented!()
156171
}
172+
173+
async fn update_leader(
174+
&self,
175+
_ver_id: crate::region::RegionVerId,
176+
_leader: metapb::Peer,
177+
) -> Result<()> {
178+
todo!()
179+
}
180+
181+
async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
157182
}
158183

159-
pub fn mock_store() -> Store {
160-
Store {
161-
region: Region::default(),
184+
pub fn mock_store() -> RegionStore {
185+
RegionStore {
186+
region_with_leader: RegionWithLeader::default(),
162187
client: Arc::new(MockKvClient::new("foo".to_owned(), None)),
163188
}
164189
}

0 commit comments

Comments
 (0)