diff --git a/kurrentdb/Cargo.toml b/kurrentdb/Cargo.toml index df7c7cec..3c0be81e 100755 --- a/kurrentdb/Cargo.toml +++ b/kurrentdb/Cargo.toml @@ -7,7 +7,7 @@ version = "1.0.0-alpha.3" # Uncomment if you want to update messages.rs code-gen. # We disabled codegen.rs because it requires having `protoc` installed on your machine # in order to build that library. -# build = "codegen.rs" +build = "codegen.rs" description = "Official KurrentDB gRPC client" keywords = ["database", "event-sourcing", "ddd", "cqrs", "kurrent"] diff --git a/kurrentdb/codegen.rs b/kurrentdb/codegen.rs index 56c30b71..5e74f242 100755 --- a/kurrentdb/codegen.rs +++ b/kurrentdb/codegen.rs @@ -38,6 +38,7 @@ pub fn generate() -> Result<(), Box> { "protos/monitoring.proto", "protos/operations.proto", "protos/users.proto", + "protos/multi-append.proto", ]; fs::create_dir_all(out_dir)?; @@ -52,6 +53,8 @@ pub fn generate() -> Result<(), Box> { "ReadEvent.RecordedEvent.custom_metadata", "ReadEvent.RecordedEvent.data", "StreamIdentifier.stream_name", + "AppendRecord.data", + "DynamicValue.bytes_value", ]) .out_dir(out_dir) .extern_path(".event_store.client.Empty", "()") @@ -91,6 +94,9 @@ pub fn generate() -> Result<(), Box> { } else if filename_string.as_str() == "google.rpc.rs" { let new_file = file.path().parent().unwrap().join("google_rpc.rs"); fs::rename(file.path(), new_file)?; + } else if filename_string.as_str() == "kurrentdb.protocol.v2.rs" { + let new_file = file.path().parent().unwrap().join("new_streams.rs"); + fs::rename(file.path(), new_file)?; } } diff --git a/kurrentdb/protos/multi-append.proto b/kurrentdb/protos/multi-append.proto new file mode 100644 index 00000000..3e548f21 --- /dev/null +++ b/kurrentdb/protos/multi-append.proto @@ -0,0 +1,210 @@ +syntax = "proto3"; + +package kurrentdb.protocol.v2; + +option csharp_namespace = "KurrentDB.Protocol.Streams.V2"; +option java_package = "io.kurrentdb.streams.v2"; +option java_multiple_files = true; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/struct.proto"; + +service StreamsService { + // Executes an atomic operation to append records to multiple streams. + // This transactional method ensures that all appends either succeed + // completely, or are entirely rolled back, thereby maintaining strict data + // consistency across all involved streams. + rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse); + + // Streaming version of MultiStreamAppend that allows clients to send multiple + // append requests over a single connection. When the stream completes, all + // records are appended transactionally (all succeed or fail together). + // Provides improved efficiency for high-throughput scenarios while + // maintaining the same transactional guarantees. + rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse); +} + +message ProtocolDataUnit { + string id = 1; + map properties = 2; + bytes data = 3; + google.protobuf.Timestamp timestamp = 4; +} + +// Record to be appended to a stream. +message AppendRecord { + // Universally Unique identifier for the record. + // If not provided, the server will generate a new one. + optional string record_id = 1; + // A collection of properties providing additional system information about the + // record. + map properties = 2; + // The actual data payload of the record, stored as bytes. + bytes data = 3; +// // Optional timestamp indicating when the record was created. 
+// // If not provided, the server will use the current time. +// optional google.protobuf.Timestamp timestamp = 4; +} + +// Constants that match the expected state of a stream during an +// append operation. It can be used to specify whether the stream should exist, +// not exist, or can be in any state. +enum ExpectedRevisionConstants { + // The stream should exist and the expected revision should match the current + EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0; + // It is not important whether the stream exists or not. + EXPECTED_REVISION_CONSTANTS_ANY = -2; + // The stream should not exist. If it does, the append will fail. + EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1; + // The stream should exist + EXPECTED_REVISION_CONSTANTS_EXISTS = -4; +} + +// Represents the input for appending records to a specific stream. +message AppendStreamRequest { + // The name of the stream to append records to. + string stream = 1; + // The records to append to the stream. + repeated AppendRecord records = 2; + // The expected revision of the stream. If the stream's current revision does + // not match, the append will fail. + // The expected revision can also be one of the special values + // from ExpectedRevisionConstants. + // Missing value means no expectation, the same as EXPECTED_REVISION_CONSTANTS_ANY + optional sint64 expected_revision = 3; +} + +// Success represents the successful outcome of an append operation. +message AppendStreamSuccess { + // The name of the stream to which records were appended. + string stream = 1; + // The position of the last appended record in the stream. + int64 position = 2; + // The expected revision of the stream after the append operation. + int64 stream_revision = 3; +} + +// Failure represents the detailed error information when an append operation fails. +message AppendStreamFailure { + // The name of the stream to which records were appended. + string stream = 1; + + // The error details + oneof error { + // Failed because the actual stream revision didn't match the expected revision. + ErrorDetails.WrongExpectedRevision wrong_expected_revision = 2; + // Failed because the client lacks sufficient permissions. + ErrorDetails.AccessDenied access_denied = 3; + // Failed because the target stream has been deleted. + ErrorDetails.StreamDeleted stream_deleted = 4; + } +} + +// AppendStreamOutput represents the output of appending records to a specific +// stream. +message AppendStreamResponse { + // The result of the append operation. + oneof result { + // Success represents the successful outcome of an append operation. + AppendStreamSuccess success = 1; + // Failure represents the details of a failed append operation. + AppendStreamFailure failure = 2; + } +} + +// MultiStreamAppendRequest represents a request to append records to multiple streams. +message MultiStreamAppendRequest { + // A list of AppendStreamInput messages, each representing a stream to which records should be appended. + repeated AppendStreamRequest input = 1; +} + +// Response from the MultiStreamAppend operation. +message MultiStreamAppendResponse { + oneof result { + // Success represents the successful outcome of a multi-stream append operation. + Success success = 1; + // Failure represents the details of a failed multi-stream append operation. + Failure failure = 2; + } + + message Success { + repeated AppendStreamSuccess output = 1; + } + + message Failure { + repeated AppendStreamFailure output = 1; + } +} + +// ErrorDetails provides detailed information about specific error conditions. 
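Editorial note: `expected_revision` on AppendStreamRequest is a single optional sint64 that carries both an exact revision (any non-negative value) and a state expectation (the negative ExpectedRevisionConstants values). A minimal sketch of that mapping, mirroring the conversion this patch performs later in generated.rs; the free-standing enum and helper below are illustrative only, not part of the patch:

```rust
// Sketch only: how a client-side stream-state expectation maps onto
// AppendStreamRequest.expected_revision. The enum mirrors the crate's
// existing StreamState; the helper itself is hypothetical.
enum StreamState {
    Any,
    NoStream,
    StreamExists,
    StreamRevision(u64),
}

fn to_expected_revision(state: StreamState) -> Option<i64> {
    Some(match state {
        // Negative values are the ExpectedRevisionConstants defined above.
        StreamState::Any => -2,          // EXPECTED_REVISION_CONSTANTS_ANY
        StreamState::NoStream => -1,     // EXPECTED_REVISION_CONSTANTS_NO_STREAM
        StreamState::StreamExists => -4, // EXPECTED_REVISION_CONSTANTS_EXISTS
        // Non-negative values mean the stream must currently be at exactly
        // this revision.
        StreamState::StreamRevision(r) => r as i64,
    })
    // Omitting the field (None) behaves like EXPECTED_REVISION_CONSTANTS_ANY.
}
```

The ErrorDetails message defined next supplies the structured failure payloads (AccessDenied, StreamDeleted, WrongExpectedRevision) referenced by AppendStreamFailure.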
+message ErrorDetails { + // When the user does not have sufficient permissions to perform the operation. + message AccessDenied { + // The simplified reason for access denial. + string reason = 1; + } + + // When the stream has been deleted. + message StreamDeleted { + // The time when the stream was deleted. + google.protobuf.Timestamp deleted_at = 1; + + // If the stream was hard deleted, you cannot reuse the stream name, + // it will raise an exception if you try to append to it again. + bool tombstoned = 2; + } + + // When the expected revision of the stream does not match the actual revision. + message WrongExpectedRevision { + // The actual revision of the stream. + int64 stream_revision = 1; + } + + // When the transaction exceeds the maximum size allowed + // (its bigger than the configured chunk size). + message TransactionMaxSizeExceeded { + // The maximum allowed size of the transaction. + uint32 max_size = 1; + } +} + +//=================================================================== +// Shared +//=================================================================== + +// Represents a list of dynamically typed values. +message ListDynamicValue { + // Repeated property of dynamically typed values. + repeated DynamicValue values = 1; +} + +// Represents a dynamic value +message DynamicValue { + oneof kind { + // Represents a null value. + google.protobuf.NullValue null_value = 1; + // Represents a 32-bit signed integer value. + sint32 int32_value = 2; + // Represents a 64-bit signed integer value. + sint64 int64_value = 3; + // Represents a byte array value. + bytes bytes_value = 4; + // Represents a 64-bit double-precision floating-point value. + double double_value = 5; + // Represents a 32-bit single-precision floating-point value + float float_value = 6; + // Represents a string value. + string string_value = 7; + // Represents a boolean value. + bool boolean_value = 8; + // Represents a timestamp value. + google.protobuf.Timestamp timestamp_value = 9; + // Represents a duration value. + google.protobuf.Duration duration_value = 10; +// // Represents a list of dynamic values. 
+// ListDynamicValue list_value = 11; +// // Represents a json struct +// google.protobuf.Struct struct_value = 12; + } +} diff --git a/kurrentdb/src/client.rs b/kurrentdb/src/client.rs index df5022e0..8910f6a8 100644 --- a/kurrentdb/src/client.rs +++ b/kurrentdb/src/client.rs @@ -7,13 +7,14 @@ use crate::options::read_stream::ReadStreamOptions; use crate::options::subscribe_to_stream::SubscribeToStreamOptions; use crate::server_features::ServerInfo; use crate::{ - DeletePersistentSubscriptionOptions, DeleteStreamOptions, GetPersistentSubscriptionInfoOptions, - ListPersistentSubscriptionsOptions, MetadataStreamName, PersistentSubscription, - PersistentSubscriptionInfo, PersistentSubscriptionToAllOptions, Position, ReadStream, - ReplayParkedMessagesOptions, RestartPersistentSubscriptionSubsystem, RevisionOrPosition, - StreamMetadata, StreamMetadataResult, StreamName, SubscribeToAllOptions, + AppendRequest, DeletePersistentSubscriptionOptions, DeleteStreamOptions, + GetPersistentSubscriptionInfoOptions, ListPersistentSubscriptionsOptions, MetadataStreamName, + MultiWriteResult, PersistentSubscription, PersistentSubscriptionInfo, + PersistentSubscriptionToAllOptions, Position, ReadStream, ReplayParkedMessagesOptions, + RestartPersistentSubscriptionSubsystem, RevisionOrPosition, StreamMetadata, + StreamMetadataResult, StreamName, StreamState, SubscribeToAllOptions, SubscribeToPersistentSubscriptionOptions, Subscription, TombstoneStreamOptions, - VersionedMetadata, WriteResult, commands, + VersionedMetadata, WriteResult, commands, new_commands, }; use crate::{ EventData, @@ -78,7 +79,47 @@ impl Client { where Events: ToEvents, { - commands::append_to_stream(&self.client, stream_name, options, events.into_events()).await + let req = AppendRequest { + stream: unsafe { String::from_utf8_unchecked(stream_name.into_stream_name().to_vec()) }, + events: events.into_events().collect(), + state: match options.version { + crate::event_store::client::streams::append_req::options::ExpectedStreamRevision::Revision(r) => StreamState::StreamRevision(r), + crate::event_store::client::streams::append_req::options::ExpectedStreamRevision::NoStream(_) => StreamState::NoStream, + crate::event_store::client::streams::append_req::options::ExpectedStreamRevision::Any(_) => StreamState::Any, + crate::event_store::client::streams::append_req::options::ExpectedStreamRevision::StreamExists(_) => StreamState::StreamExists, + }, + }; + + let result = self + .multi_append_stream(options, vec![req].into_iter()) + .await?; + + match result { + MultiWriteResult::Success(items) => Ok(WriteResult { + next_expected_version: items[0].next_expected_version, + position: items[0].position, + }), + + MultiWriteResult::Failure(items) => match items[0].error { + crate::MultiAppendWriteError::AccessDenied { .. } => { + Err(crate::Error::AccessDenied) + } + crate::MultiAppendWriteError::StreamDeleted { .. } => { + Err(crate::Error::ResourceDeleted) + } + crate::MultiAppendWriteError::WrongExpectedRevision { current, expected } => { + Err(crate::Error::WrongExpectedVersion { expected, current }) + } + }, + } + } + + pub async fn multi_append_stream( + &self, + options: &AppendToStreamOptions, + events: impl Iterator + Send + 'static, + ) -> crate::Result { + new_commands::multi_stream_append(&self.client, options, events).await } // Sets a stream metadata. 
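Editorial note: with this change `append_to_stream` keeps its public signature but routes through the new multi-append path, translating the first per-stream failure back onto the existing `Error` variants, while `multi_append_stream` exposes the full per-stream result. Its elided generics read `impl Iterator<Item = AppendRequest> + Send + 'static` for the input and `crate::Result<MultiWriteResult>` for the return type. Below is a sketch of driving the new surface end to end, assuming `AppendRequest`, `MultiWriteResult`, and related types are re-exported from the crate root like the existing types, and that `EventData::json` works as in the current client; stream names and payloads are illustrative:

```rust
use kurrentdb::{
    AppendRequest, AppendToStreamOptions, Client, EventData, MultiWriteResult, StreamState,
};

async fn transfer(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
    // Two streams written in one transactional MultiStreamAppend call:
    // either both appends land or neither does.
    let requests = vec![
        AppendRequest {
            stream: "account-123".to_string(),
            events: vec![EventData::json(
                "FundsWithdrawn",
                &serde_json::json!({ "amount": 100 }),
            )?],
            state: StreamState::StreamExists,
        },
        AppendRequest {
            stream: "account-456".to_string(),
            events: vec![EventData::json(
                "FundsDeposited",
                &serde_json::json!({ "amount": 100 }),
            )?],
            state: StreamState::Any,
        },
    ];

    match client
        .multi_append_stream(&AppendToStreamOptions::default(), requests.into_iter())
        .await?
    {
        MultiWriteResult::Success(items) => {
            for item in items {
                println!(
                    "{}: next expected version {}",
                    item.stream, item.next_expected_version
                );
            }
        }
        MultiWriteResult::Failure(items) => {
            for item in items {
                eprintln!("append to {} failed: {:?}", item.stream, item.error);
            }
        }
    }

    Ok(())
}
```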
diff --git a/kurrentdb/src/commands.rs b/kurrentdb/src/commands.rs index 35542adb..40f4da93 100644 --- a/kurrentdb/src/commands.rs +++ b/kurrentdb/src/commands.rs @@ -1870,7 +1870,7 @@ pub async fn restart_persistent_subscription_subsystem( Ok(()) } -fn create_streams_client(handle: Handle) -> StreamsClient { +pub(crate) fn create_streams_client(handle: Handle) -> StreamsClient { StreamsClient::with_origin(handle.client, handle.uri) .max_decoding_message_size(client::MAX_RECEIVE_MESSAGE_SIZE) } diff --git a/kurrentdb/src/event_store/generated.rs b/kurrentdb/src/event_store/generated.rs index 6e4dfdb8..576be115 100644 --- a/kurrentdb/src/event_store/generated.rs +++ b/kurrentdb/src/event_store/generated.rs @@ -1,10 +1,15 @@ use crate::{ - CurrentRevision, EventData, PersistentSubscriptionConnectionInfo, PersistentSubscriptionEvent, - PersistentSubscriptionInfo, PersistentSubscriptionMeasurements, PersistentSubscriptionSettings, - PersistentSubscriptionStats, Position, RecordedEvent, ResolvedEvent, RevisionOrPosition, - StreamPosition, StreamState, SystemConsumerStrategy, WriteResult, + AppendRequest, CurrentRevision, EventData, MultiAppendWriteError, MultiWriteFailure, + MultiWriteResult, MultiWriteSuccess, PersistentSubscriptionConnectionInfo, + PersistentSubscriptionEvent, PersistentSubscriptionInfo, PersistentSubscriptionMeasurements, + PersistentSubscriptionSettings, PersistentSubscriptionStats, Position, RecordedEvent, + ResolvedEvent, RevisionOrPosition, StreamPosition, StreamState, SystemConsumerStrategy, + WriteResult, }; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeZone, Utc}; +use new_streams::dynamic_value::Kind; +use new_streams::{DynamicValue, multi_stream_append_response}; +use std::collections::HashMap; use std::ops::Add; use std::time::{Duration, SystemTime}; @@ -12,6 +17,7 @@ pub mod common; pub mod google_rpc; pub mod gossip; pub mod monitoring; +pub mod new_streams; pub mod operations; pub mod persistent; pub mod projections; @@ -421,6 +427,97 @@ impl TryFrom for PersistentSubscriptionInfo for new_streams::AppendRecord { + fn from(value: EventData) -> Self { + let mut properties = HashMap::new(); + + for (key, value) in value.metadata { + properties.insert( + key, + DynamicValue { + kind: Some(Kind::StringValue(value)), + }, + ); + } + + Self { + record_id: value.id_opt.map(|x| x.to_string()), + properties, + data: value.payload, + } + } +} + +impl From for new_streams::AppendStreamRequest { + fn from(value: AppendRequest) -> Self { + Self { + stream: value.stream, + records: value.events.into_iter().map(|e| e.into()).collect(), + expected_revision: Some(match value.state { + StreamState::Any => -2, + StreamState::StreamExists => -4, + StreamState::NoStream => -1, + StreamState::StreamRevision(r) => r as i64, + }), + } + } +} + +impl From for MultiWriteResult { + fn from(value: new_streams::MultiStreamAppendResponse) -> Self { + match value.result.unwrap() { + multi_stream_append_response::Result::Success(s) => { + Self::Success(s.output.into_iter().map(|i| i.into()).collect()) + } + multi_stream_append_response::Result::Failure(f) => { + Self::Failure(f.output.into_iter().map(|i| i.into()).collect()) + } + } + } +} + +impl From for MultiWriteSuccess { + fn from(value: new_streams::AppendStreamSuccess) -> Self { + Self { + stream: value.stream, + next_expected_version: value.stream_revision as u64, + position: Position { + commit: value.position as u64, + prepare: value.position as u64, + }, + } + } +} + +impl From for MultiWriteFailure { + fn from(value: 
new_streams::AppendStreamFailure) -> Self { + Self { + stream: value.stream, + error: match value.error.unwrap() { + new_streams::append_stream_failure::Error::WrongExpectedRevision(e) => { + MultiAppendWriteError::WrongExpectedRevision { + current: CurrentRevision::Current(e.stream_revision as u64), + expected: StreamState::StreamRevision(42), // <-- we need to add the actual expected value. + } + } + + new_streams::append_stream_failure::Error::AccessDenied(e) => { + MultiAppendWriteError::AccessDenied { reason: e.reason } + } + + new_streams::append_stream_failure::Error::StreamDeleted(e) => { + MultiAppendWriteError::StreamDeleted { + deleted_at: e + .deleted_at + .and_then(|t| Utc.timestamp_opt(t.seconds, t.nanos as u32).single()), + tombstoned: e.tombstoned, + } + } + }, + } + } +} + pub(crate) fn parse_revision_or_position(input: &str) -> crate::Result { if let Ok(v) = input.parse::() { Ok(RevisionOrPosition::Revision(v)) diff --git a/kurrentdb/src/event_store/generated/new_streams.rs b/kurrentdb/src/event_store/generated/new_streams.rs new file mode 100644 index 00000000..df555043 --- /dev/null +++ b/kurrentdb/src/event_store/generated/new_streams.rs @@ -0,0 +1,443 @@ +// This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ProtocolDataUnit { + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + #[prost(map = "string, message", tag = "2")] + pub properties: ::std::collections::HashMap< + ::prost::alloc::string::String, + DynamicValue, + >, + #[prost(bytes = "vec", tag = "3")] + pub data: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "4")] + pub timestamp: ::core::option::Option<::prost_types::Timestamp>, +} +/// Record to be appended to a stream. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AppendRecord { + /// Universally Unique identifier for the record. + /// If not provided, the server will generate a new one. + #[prost(string, optional, tag = "1")] + pub record_id: ::core::option::Option<::prost::alloc::string::String>, + /// A collection of properties providing additional system information about the + /// record. + #[prost(map = "string, message", tag = "2")] + pub properties: ::std::collections::HashMap< + ::prost::alloc::string::String, + DynamicValue, + >, + /// The actual data payload of the record, stored as bytes. + /// + /// // Optional timestamp indicating when the record was created. + /// // If not provided, the server will use the current time. + /// optional google.protobuf.Timestamp timestamp = 4; + #[prost(bytes = "bytes", tag = "3")] + pub data: ::prost::bytes::Bytes, +} +/// Represents the input for appending records to a specific stream. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AppendStreamRequest { + /// The name of the stream to append records to. + #[prost(string, tag = "1")] + pub stream: ::prost::alloc::string::String, + /// The records to append to the stream. + #[prost(message, repeated, tag = "2")] + pub records: ::prost::alloc::vec::Vec, + /// The expected revision of the stream. If the stream's current revision does + /// not match, the append will fail. + /// The expected revision can also be one of the special values + /// from ExpectedRevisionConstants. + /// Missing value means no expectation, the same as EXPECTED_REVISION_CONSTANTS_ANY + #[prost(sint64, optional, tag = "3")] + pub expected_revision: ::core::option::Option, +} +/// Success represents the successful outcome of an append operation. 
+#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AppendStreamSuccess { + /// The name of the stream to which records were appended. + #[prost(string, tag = "1")] + pub stream: ::prost::alloc::string::String, + /// The position of the last appended record in the stream. + #[prost(int64, tag = "2")] + pub position: i64, + /// The expected revision of the stream after the append operation. + #[prost(int64, tag = "3")] + pub stream_revision: i64, +} +/// Failure represents the detailed error information when an append operation fails. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AppendStreamFailure { + /// The name of the stream to which records were appended. + #[prost(string, tag = "1")] + pub stream: ::prost::alloc::string::String, + /// The error details + #[prost(oneof = "append_stream_failure::Error", tags = "2, 3, 4")] + pub error: ::core::option::Option, +} +/// Nested message and enum types in `AppendStreamFailure`. +pub mod append_stream_failure { + /// The error details + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Error { + /// Failed because the actual stream revision didn't match the expected revision. + #[prost(message, tag = "2")] + WrongExpectedRevision(super::error_details::WrongExpectedRevision), + /// Failed because the client lacks sufficient permissions. + #[prost(message, tag = "3")] + AccessDenied(super::error_details::AccessDenied), + /// Failed because the target stream has been deleted. + #[prost(message, tag = "4")] + StreamDeleted(super::error_details::StreamDeleted), + } +} +/// AppendStreamOutput represents the output of appending records to a specific +/// stream. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AppendStreamResponse { + /// The result of the append operation. + #[prost(oneof = "append_stream_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `AppendStreamResponse`. +pub mod append_stream_response { + /// The result of the append operation. + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + /// Success represents the successful outcome of an append operation. + #[prost(message, tag = "1")] + Success(super::AppendStreamSuccess), + /// Failure represents the details of a failed append operation. + #[prost(message, tag = "2")] + Failure(super::AppendStreamFailure), + } +} +/// MultiStreamAppendRequest represents a request to append records to multiple streams. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MultiStreamAppendRequest { + /// A list of AppendStreamInput messages, each representing a stream to which records should be appended. + #[prost(message, repeated, tag = "1")] + pub input: ::prost::alloc::vec::Vec, +} +/// Response from the MultiStreamAppend operation. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MultiStreamAppendResponse { + #[prost(oneof = "multi_stream_append_response::Result", tags = "1, 2")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `MultiStreamAppendResponse`. 
+pub mod multi_stream_append_response { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Success { + #[prost(message, repeated, tag = "1")] + pub output: ::prost::alloc::vec::Vec, + } + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Failure { + #[prost(message, repeated, tag = "1")] + pub output: ::prost::alloc::vec::Vec, + } + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Result { + /// Success represents the successful outcome of a multi-stream append operation. + #[prost(message, tag = "1")] + Success(Success), + /// Failure represents the details of a failed multi-stream append operation. + #[prost(message, tag = "2")] + Failure(Failure), + } +} +/// ErrorDetails provides detailed information about specific error conditions. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ErrorDetails {} +/// Nested message and enum types in `ErrorDetails`. +pub mod error_details { + /// When the user does not have sufficient permissions to perform the operation. + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct AccessDenied { + /// The simplified reason for access denial. + #[prost(string, tag = "1")] + pub reason: ::prost::alloc::string::String, + } + /// When the stream has been deleted. + #[derive(Clone, Copy, PartialEq, ::prost::Message)] + pub struct StreamDeleted { + /// The time when the stream was deleted. + #[prost(message, optional, tag = "1")] + pub deleted_at: ::core::option::Option<::prost_types::Timestamp>, + /// If the stream was hard deleted, you cannot reuse the stream name, + /// it will raise an exception if you try to append to it again. + #[prost(bool, tag = "2")] + pub tombstoned: bool, + } + /// When the expected revision of the stream does not match the actual revision. + #[derive(Clone, Copy, PartialEq, ::prost::Message)] + pub struct WrongExpectedRevision { + /// The actual revision of the stream. + #[prost(int64, tag = "1")] + pub stream_revision: i64, + } + /// When the transaction exceeds the maximum size allowed + /// (its bigger than the configured chunk size). + #[derive(Clone, Copy, PartialEq, ::prost::Message)] + pub struct TransactionMaxSizeExceeded { + /// The maximum allowed size of the transaction. + #[prost(uint32, tag = "1")] + pub max_size: u32, + } +} +/// Represents a list of dynamically typed values. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListDynamicValue { + /// Repeated property of dynamically typed values. + #[prost(message, repeated, tag = "1")] + pub values: ::prost::alloc::vec::Vec, +} +/// Represents a dynamic value +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DynamicValue { + #[prost(oneof = "dynamic_value::Kind", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10")] + pub kind: ::core::option::Option, +} +/// Nested message and enum types in `DynamicValue`. +pub mod dynamic_value { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Kind { + /// Represents a null value. + #[prost(enumeration = "::prost_types::NullValue", tag = "1")] + NullValue(i32), + /// Represents a 32-bit signed integer value. + #[prost(sint32, tag = "2")] + Int32Value(i32), + /// Represents a 64-bit signed integer value. + #[prost(sint64, tag = "3")] + Int64Value(i64), + /// Represents a byte array value. + #[prost(bytes, tag = "4")] + BytesValue(::prost::bytes::Bytes), + /// Represents a 64-bit double-precision floating-point value. 
+ #[prost(double, tag = "5")] + DoubleValue(f64), + /// Represents a 32-bit single-precision floating-point value + #[prost(float, tag = "6")] + FloatValue(f32), + /// Represents a string value. + #[prost(string, tag = "7")] + StringValue(::prost::alloc::string::String), + /// Represents a boolean value. + #[prost(bool, tag = "8")] + BooleanValue(bool), + /// Represents a timestamp value. + #[prost(message, tag = "9")] + TimestampValue(::prost_types::Timestamp), + /// Represents a duration value. + /// + /// // Represents a list of dynamic values. + /// ListDynamicValue list_value = 11; + /// // Represents a json struct + /// google.protobuf.Struct struct_value = 12; + #[prost(message, tag = "10")] + DurationValue(::prost_types::Duration), + } +} +/// Constants that match the expected state of a stream during an +/// append operation. It can be used to specify whether the stream should exist, +/// not exist, or can be in any state. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ExpectedRevisionConstants { + /// The stream should exist and the expected revision should match the current + SingleEvent = 0, + /// It is not important whether the stream exists or not. + Any = -2, + /// The stream should not exist. If it does, the append will fail. + NoStream = -1, + /// The stream should exist + Exists = -4, +} +impl ExpectedRevisionConstants { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::SingleEvent => "EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT", + Self::Any => "EXPECTED_REVISION_CONSTANTS_ANY", + Self::NoStream => "EXPECTED_REVISION_CONSTANTS_NO_STREAM", + Self::Exists => "EXPECTED_REVISION_CONSTANTS_EXISTS", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT" => Some(Self::SingleEvent), + "EXPECTED_REVISION_CONSTANTS_ANY" => Some(Self::Any), + "EXPECTED_REVISION_CONSTANTS_NO_STREAM" => Some(Self::NoStream), + "EXPECTED_REVISION_CONSTANTS_EXISTS" => Some(Self::Exists), + _ => None, + } + } +} +/// Generated client implementations. +pub mod streams_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct StreamsServiceClient { + inner: tonic::client::Grpc, + } + impl StreamsServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. 
+ pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl StreamsServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> StreamsServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + StreamsServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// Executes an atomic operation to append records to multiple streams. + /// This transactional method ensures that all appends either succeed + /// completely, or are entirely rolled back, thereby maintaining strict data + /// consistency across all involved streams. + pub async fn multi_stream_append( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/kurrentdb.protocol.v2.StreamsService/MultiStreamAppend", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "kurrentdb.protocol.v2.StreamsService", + "MultiStreamAppend", + ), + ); + self.inner.unary(req, path, codec).await + } + /// Streaming version of MultiStreamAppend that allows clients to send multiple + /// append requests over a single connection. When the stream completes, all + /// records are appended transactionally (all succeed or fail together). + /// Provides improved efficiency for high-throughput scenarios while + /// maintaining the same transactional guarantees. 
+ pub async fn multi_stream_append_session( + &mut self, + request: impl tonic::IntoStreamingRequest< + Message = super::AppendStreamRequest, + >, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/kurrentdb.protocol.v2.StreamsService/MultiStreamAppendSession", + ); + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "kurrentdb.protocol.v2.StreamsService", + "MultiStreamAppendSession", + ), + ); + self.inner.client_streaming(req, path, codec).await + } + } +} diff --git a/kurrentdb/src/lib.rs b/kurrentdb/src/lib.rs index 41d2545e..dbcf5407 100755 --- a/kurrentdb/src/lib.rs +++ b/kurrentdb/src/lib.rs @@ -61,6 +61,7 @@ mod commands; mod event_store; mod grpc; mod http; +mod new_commands; pub mod operations; mod options; mod private; diff --git a/kurrentdb/src/new_commands.rs b/kurrentdb/src/new_commands.rs new file mode 100644 index 00000000..9757ae75 --- /dev/null +++ b/kurrentdb/src/new_commands.rs @@ -0,0 +1,63 @@ +use crate::{ + AppendRequest, AppendToStreamOptions, MultiWriteResult, + commands::new_request, + event_store::{ + client::MAX_RECEIVE_MESSAGE_SIZE, + generated::new_streams::{ + MultiStreamAppendRequest, streams_service_client::StreamsServiceClient, + }, + }, + grpc::{GrpcClient, handle_error}, +}; + +/// Sends asynchronously the write command to the server. +pub async fn multi_stream_append( + connection: &GrpcClient, + options: &AppendToStreamOptions, + mut events: impl Iterator + Send + 'static, +) -> crate::Result { + let (min, max) = events.size_hint(); + let sized = max == Some(min); + + if min == 0 && sized { + return Err(crate::Error::IllegalOperation( + "iterator won't yield a value".to_string(), + )); + } + + let handle = connection.current_selected_node().await?; + let handle_id = handle.id(); + let mut client = StreamsServiceClient::with_interceptor(handle.client, handle.uri) + .max_decoding_message_size(MAX_RECEIVE_MESSAGE_SIZE); + + let res = if min == 1 && sized { + let req = events.next().expect("not be empty"); + let req = new_request( + connection.connection_settings(), + options, + MultiStreamAppendRequest { + input: vec![req.into()], + }, + ); + + client.multi_stream_append(req).await + } else { + // streaming + let payload = async_stream::stream! { + for req in events { + yield req.into() + } + }; + + let req = new_request(connection.connection_settings(), options, payload); + client.multi_stream_append_session(req).await + }; + + // handle_error(&connection.sender, handle_id, &e); + let resp = res + .map_err(crate::Error::from_grpc) + .inspect_err(|e| handle_error(&connection.sender, handle_id, e))? + .into_inner(); + + Ok(resp.into()) +} diff --git a/kurrentdb/src/types.rs b/kurrentdb/src/types.rs index 097359cb..37734b9f 100755 --- a/kurrentdb/src/types.rs +++ b/kurrentdb/src/types.rs @@ -189,6 +189,55 @@ pub struct WriteResult { pub position: Position, } +#[derive(Debug)] +pub struct AppendRequest { + pub stream: String, + pub events: Vec, + pub state: StreamState, +} + +/// Returned after writing to multiple streams. +#[derive(Debug)] +pub enum MultiWriteResult { + Success(Vec), + Failure(Vec), +} + +#[derive(Debug)] +pub struct MultiWriteSuccess { + /// Stream name. + pub stream: String, + + /// Next expected version of the stream. 
+ pub next_expected_version: u64, + + /// `Position` of the write. + pub position: Position, +} + +#[derive(Debug)] +pub struct MultiWriteFailure { + pub stream: String, + pub error: MultiAppendWriteError, +} + +#[derive(Debug)] +pub enum MultiAppendWriteError { + AccessDenied { + reason: String, + }, + + StreamDeleted { + deleted_at: Option>, + tombstoned: bool, + }, + + WrongExpectedRevision { + current: CurrentRevision, + expected: StreamState, + }, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamPosition { Start, @@ -1448,6 +1497,8 @@ pub enum Error { InitializationError(String), #[error("Illegal state error: {0}")] IllegalStateError(String), + #[error("Illegal operation error: {0}")] + IllegalOperation(String), #[error("Wrong expected version: expected '{expected}' but got '{current}'")] WrongExpectedVersion { expected: StreamState,
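Editorial note on consuming these results: `multi_append_stream` hands the caller the raw per-stream `MultiWriteResult`, while `append_to_stream` collapses the first failure back onto the existing `Error` variants. Be aware that the `AppendStreamFailure` conversion in generated.rs currently fills `expected` with a placeholder `StreamState::StreamRevision(42)` (flagged with a comment in this patch), so only `current` is trustworthy for now. A minimal sketch of per-stream failure handling, assuming these types are re-exported from the crate root:

```rust
use kurrentdb::{CurrentRevision, MultiAppendWriteError, MultiWriteFailure};

// Decide whether an individual stream's append failure is worth retrying.
fn should_retry(failure: &MultiWriteFailure) -> bool {
    match &failure.error {
        // Concurrency conflict: re-read the stream and retry against the
        // revision reported in `current`.
        MultiAppendWriteError::WrongExpectedRevision { current, .. } => {
            if let CurrentRevision::Current(revision) = current {
                eprintln!("{} is currently at revision {}", failure.stream, revision);
            }
            true
        }
        // Permission problems will not resolve by retrying.
        MultiAppendWriteError::AccessDenied { reason } => {
            eprintln!("access denied on {}: {}", failure.stream, reason);
            false
        }
        // A tombstoned stream can never be appended to again; a soft-deleted
        // one could in principle be recreated, which is a domain decision.
        MultiAppendWriteError::StreamDeleted { tombstoned, .. } => !*tombstoned,
    }
}
```

On the success side, `MultiWriteSuccess` exposes `next_expected_version` and a `Position` whose commit and prepare components are both set from the single position the server reports.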