Skip to content

Commit d9935c2

Browse files
authored
Merge pull request #179 from ekexium/fix-raw-scan-limit
Fix limit problem in raw_scan and unbouned problem in batch_scan
2 parents cc43fbd + 02629f8 commit d9935c2

File tree

3 files changed

+72
-5
lines changed

3 files changed

+72
-5
lines changed

src/pd/client.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ pub trait PdClient: Send + Sync + 'static {
133133
.boxed()
134134
}
135135

136-
// Returns a Steam which iterates over the contexts for ranges in the same region.
136+
// Returns a Stream which iterates over the contexts for ranges in the same region.
137137
fn group_ranges_by_region(
138138
self: Arc<Self>,
139139
mut ranges: Vec<BoundRange>,
@@ -153,7 +153,10 @@ pub trait PdClient: Send + Sync + 'static {
153153
let region_end = region.end_key();
154154
let mut grouped = vec![];
155155
if !region_end.is_empty()
156-
&& end_key.clone().map(|x| x > region_end).unwrap_or(true)
156+
&& end_key
157+
.clone()
158+
.map(|x| x > region_end || x.is_empty())
159+
.unwrap_or(true)
157160
{
158161
grouped.push((start_key, region_end.clone()).into());
159162
ranges.push((region_end, end_key).into());
@@ -168,7 +171,10 @@ pub trait PdClient: Send + Sync + 'static {
168171
break;
169172
}
170173
if !region_end.is_empty()
171-
&& end_key.clone().map(|x| x > region_end).unwrap_or(true)
174+
&& end_key
175+
.clone()
176+
.map(|x| x > region_end || x.is_empty())
177+
.unwrap_or(true)
172178
{
173179
grouped.push((start_key, region_end.clone()).into());
174180
ranges.push((region_end, end_key).into());

src/raw/client.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,13 @@ impl Client {
252252
return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT));
253253
}
254254

255-
requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone())
255+
let res = requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone())
256256
.execute(self.rpc.clone())
257-
.await
257+
.await;
258+
res.map(|mut s| {
259+
s.truncate(limit as usize);
260+
s
261+
})
258262
}
259263

260264
/// Create a new 'batch scan' request.

tests/integration_tests.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,63 @@ async fn raw_req() -> Fallible<()> {
438438
Fallible::Ok(())
439439
}
440440

441+
/// Tests raw API when there are multiple regions.
442+
/// Write large volumes of data to enforce region splitting.
443+
/// In order to test `scan`, data is uniformly inserted.
444+
///
445+
/// Ignoring this because we don't want to mess up transactional tests.
446+
#[tokio::test]
447+
#[serial]
448+
#[ignore]
449+
async fn raw_write_million() -> Fallible<()> {
450+
const NUM_BITS_TXN: u32 = 9;
451+
const NUM_BITS_KEY_PER_TXN: u32 = 10;
452+
let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN);
453+
454+
clear_tikv().await?;
455+
let config = Config::new(pd_addrs());
456+
let client = RawClient::new(config).await?;
457+
458+
for i in 0..2u32.pow(NUM_BITS_TXN) {
459+
let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN);
460+
let keys = iter::repeat_with(|| {
461+
let v = cur;
462+
cur = cur.overflowing_add(interval).0;
463+
v
464+
})
465+
.map(|u| u.to_be_bytes().to_vec())
466+
.take(2usize.pow(NUM_BITS_KEY_PER_TXN))
467+
.collect::<Vec<_>>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13
468+
client
469+
.batch_put(
470+
keys.iter()
471+
.cloned()
472+
.zip(iter::repeat(1u32.to_be_bytes().to_vec())),
473+
)
474+
.await?;
475+
476+
let res = client.batch_get(keys).await?;
477+
assert_eq!(res.len(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
478+
}
479+
480+
// test scan
481+
let limit = 10;
482+
let res = client.scan(vec![].., limit).await?;
483+
assert_eq!(res.len(), limit as usize);
484+
485+
// test batch_scan
486+
for batch_num in 1..4 {
487+
let _ = client
488+
.batch_scan(iter::repeat(vec![]..).take(batch_num), limit)
489+
.await?;
490+
// FIXME: `each_limit` parameter does no work as expected.
491+
// It limits the entries on each region of each rangqe, instead of each range.
492+
// assert_eq!(res.len(), limit as usize * batch_num);
493+
}
494+
495+
Fallible::Ok(())
496+
}
497+
441498
// helper function
442499
async fn get_u32(client: &RawClient, key: Vec<u8>) -> Fallible<u32> {
443500
let x = client.get(key).await?.unwrap();

0 commit comments

Comments
 (0)