Skip to content

Commit abf22ba

Browse files
authored
fix raw scan (#409)
* fix raw scan Signed-off-by: Smityz <[email protected]> * fix Signed-off-by: Smityz <[email protected]> --------- Signed-off-by: Smityz <[email protected]>
1 parent 8b3ada2 commit abf22ba

File tree

4 files changed

+148
-18
lines changed

4 files changed

+148
-18
lines changed

src/common/errors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::result;
44

55
use thiserror::Error;
66

7+
use crate::BoundRange;
8+
79
/// An error originating from the TiKV client or dependencies.
810
#[derive(Debug, Error)]
911
#[allow(clippy::large_enum_variant)]
@@ -80,6 +82,8 @@ pub enum Error {
8082
/// No region is found for the given key.
8183
#[error("Region is not found for key: {:?}", key)]
8284
RegionForKeyNotFound { key: Vec<u8> },
85+
#[error("Region is not found for range: {:?}", range)]
86+
RegionForRangeNotFound { range: BoundRange },
8387
/// No region is found for the given id. note: distinguish it with the RegionNotFound error in errorpb.
8488
#[error("Region {} is not found in the response", region_id)]
8589
RegionNotFoundInResponse { region_id: u64 },

src/kv/bound_range.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,15 @@ use crate::proto::kvrpcpb;
6767
#[derive(Clone, Debug, Eq, PartialEq)]
6868
#[cfg_attr(test, derive(Arbitrary))]
6969
pub struct BoundRange {
70-
from: Bound<Key>,
71-
to: Bound<Key>,
70+
pub from: Bound<Key>,
71+
pub to: Bound<Key>,
7272
}
7373

7474
impl BoundRange {
7575
/// Create a new BoundRange.
7676
///
7777
/// The caller must ensure that `from` is not `Unbounded`.
78-
fn new(from: Bound<Key>, to: Bound<Key>) -> BoundRange {
78+
pub fn new(from: Bound<Key>, to: Bound<Key>) -> BoundRange {
7979
BoundRange { from, to }
8080
}
8181

src/raw/client.rs

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::str::FromStr;
55
use std::sync::Arc;
66
use std::u32;
77

8+
use futures::StreamExt;
89
use log::debug;
910

1011
use crate::backoff::DEFAULT_REGION_BACKOFF;
@@ -591,17 +592,54 @@ impl<PdC: PdClient> Client<PdC> {
591592
max_limit: MAX_RAW_KV_SCAN_LIMIT,
592593
});
593594
}
594-
595-
let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
596-
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
597-
.retry_multi_region(self.backoff.clone())
598-
.merge(Collect)
599-
.plan();
600-
let res = plan.execute().await;
601-
res.map(|mut s| {
602-
s.truncate(limit as usize);
603-
s
604-
})
595+
let mut result = Vec::new();
596+
let mut cur_range = range.into();
597+
let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
598+
let mut region_store =
599+
scan_regions
600+
.next()
601+
.await
602+
.ok_or(Error::RegionForRangeNotFound {
603+
range: (cur_range.clone()),
604+
})??;
605+
let mut cur_limit = limit;
606+
while cur_limit > 0 {
607+
let request =
608+
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
609+
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), request)
610+
.single_region_with_store(region_store.clone())
611+
.await?
612+
.plan()
613+
.execute()
614+
.await?;
615+
let mut region_scan_res = resp
616+
.kvs
617+
.into_iter()
618+
.map(Into::into)
619+
.collect::<Vec<KvPair>>();
620+
let res_len = region_scan_res.len();
621+
result.append(&mut region_scan_res);
622+
// if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
623+
if res_len < cur_limit as usize {
624+
region_store = match scan_regions.next().await {
625+
Some(Ok(rs)) => {
626+
cur_range = BoundRange::new(
627+
std::ops::Bound::Included(region_store.region_with_leader.range().1),
628+
cur_range.to,
629+
);
630+
rs
631+
}
632+
Some(Err(e)) => return Err(e),
633+
None => return Ok(result),
634+
};
635+
cur_limit -= res_len as u32;
636+
} else {
637+
break;
638+
}
639+
}
640+
// limit is a soft limit, so we need check the number of results
641+
result.truncate(limit as usize);
642+
Ok(result)
605643
}
606644

607645
async fn batch_scan_inner(

tests/integration_tests.rs

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -590,10 +590,98 @@ async fn raw_write_million() -> Result<()> {
590590
assert_eq!(res.len(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
591591
}
592592

593-
// test scan
594-
let limit = 10;
595-
let res = client.scan(vec![].., limit).await?;
596-
assert_eq!(res.len(), limit as usize);
593+
// test scan, key range from [0,0,0,0] to [255.0.0.0]
594+
let mut limit = 2000;
595+
let mut r = client.scan(.., limit).await?;
596+
assert_eq!(r.len(), 256);
597+
for (i, val) in r.iter().enumerate() {
598+
let k: Vec<u8> = val.0.clone().into();
599+
assert_eq!(k[0], i as u8);
600+
}
601+
r = client.scan(vec![100, 0, 0, 0].., limit).await?;
602+
assert_eq!(r.len(), 156);
603+
for (i, val) in r.iter().enumerate() {
604+
let k: Vec<u8> = val.0.clone().into();
605+
assert_eq!(k[0], i as u8 + 100);
606+
}
607+
r = client
608+
.scan(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
609+
.await?;
610+
assert_eq!(r.len(), 195);
611+
for (i, val) in r.iter().enumerate() {
612+
let k: Vec<u8> = val.0.clone().into();
613+
assert_eq!(k[0], i as u8 + 5);
614+
}
615+
r = client
616+
.scan(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
617+
.await?;
618+
assert_eq!(r.len(), 196);
619+
for (i, val) in r.iter().enumerate() {
620+
let k: Vec<u8> = val.0.clone().into();
621+
assert_eq!(k[0], i as u8 + 5);
622+
}
623+
r = client
624+
.scan(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
625+
.await?;
626+
assert_eq!(r.len(), 251);
627+
for (i, val) in r.iter().enumerate() {
628+
let k: Vec<u8> = val.0.clone().into();
629+
assert_eq!(k[0], i as u8 + 5);
630+
}
631+
r = client
632+
.scan(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
633+
.await?;
634+
assert_eq!(r.len(), 0);
635+
r = client.scan(..vec![0, 0, 0, 0], limit).await?;
636+
assert_eq!(r.len(), 0);
637+
638+
limit = 3;
639+
let mut r = client.scan(.., limit).await?;
640+
assert_eq!(r.len(), limit as usize);
641+
for (i, val) in r.iter().enumerate() {
642+
let k: Vec<u8> = val.0.clone().into();
643+
assert_eq!(k[0], i as u8);
644+
}
645+
r = client.scan(vec![100, 0, 0, 0].., limit).await?;
646+
assert_eq!(r.len(), limit as usize);
647+
for (i, val) in r.iter().enumerate() {
648+
let k: Vec<u8> = val.0.clone().into();
649+
assert_eq!(k[0], i as u8 + 100);
650+
}
651+
r = client
652+
.scan(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
653+
.await?;
654+
assert_eq!(r.len(), limit as usize);
655+
for (i, val) in r.iter().enumerate() {
656+
let k: Vec<u8> = val.0.clone().into();
657+
assert_eq!(k[0], i as u8 + 5);
658+
}
659+
r = client
660+
.scan(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
661+
.await?;
662+
assert_eq!(r.len(), limit as usize);
663+
for (i, val) in r.iter().enumerate() {
664+
let k: Vec<u8> = val.0.clone().into();
665+
assert_eq!(k[0], i as u8 + 5);
666+
}
667+
r = client
668+
.scan(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
669+
.await?;
670+
assert_eq!(r.len(), limit as usize);
671+
for (i, val) in r.iter().enumerate() {
672+
let k: Vec<u8> = val.0.clone().into();
673+
assert_eq!(k[0], i as u8 + 5);
674+
}
675+
r = client
676+
.scan(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
677+
.await?;
678+
assert_eq!(r.len(), 0);
679+
r = client.scan(..vec![0, 0, 0, 0], limit).await?;
680+
assert_eq!(r.len(), 0);
681+
682+
limit = 0;
683+
r = client.scan(.., limit).await?;
684+
assert_eq!(r.len(), limit as usize);
597685

598686
// test batch_scan
599687
for batch_num in 1..4 {

0 commit comments

Comments
 (0)