Skip to content

[Cosmos] Migrate get_document to pipelines architecture #357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions sdk/cosmos/examples/document_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use serde::{Deserialize, Serialize};
// DB.
use azure_core::prelude::*;
use azure_cosmos::prelude::*;
use azure_cosmos::responses::GetDocumentResponse;
use std::borrow::Cow;
use std::error::Error;

Expand Down Expand Up @@ -154,8 +153,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let get_document_response = collection_client
.clone()
.into_document_client(doc.id.clone(), &doc.id)?
.get_document()
.execute::<MySampleStruct>()
.get_document::<MySampleStruct>(Context::new(), GetDocumentOptions::new())
.await?;
println!("get_document_response == {:#?}", get_document_response);

Expand Down
25 changes: 10 additions & 15 deletions sdk/cosmos/examples/document_entries_00.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use azure_core::prelude::*;
use azure_cosmos::prelude::*;
use azure_cosmos::responses::GetDocumentResponse;
use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
Expand Down Expand Up @@ -129,15 +128,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let response = client
.clone()
.into_document_client(id.clone(), partition_key)?
.get_document()
.consistency_level(session_token)
.execute::<MySampleStruct>()
.get_document::<MySampleStruct>(
Context::new(),
GetDocumentOptions::new().consistency_level(session_token),
)
.await?;

assert!(match response {
GetDocumentResponse::Found(_) => true,
_ => false,
});
assert!(matches!(response, GetDocumentResponse::Found(_)));
println!("response == {:#?}", response);

let mut doc = match response {
Expand Down Expand Up @@ -169,15 +166,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let response = client
.clone()
.into_document_client(id.clone(), &id)?
.get_document()
.consistency_level(&response)
.execute::<MySampleStruct>()
.get_document::<MySampleStruct>(
Context::new(),
GetDocumentOptions::new().consistency_level(&response),
)
.await?;

assert!(match response {
GetDocumentResponse::NotFound(_) => true,
_ => false,
});
assert!(matches!(response, GetDocumentResponse::NotFound(_)));
println!("response == {:#?}", response);

for i in 0u64..5 {
Expand Down
26 changes: 13 additions & 13 deletions sdk/cosmos/examples/document_entries_01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
create_document_response
);

let document_client = client
let get_document_response = client
.clone()
.into_document_client(doc.id.clone(), &doc.id)?;

let get_document_response = document_client
.get_document()
.consistency_level(&create_document_response)
.execute::<serde_json::Value>()
.into_document_client(doc.id.clone(), &doc.id)?
.get_document::<serde_json::Value>(
Context::new(),
GetDocumentOptions::new().consistency_level(&create_document_response),
)
.await?;
println!("get_document_response == {:#?}", get_document_response);

let document_client = client.clone().into_document_client("ciccia", &doc.id)?;

let get_document_response = document_client
.get_document()
.consistency_level(&get_document_response)
.execute::<serde_json::Value>()
let get_document_response = client
.clone()
.into_document_client("ciccia", &doc.id)?
.get_document::<serde_json::Value>(
Context::new(),
GetDocumentOptions::new().consistency_level(&create_document_response),
)
.await?;
println!(
"get_document_response == {:#?}\n\n\n",
Expand Down
40 changes: 37 additions & 3 deletions sdk/cosmos/src/clients/document_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::{AttachmentClient, CollectionClient, CosmosClient, DatabaseClient};
use crate::prelude::{GetDocumentOptions, GetDocumentResponse};
use crate::resources::ResourceType;
use crate::{requests, ReadonlyString};
use azure_core::HttpClient;
use azure_core::{Context, HttpClient, PipelineContext, Request};
use serde::de::DeserializeOwned;
use serde::Serialize;

/// A client for Cosmos document resources.
Expand Down Expand Up @@ -61,8 +63,28 @@ impl DocumentClient {
}

/// Get a document
pub fn get_document(&self) -> requests::GetDocumentBuilder<'_, '_> {
requests::GetDocumentBuilder::new(self)
pub async fn get_document<T>(
&self,
ctx: Context,
options: GetDocumentOptions<'_>,
) -> Result<GetDocumentResponse<T>, crate::Error>
where
T: DeserializeOwned,
{
let mut request = self.prepare_request_pipeline_with_document_name(http::Method::GET);
let mut pipeline_context = PipelineContext::new(ctx, ResourceType::Databases.into());

options.decorate_request(&mut request)?;

let response = self
.cosmos_client()
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

GetDocumentResponse::try_from(response).await
}

/// Delete a document
Expand Down Expand Up @@ -99,6 +121,18 @@ impl DocumentClient {
)
}

fn prepare_request_pipeline_with_document_name(&self, method: http::Method) -> Request {
self.cosmos_client().prepare_request_pipeline(
&format!(
"dbs/{}/colls/{}/docs/{}",
self.database_client().database_name(),
self.collection_client().collection_name(),
self.document_name()
),
method,
)
}

pub(crate) fn http_client(&self) -> &dyn HttpClient {
self.cosmos_client().http_client()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,74 @@
use crate::headers::from_headers::*;
use crate::prelude::*;
use crate::resources::Document;
use crate::ResourceQuota;
use azure_core::headers::{etag_from_headers, session_token_from_headers};
use azure_core::prelude::*;
use azure_core::SessionToken;
use azure_core::{collect_pinned_stream, Request as HttpRequest, Response as HttpResponse};
use chrono::{DateTime, Utc};
use http::response::Response;
use http::StatusCode;
use http::{HeaderMap, StatusCode};
use serde::de::DeserializeOwned;

#[derive(Debug, Clone)]
pub struct GetDocumentOptions<'a> {
if_match_condition: Option<IfMatchCondition<'a>>,
if_modified_since: Option<IfModifiedSince<'a>>,
consistency_level: Option<ConsistencyLevel>,
}

impl<'a> GetDocumentOptions<'a> {
pub fn new() -> Self {
Self {
if_match_condition: None,
if_modified_since: None,
consistency_level: None,
}
}

setters! {
consistency_level: ConsistencyLevel => Some(consistency_level),
if_match_condition: IfMatchCondition<'a> => Some(if_match_condition),
if_modified_since: &'a DateTime<Utc> => Some(IfModifiedSince::new(if_modified_since)),
}

pub(crate) fn decorate_request(&self, request: &mut HttpRequest) -> Result<(), crate::Error> {
// add trait headers
azure_core::headers::add_optional_header2(&self.if_match_condition, request)?;
azure_core::headers::add_optional_header2(&self.if_modified_since, request)?;
azure_core::headers::add_optional_header2(&self.consistency_level, request)?;

request.set_body(bytes::Bytes::from_static(EMPTY_BODY).into());

Ok(())
}
}

#[derive(Debug, Clone)]
pub enum GetDocumentResponse<T> {
Found(Box<FoundDocumentResponse<T>>),
NotFound(Box<NotFoundDocumentResponse>),
}

impl<T> std::convert::TryFrom<Response<bytes::Bytes>> for GetDocumentResponse<T>
impl<T> GetDocumentResponse<T>
where
T: DeserializeOwned,
{
type Error = crate::Error;

fn try_from(response: Response<bytes::Bytes>) -> Result<Self, Self::Error> {
let status_code = response.status();
pub async fn try_from(response: HttpResponse) -> Result<Self, crate::Error> {
let (status_code, headers, pinned_stream) = response.deconstruct();

let has_been_found =
status_code == StatusCode::OK || status_code == StatusCode::NOT_MODIFIED;

let body = collect_pinned_stream(pinned_stream).await?;

if has_been_found {
Ok(GetDocumentResponse::Found(Box::new(
FoundDocumentResponse::try_from(response)?,
FoundDocumentResponse::try_from(&headers, body).await?,
)))
} else {
Ok(GetDocumentResponse::NotFound(Box::new(
NotFoundDocumentResponse::try_from(response)?,
NotFoundDocumentResponse::try_from(&headers).await?,
)))
}
}
Expand Down Expand Up @@ -65,18 +101,13 @@ pub struct FoundDocumentResponse<T> {
pub date: DateTime<Utc>,
}

impl<T> std::convert::TryFrom<Response<bytes::Bytes>> for FoundDocumentResponse<T>
impl<T> FoundDocumentResponse<T>
where
T: DeserializeOwned,
{
type Error = crate::Error;

fn try_from(response: Response<bytes::Bytes>) -> Result<Self, Self::Error> {
let headers = response.headers();
let body: &[u8] = response.body();

async fn try_from(headers: &HeaderMap, body: bytes::Bytes) -> Result<Self, crate::Error> {
Ok(Self {
document: Document::try_from((headers, body))?,
document: serde_json::from_slice(&body)?,

content_location: content_location_from_headers(headers)?.to_owned(),
last_state_change: last_state_change_from_headers(headers)?,
Expand Down Expand Up @@ -126,12 +157,8 @@ pub struct NotFoundDocumentResponse {
pub date: DateTime<Utc>,
}

impl std::convert::TryFrom<Response<bytes::Bytes>> for NotFoundDocumentResponse {
type Error = crate::Error;

fn try_from(response: Response<bytes::Bytes>) -> Result<Self, Self::Error> {
let headers = response.headers();

impl NotFoundDocumentResponse {
async fn try_from(headers: &HeaderMap) -> Result<Self, crate::Error> {
Ok(Self {
content_location: content_location_from_headers(headers)?.to_owned(),
last_state_change: last_state_change_from_headers(headers)?,
Expand Down
2 changes: 2 additions & 0 deletions sdk/cosmos/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod create_permission;
mod create_user;
mod delete_permission;
mod get_database;
mod get_document;
mod get_permission;
mod get_user;
mod list_databases;
Expand All @@ -22,6 +23,7 @@ pub use create_permission::*;
pub use create_user::*;
pub use delete_permission::*;
pub use get_database::*;
pub use get_document::*;
pub use get_permission::*;
pub use get_user::*;
pub use list_databases::*;
Expand Down
75 changes: 0 additions & 75 deletions sdk/cosmos/src/requests/get_document_builder.rs

This file was deleted.

2 changes: 0 additions & 2 deletions sdk/cosmos/src/requests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ mod delete_user_defined_function_builder;
mod execute_stored_procedure_builder;
mod get_attachment_builder;
mod get_collection_builder;
mod get_document_builder;
mod get_partition_key_ranges_builder;
mod list_attachments_builder;
mod list_collections_builder;
Expand Down Expand Up @@ -55,7 +54,6 @@ pub use delete_user_defined_function_builder::DeleteUserDefinedFunctionBuilder;
pub use execute_stored_procedure_builder::ExecuteStoredProcedureBuilder;
pub use get_attachment_builder::GetAttachmentBuilder;
pub use get_collection_builder::GetCollectionBuilder;
pub use get_document_builder::GetDocumentBuilder;
pub use get_partition_key_ranges_builder::GetPartitionKeyRangesBuilder;
pub use list_attachments_builder::ListAttachmentsBuilder;
pub use list_collections_builder::ListCollectionsBuilder;
Expand Down
Loading