diff --git a/src/pd/client.rs b/src/pd/client.rs index b03dbfaf..715a111e 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -133,7 +133,7 @@ pub trait PdClient: Send + Sync + 'static { .boxed() } - // Returns a Steam which iterates over the contexts for ranges in the same region. + // Returns a Stream which iterates over the contexts for ranges in the same region. fn group_ranges_by_region( self: Arc<Self>, mut ranges: Vec<BoundRange>, @@ -153,7 +153,10 @@ pub trait PdClient: Send + Sync + 'static { let region_end = region.end_key(); let mut grouped = vec![]; if !region_end.is_empty() - && end_key.clone().map(|x| x > region_end).unwrap_or(true) + && end_key + .clone() + .map(|x| x > region_end || x.is_empty()) + .unwrap_or(true) { grouped.push((start_key, region_end.clone()).into()); ranges.push((region_end, end_key).into()); @@ -168,7 +171,10 @@ pub trait PdClient: Send + Sync + 'static { break; } if !region_end.is_empty() - && end_key.clone().map(|x| x > region_end).unwrap_or(true) + && end_key + .clone() + .map(|x| x > region_end || x.is_empty()) + .unwrap_or(true) { grouped.push((start_key, region_end.clone()).into()); ranges.push((region_end, end_key).into()); diff --git a/src/raw/client.rs b/src/raw/client.rs index 6943c1bf..6756aa22 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -252,9 +252,13 @@ impl Client { return Err(Error::max_scan_limit_exceeded(limit, MAX_RAW_KV_SCAN_LIMIT)); } - requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) + let res = requests::new_raw_scan_request(range, limit, self.key_only, self.cf.clone()) .execute(self.rpc.clone()) - .await + .await; + res.map(|mut s| { + s.truncate(limit as usize); + s + }) } /// Create a new 'batch scan' request. 
diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index ac930ed3..482dcb75 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -436,6 +436,63 @@ async fn raw_req() -> Fallible<()> { Fallible::Ok(()) } +/// Tests raw API when there are multiple regions. +/// Write large volumes of data to enforce region splitting. +/// In order to test `scan`, data is uniformly inserted. +/// +/// Ignoring this because we don't want to mess up transactional tests. +#[tokio::test] +#[serial] +#[ignore] +async fn raw_write_million() -> Fallible<()> { + const NUM_BITS_TXN: u32 = 9; + const NUM_BITS_KEY_PER_TXN: u32 = 10; + let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN); + + clear_tikv().await?; + let config = Config::new(pd_addrs()); + let client = RawClient::new(config).await?; + + for i in 0..2u32.pow(NUM_BITS_TXN) { + let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN); + let keys = iter::repeat_with(|| { + let v = cur; + cur = cur.overflowing_add(interval).0; + v + }) + .map(|u| u.to_be_bytes().to_vec()) + .take(2usize.pow(NUM_BITS_KEY_PER_TXN)) + .collect::<Vec<_>>(); // each batch_put writes 2 ^ 10 keys. 10 = NUM_BITS_KEY_PER_TXN + client + .batch_put( + keys.iter() + .cloned() + .zip(iter::repeat(1u32.to_be_bytes().to_vec())), + ) + .await?; + + let res = client.batch_get(keys).await?; + assert_eq!(res.len(), 2usize.pow(NUM_BITS_KEY_PER_TXN)); + } + + // test scan + let limit = 10; + let res = client.scan(vec![].., limit).await?; + assert_eq!(res.len(), limit as usize); + + // test batch_scan + for batch_num in 1..4 { + let _ = client + .batch_scan(iter::repeat(vec![]..).take(batch_num), limit) + .await?; + // FIXME: `each_limit` parameter does not work as expected. + // It limits the entries on each region of each range, instead of each range. + // assert_eq!(res.len(), limit as usize * batch_num); + } + + Fallible::Ok(()) +} + // helper function async fn get_u32(client: &RawClient, key: Vec<u8>) -> Fallible<u32> { let x = client.get(key).await?.unwrap();