Skip to content

Last into_future #677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions sdk/core/src/mock/mock_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,6 @@ use std::str::FromStr;

const FIELDS: &[&str] = &["uri", "method", "headers", "body"];

impl Request {
fn new(uri: Uri, method: Method, headers: HeaderMap, body: Body) -> Self {
Self {
uri,
method,
headers,
body,
}
}
}

Comment on lines -11 to -21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it took me a sec to realize this had just been moved to core/src/request.rs. That seems good.

impl<'de> Deserialize<'de> for Request {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
Expand Down Expand Up @@ -98,12 +87,12 @@ impl<'de> Visitor<'de> for RequestVisitor {
);
}

Ok(Self::Value::new(
Uri::from_str(uri.1).expect("expected a valid uri"),
Method::from_str(method.1).expect("expected a valid HTTP method"),
hm,
bytes::Bytes::from(body).into(),
))
Ok(Self::Value {
uri: Uri::from_str(uri.1).expect("expected a valid uri"),
method: Method::from_str(method.1).expect("expected a valid HTTP method"),
headers: hm,
body: bytes::Bytes::from(body).into(),
})
}
}

Expand Down
10 changes: 10 additions & 0 deletions sdk/core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ pub struct Request {
}

impl Request {
/// Create a new request with an empty body and no headers
pub fn new(uri: Uri, method: Method) -> Self {
Self {
uri,
method,
headers: HeaderMap::default(),
body: Body::Bytes(bytes::Bytes::new()),
}
}

pub fn uri(&self) -> &Uri {
&self.uri
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/data_cosmos/examples/attachments_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("creating");
let attachment_client = document_client.clone().into_attachment_client("myref06");
let resp = attachment_client
.create_reference(
.create_attachment(
"https://cdn.pixabay.com/photo/2020/01/11/09/30/abstract-background-4756987__340.jpg",
"image/jpeg",
)
Expand All @@ -100,12 +100,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("replacing");
let attachment_client = document_client.clone().into_attachment_client("myref06");
let resp = attachment_client
.replace_reference()
.consistency_level(session_token)
.execute(
.replace_attachment(
"https://Adn.pixabay.com/photo/2020/01/11/09/30/abstract-background-4756987__340.jpg",
"image/jpeg",
)
.consistency_level(session_token)
.into_future()
.await?;
println!("replace reference == {:#?}", resp);

Expand Down
2 changes: 1 addition & 1 deletion sdk/data_cosmos/examples/key_ranges_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let client = client.into_database_client(database);
let client = client.into_collection_client(collection);

let resp = client.get_partition_key_ranges().execute().await?;
let resp = client.get_partition_key_ranges().into_future().await?;
println!("resp == {:#?}", resp);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion sdk/data_cosmos/examples/stored_proc_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.into_stored_procedure_client("test_proc")
.execute_stored_procedure()
.parameters(["Robert"])
.execute::<serde_json::Value>()
.into_future::<serde_json::Value>()
.await?;

println!("Response object:\n{:#?}", ret);
Expand Down
2 changes: 1 addition & 1 deletion sdk/data_cosmos/examples/stored_proc_01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let execute_stored_procedure_response = stored_procedure_client
.execute_stored_procedure()
.parameters(["Robert"])
.execute::<serde_json::Value>()
.into_future::<serde_json::Value>()
.await?;

println!(
Expand Down
53 changes: 20 additions & 33 deletions sdk/data_cosmos/src/clients/attachment_client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use crate::operations::*;
use crate::requests;
use crate::resources::ResourceType;
use crate::ReadonlyString;
use azure_core::HttpClient;
use azure_core::Pipeline;
use azure_core::Request;
use azure_core::{Pipeline, Request};
use bytes::Bytes;

use super::*;
Expand Down Expand Up @@ -73,27 +69,35 @@ impl AttachmentClient {
CreateOrReplaceSlugAttachmentBuilder::new(self.clone(), false, body)
}

/// Initiate a request to create ant.
pub fn create_reference<M, C>(
/// Initiate a request to create a reference attachment.
pub fn create_attachment<M, C>(
&self,
media: M,
content_type: C,
) -> CreateReferenceAttachmentBuilder
) -> CreateOrReplaceAttachmentBuilder
where
M: Into<String>,
C: Into<String>,
{
CreateReferenceAttachmentBuilder::new(self.clone(), media.into(), content_type.into())
CreateOrReplaceAttachmentBuilder::new(self.clone(), true, media.into(), content_type.into())
}

/// Initiate a request to replace an attachment.
pub fn replace_reference(&self) -> requests::ReplaceReferenceAttachmentBuilder<'_, '_> {
requests::ReplaceReferenceAttachmentBuilder::new(self)
}

/// Get a raw [`HttpClient`].
pub(crate) fn http_client(&self) -> &dyn HttpClient {
self.cosmos_client().http_client()
pub fn replace_attachment<M, C>(
&self,
media: M,
content_type: C,
) -> CreateOrReplaceAttachmentBuilder
where
M: Into<String>,
C: Into<String>,
{
CreateOrReplaceAttachmentBuilder::new(
self.clone(),
false,
media.into(),
content_type.into(),
)
}

pub(crate) fn prepare_pipeline(&self, method: http::Method) -> Request {
Expand All @@ -108,23 +112,6 @@ impl AttachmentClient {
)
}

pub(crate) fn prepare_request_with_attachment_name(
&self,
method: http::Method,
) -> http::request::Builder {
self.cosmos_client().prepare_request(
&format!(
"dbs/{}/colls/{}/docs/{}/attachments/{}",
self.database_client().database_name(),
self.collection_client().collection_name(),
self.document_client().document_name(),
self.attachment_name()
),
method,
ResourceType::Attachments,
)
}

pub(crate) fn prepare_pipeline_with_attachment_name(&self, method: http::Method) -> Request {
self.cosmos_client().prepare_request_pipeline(
&format!(
Expand Down
11 changes: 3 additions & 8 deletions sdk/data_cosmos/src/clients/collection_client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use super::{DatabaseClient, UserDefinedFunctionClient};
use crate::clients::*;
use crate::operations::*;
use crate::requests;
use crate::resources::collection::PartitionKey;
use crate::resources::document::Query;
use crate::CosmosEntity;
use crate::ReadonlyString;
use azure_core::{HttpClient, Pipeline, Request};
use azure_core::{Pipeline, Request};
use serde::Serialize;

/// A client for Cosmos collection resources.
Expand Down Expand Up @@ -94,8 +93,8 @@ impl CollectionClient {
}

/// list the partition key ranges in a collection
pub fn get_partition_key_ranges(&self) -> requests::GetPartitionKeyRangesBuilder<'_, '_> {
requests::GetPartitionKeyRangesBuilder::new(self)
pub fn get_partition_key_ranges(&self) -> GetPartitionKeyRangesBuilder {
GetPartitionKeyRangesBuilder::new(self.clone())
}

/// convert into a [`DocumentClient`]
Expand Down Expand Up @@ -141,10 +140,6 @@ impl CollectionClient {
.prepare_request_pipeline(path, http_method)
}

pub(crate) fn http_client(&self) -> &dyn HttpClient {
self.cosmos_client().http_client()
}

pub(crate) fn pipeline(&self) -> &Pipeline {
self.cosmos_client().pipeline()
}
Expand Down
92 changes: 11 additions & 81 deletions sdk/data_cosmos/src/clients/cosmos_client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use super::DatabaseClient;
use crate::authorization_policy::{generate_authorization, generate_resource_link};
use crate::headers::*;
use crate::operations::*;
use crate::resources::permission::AuthorizationToken;
use crate::resources::ResourceType;
use crate::{ReadonlyString, TimeNonce};
use crate::ReadonlyString;

use azure_core::{ClientOptions, HttpClient, Pipeline, Request};
use http::request::Builder as RequestBuilder;
use http::{header, HeaderValue};
use azure_core::{ClientOptions, Pipeline, Request};

use std::fmt::Debug;
use std::sync::Arc;
Expand All @@ -18,13 +13,10 @@ use std::sync::Arc;
pub const EMULATOR_ACCOUNT_KEY: &str =
"C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";

const AZURE_VERSION: &str = "2018-12-31";

Comment on lines -21 to -22
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did this go? Has this been replaced by something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done in the telemetry_policy now (not a part of this PR). This can be removed since now everything goes through the pipeline and the telemetry policy will be used.

/// A plain Cosmos client.
#[derive(Debug, Clone)]
pub struct CosmosClient {
pipeline: Pipeline,
auth_token: AuthorizationToken,
cloud_location: CloudLocation,
}

Expand Down Expand Up @@ -75,14 +67,9 @@ impl CosmosClient {
/// Create a new `CosmosClient` which connects to the account's instance in the public Azure cloud.
pub fn new(account: String, auth_token: AuthorizationToken, options: CosmosOptions) -> Self {
let cloud_location = CloudLocation::Public(account);
// TODO: The AuthorizationToken will only be stored in the pipeline via its policy.
// Right now the AuthorizationToken is a field of the Client.
// This will be corrected once every Cosmos function has been be migrated to the pipeline.
// Once that happens, we will remove the clone below.
let pipeline = new_pipeline_from_options(options, auth_token.clone());
let pipeline = new_pipeline_from_options(options, auth_token);
Self {
pipeline,
auth_token,
cloud_location,
}
}
Expand All @@ -108,10 +95,9 @@ impl CosmosClient {
options: CosmosOptions,
) -> Self {
let cloud_location = CloudLocation::China(account);
let pipeline = new_pipeline_from_options(options, auth_token.clone());
let pipeline = new_pipeline_from_options(options, auth_token);
Self {
pipeline,
auth_token,
cloud_location,
}
}
Expand All @@ -124,10 +110,9 @@ impl CosmosClient {
options: CosmosOptions,
) -> Self {
let cloud_location = CloudLocation::Custom { account, uri };
let pipeline = new_pipeline_from_options(options, auth_token.clone());
let pipeline = new_pipeline_from_options(options, auth_token);
Self {
pipeline,
auth_token,
cloud_location,
}
}
Expand All @@ -140,19 +125,15 @@ impl CosmosClient {
account: String::from("Custom"),
uri,
};
let pipeline = new_pipeline_from_options(options, auth_token.clone());
let pipeline = new_pipeline_from_options(options, auth_token);
Self {
pipeline,
auth_token,
cloud_location,
}
}

/// Set the auth token used
pub fn auth_token(&mut self, auth_token: AuthorizationToken) {
// TODO: To remove once everything uses the AutorizationPolicy
self.auth_token = auth_token.clone();

// we replace the AuthorizationPolicy. This is
// the last-1 policy by construction.
let auth_policy: Arc<dyn azure_core::Policy> =
Expand All @@ -177,74 +158,23 @@ impl CosmosClient {
DatabaseClient::new(self, database_name)
}

/// Prepares an `http::RequestBuilder`.
///
/// TODO: Remove once all operations have been moved to pipeline architecture. This is used by
/// legacy operations that have not moved to the use of the pipeline architecture. Once
/// that is complete, this will be superceded by `prepare_request_pipeline`.
pub(crate) fn prepare_request(
&self,
uri_path: &str,
http_method: http::Method,
resource_type: ResourceType,
) -> RequestBuilder {
let time = TimeNonce::default();

let auth = {
let resource_link = generate_resource_link(uri_path);
trace!(
"resource_link generated by prepare_request == {}",
resource_link
);
generate_authorization(
&self.auth_token,
&http_method,
&resource_type,
resource_link,
time,
)
};
trace!("prepare_request::auth == {:?}", auth);
let uri = format!("{}/{}", self.cloud_location.url(), uri_path);
debug!("building request. uri: {}", uri);

RequestBuilder::new()
.method(http_method)
.uri(uri)
.header(HEADER_DATE, time.to_string())
.header(HEADER_VERSION, HeaderValue::from_static(AZURE_VERSION))
.header(header::AUTHORIZATION, auth)
}

/// Prepares' an `azure_core::Request`. This function will
/// add the cloud location to the URI suffix and generate
/// a Request with the specified HTTP Method.
/// It will also set the body to an empty Bytes instance.
/// *Note*: This call does not handle authorization as
/// it will be done by the `AuthorizationPolicy`.
/// Prepares' an `azure_core::Request`.
///
/// Note: Eventually this method will replace `prepare_request` fully.
/// This function will add the cloud location to the URI suffix and generate
/// a Request with the specified HTTP Method. It will also set the body
/// to an empty `Bytes` instance.
pub(crate) fn prepare_request_pipeline(
&self,
uri_path: &str,
http_method: http::Method,
) -> Request {
let uri = format!("{}/{}", self.cloud_location.url(), uri_path);
RequestBuilder::new()
.method(http_method)
.uri(uri)
.body(bytes::Bytes::new())
.unwrap()
.into()
Request::new(uri.parse().unwrap(), http_method)
}

pub(crate) fn pipeline(&self) -> &Pipeline {
&self.pipeline
}

pub(crate) fn http_client(&self) -> &dyn HttpClient {
self.pipeline.http_client()
}
}

/// The cloud with which you want to interact.
Expand Down
Loading