Skip to content

Commit 4bb6541

Browse files
committed
Add a new begin_latest_read api for transaction support
Signed-off-by: yongman <[email protected]>
1 parent 5619b79 commit 4bb6541

File tree

4 files changed

+86
-18
lines changed

4 files changed

+86
-18
lines changed

src/transaction/client.rs

+28-5
Original file line numberDiff line numberDiff line change
@@ -276,12 +276,35 @@ impl Client {
276276
Ok(res)
277277
}
278278

279-
pub fn new_transaction(
280-
&self,
281-
timestamp: Timestamp,
282-
options: TransactionOptions,
283-
) -> Transaction {
279+
fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction {
284280
let logger = self.logger.new(o!("child" => 1));
285281
Transaction::new(timestamp, self.pd.clone(), options, logger)
286282
}
283+
284+
/// Creates a new latest commit readonly [`Transaction`].
285+
///
286+
/// Read operations will read the latest commit data which is not a snapshot read.
287+
///
288+
/// # Examples
289+
///
290+
/// ```rust,no_run
291+
/// # use tikv_client::{Config, TransactionClient};
292+
/// # use futures::prelude::*;
293+
/// # use tikv_client::TransactionOptions;
294+
/// # futures::executor::block_on(async {
295+
/// let client = TransactionClient::new(vec!["192.168.0.100"], None)
296+
/// .await
297+
/// .unwrap();
298+
/// let options = TransactionOptions::new_optimistic();
299+
/// let mut transaction = client.begin_latest_read(options);
300+
/// // ... Issue some reads.
301+
/// # });
302+
/// ```
303+
pub fn begin_latest_read(&self, options: TransactionOptions) -> Transaction {
304+
debug!(
305+
self.logger,
306+
"creating new latest commit readonly transaction"
307+
);
308+
self.new_transaction(Timestamp::from_version(u64::MAX), options.read_only())
309+
}
287310
}

src/transaction/transaction.rs

+28-12
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl<PdC: PdClient> Transaction<PdC> {
113113
/// ```
114114
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
115115
debug!(self.logger, "invoking transactional get request");
116-
self.check_allow_operation().await?;
116+
self.check_allow_operation(true).await?;
117117
let timestamp = self.timestamp.clone();
118118
let rpc = self.rpc.clone();
119119
let key = key.into();
@@ -177,7 +177,7 @@ impl<PdC: PdClient> Transaction<PdC> {
177177
/// ```
178178
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
179179
debug!(self.logger, "invoking transactional get_for_update request");
180-
self.check_allow_operation().await?;
180+
self.check_allow_operation(false).await?;
181181
if !self.is_pessimistic() {
182182
let key = key.into();
183183
self.lock_keys(iter::once(key.clone())).await?;
@@ -244,7 +244,7 @@ impl<PdC: PdClient> Transaction<PdC> {
244244
keys: impl IntoIterator<Item = impl Into<Key>>,
245245
) -> Result<impl Iterator<Item = KvPair>> {
246246
debug!(self.logger, "invoking transactional batch_get request");
247-
self.check_allow_operation().await?;
247+
self.check_allow_operation(true).await?;
248248
let timestamp = self.timestamp.clone();
249249
let rpc = self.rpc.clone();
250250
let retry_options = self.options.retry_options.clone();
@@ -299,7 +299,7 @@ impl<PdC: PdClient> Transaction<PdC> {
299299
self.logger,
300300
"invoking transactional batch_get_for_update request"
301301
);
302-
self.check_allow_operation().await?;
302+
self.check_allow_operation(false).await?;
303303
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
304304
if !self.is_pessimistic() {
305305
self.lock_keys(keys.clone()).await?;
@@ -433,7 +433,7 @@ impl<PdC: PdClient> Transaction<PdC> {
433433
/// ```
434434
pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
435435
debug!(self.logger, "invoking transactional put request");
436-
self.check_allow_operation().await?;
436+
self.check_allow_operation(false).await?;
437437
let key = key.into();
438438
if self.is_pessimistic() {
439439
self.pessimistic_lock(iter::once(key.clone()), false)
@@ -464,7 +464,7 @@ impl<PdC: PdClient> Transaction<PdC> {
464464
/// ```
465465
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
466466
debug!(self.logger, "invoking transactional insert request");
467-
self.check_allow_operation().await?;
467+
self.check_allow_operation(false).await?;
468468
let key = key.into();
469469
if self.buffer.get(&key).is_some() {
470470
return Err(Error::DuplicateKeyInsertion);
@@ -499,7 +499,7 @@ impl<PdC: PdClient> Transaction<PdC> {
499499
/// ```
500500
pub async fn delete(&mut self, key: impl Into<Key>) -> Result<()> {
501501
debug!(self.logger, "invoking transactional delete request");
502-
self.check_allow_operation().await?;
502+
self.check_allow_operation(false).await?;
503503
let key = key.into();
504504
if self.is_pessimistic() {
505505
self.pessimistic_lock(iter::once(key.clone()), false)
@@ -537,7 +537,7 @@ impl<PdC: PdClient> Transaction<PdC> {
537537
keys: impl IntoIterator<Item = impl Into<Key>>,
538538
) -> Result<()> {
539539
debug!(self.logger, "invoking transactional lock_keys request");
540-
self.check_allow_operation().await?;
540+
self.check_allow_operation(false).await?;
541541
match self.options.kind {
542542
TransactionKind::Optimistic => {
543543
for key in keys {
@@ -569,6 +569,15 @@ impl<PdC: PdClient> Transaction<PdC> {
569569
/// ```
570570
pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
571571
debug!(self.logger, "commiting transaction");
572+
573+
{
574+
// readonly transaction no need to commit
575+
let status = self.status.read().await;
576+
if *status == TransactionStatus::ReadOnly {
577+
return Ok(None);
578+
}
579+
}
580+
572581
{
573582
let mut status = self.status.write().await;
574583
if !matches!(
@@ -677,7 +686,7 @@ impl<PdC: PdClient> Transaction<PdC> {
677686
#[doc(hidden)]
678687
pub async fn send_heart_beat(&mut self) -> Result<u64> {
679688
debug!(self.logger, "sending heart_beat");
680-
self.check_allow_operation().await?;
689+
self.check_allow_operation(true).await?;
681690
let primary_key = match self.buffer.get_primary_key() {
682691
Some(k) => k,
683692
None => return Err(Error::NoPrimaryKey),
@@ -703,7 +712,7 @@ impl<PdC: PdClient> Transaction<PdC> {
703712
key_only: bool,
704713
reverse: bool,
705714
) -> Result<impl Iterator<Item = KvPair>> {
706-
self.check_allow_operation().await?;
715+
self.check_allow_operation(true).await?;
707716
let timestamp = self.timestamp.clone();
708717
let rpc = self.rpc.clone();
709718
let retry_options = self.options.retry_options.clone();
@@ -840,10 +849,17 @@ impl<PdC: PdClient> Transaction<PdC> {
840849
}
841850

842851
/// Checks if the transaction can perform arbitrary operations.
843-
async fn check_allow_operation(&self) -> Result<()> {
852+
async fn check_allow_operation(&self, readonly: bool) -> Result<()> {
844853
let status = self.status.read().await;
845854
match *status {
846-
TransactionStatus::ReadOnly | TransactionStatus::Active => Ok(()),
855+
TransactionStatus::Active => Ok(()),
856+
TransactionStatus::ReadOnly => {
857+
if readonly {
858+
Ok(())
859+
} else {
860+
Err(Error::OperationReadOnlyError)
861+
}
862+
}
847863
TransactionStatus::Committed
848864
| TransactionStatus::Rolledback
849865
| TransactionStatus::StartedCommit

tests/integration_tests.rs

+27-1
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ use futures::prelude::*;
1717
use rand::{seq::IteratorRandom, thread_rng, Rng};
1818
use serial_test::serial;
1919
use std::{
20+
assert_eq,
2021
collections::{HashMap, HashSet},
2122
convert::TryInto,
22-
iter,
23+
iter, matches,
2324
};
2425
use tikv_client::{
2526
transaction::HeartbeatOption, BoundRange, Error, Key, KvPair, RawClient, Result, Transaction,
@@ -942,6 +943,31 @@ async fn txn_key_exists() -> Result<()> {
942943
Ok(())
943944
}
944945

946+
#[tokio::test]
947+
#[serial]
948+
async fn txn_latest_read() -> Result<()> {
949+
init().await?;
950+
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
951+
let key = "key".to_owned();
952+
let value = "value".to_owned();
953+
let options = TransactionOptions::new_optimistic();
954+
let mut t1 = client.begin_latest_read(options);
955+
t1.get(key.clone()).await?;
956+
t1.put(key.clone(), value.clone())
957+
.await
958+
.map_err(|_e| matches!(Error::OperationReadOnlyError, _e))
959+
.unwrap_err();
960+
// commit is no needed for readonly transaction, commit() will take no effect if called.
961+
t1.commit().await?;
962+
963+
let options = TransactionOptions::new_pessimistic();
964+
let mut t2 = client.begin_latest_read(options);
965+
t2.get(key.clone()).await?;
966+
// t2.commit().await?;
967+
968+
Ok(())
969+
}
970+
945971
// helper function
946972
async fn get_u32(client: &RawClient, key: Vec<u8>) -> Result<u32> {
947973
let x = client.get(key).await?.unwrap();

tikv-client-common/src/errors.rs

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ pub enum Error {
2222
/// It's not allowed to perform operations in a transaction after it has been committed or rolled back.
2323
#[error("Cannot read or write data after any attempt to commit or roll back the transaction")]
2424
OperationAfterCommitError,
25+
/// It's not allowed to perform write operation in a readonly transaction.
26+
#[error("Cannot write data in read-only transaction")]
27+
OperationReadOnlyError,
2528
/// We tried to use 1pc for a transaction, but it didn't work. Probably should have used 2pc.
2629
#[error("1PC transaction could not be committed.")]
2730
OnePcFailure,

0 commit comments

Comments
 (0)