Skip to content

Commit 58238de

Browse files
authored
Convert get database operation to pipeline architecture (#286)
* Small cleanup * Convert get database operation to pipeline architecture * Replace NOTE with TODO
1 parent 87c84b9 commit 58238de

18 files changed

+144
-166
lines changed

sdk/core/src/client_options.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,5 @@ pub struct ClientOptions {
77
// TODO: Expose retry options and transport overrides.
88
pub per_call_policies: Vec<Arc<dyn Policy>>,
99
pub per_retry_policies: Vec<Arc<dyn Policy>>,
10-
1110
pub telemetry: TelemetryOptions,
1211
}

sdk/core/src/response.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ impl ResponseBuilder {
3737
pub struct Response {
3838
status: StatusCode,
3939
headers: HeaderMap,
40-
response: PinnedStream,
40+
body: PinnedStream,
4141
}
4242

4343
impl Response {
44-
fn new(status: StatusCode, headers: HeaderMap, response: PinnedStream) -> Self {
44+
fn new(status: StatusCode, headers: HeaderMap, body: PinnedStream) -> Self {
4545
Self {
4646
status,
4747
headers,
48-
response,
48+
body,
4949
}
5050
}
5151

@@ -58,7 +58,23 @@ impl Response {
5858
}
5959

6060
pub fn deconstruct(self) -> (StatusCode, HeaderMap, PinnedStream) {
61-
(self.status, self.headers, self.response)
61+
(self.status, self.headers, self.body)
62+
}
63+
64+
pub async fn validate(self, expected_status: StatusCode) -> Result<Self, crate::HttpError> {
65+
let status = self.status();
66+
if expected_status != status {
67+
let body = collect_pinned_stream(self.body)
68+
.await
69+
.unwrap_or_else(|_| Bytes::from_static("<INVALID BODY>".as_bytes()));
70+
Err(crate::HttpError::new_unexpected_status_code(
71+
expected_status,
72+
status,
73+
std::str::from_utf8(&body as &[u8]).unwrap_or("<NON-UTF8 BODY>"),
74+
))
75+
} else {
76+
Ok(self)
77+
}
6278
}
6379
}
6480

sdk/cosmos/examples/collection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use azure_core::prelude::*;
12
use azure_cosmos::prelude::*;
23
use std::error::Error;
34

@@ -43,8 +44,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
4344
let db = client
4445
.clone()
4546
.into_database_client(db.id.clone())
46-
.get_database()
47-
.execute()
47+
.get_database(Context::new(), GetDatabaseOptions::default())
4848
.await?;
4949
println!("db {} found == {:?}", &db.database.id, &db);
5050
}

sdk/cosmos/examples/create_delete_database.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use azure_cosmos::prelude::*;
44
use futures::stream::StreamExt;
55

66
use std::error::Error;
7-
use std::sync::Arc;
87

98
#[tokio::main]
109
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
@@ -32,11 +31,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
3231
// authorization token at later time if you need, for example, to escalate the privileges for a
3332
// single operation.
3433
let http_client = azure_core::new_http_client();
35-
let client = CosmosClient::new(
36-
http_client.clone(),
37-
account.clone(),
38-
authorization_token.clone(),
39-
);
34+
let client = CosmosClient::new(http_client, account, authorization_token);
4035

4136
// The Cosmos' client exposes a lot of methods. This one lists the databases in the specified
4237
// account. Database do not implement Display but deref to &str so you can pass it to methods
@@ -45,13 +40,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
4540
let list_databases_response = client.list_databases().execute().await?;
4641
println!("list_databases_response = {:#?}", list_databases_response);
4742

48-
let cosmos_client = CosmosClient::with_pipeline(
49-
http_client,
50-
account,
51-
authorization_token,
52-
CosmosOptions::with_client(Arc::new(reqwest::Client::new())),
53-
);
54-
let db = cosmos_client
43+
let db = client
5544
.create_database(
5645
azure_core::Context::new(),
5746
&database_name,

sdk/cosmos/examples/document_00.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use azure_cosmos::prelude::*;
66
use azure_cosmos::responses::GetDocumentResponse;
77
use std::borrow::Cow;
88
use std::error::Error;
9-
use std::sync::Arc;
109

1110
#[derive(Clone, Serialize, Deserialize, Debug)]
1211
struct MySampleStruct<'a> {
@@ -67,17 +66,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
6766
.into_iter()
6867
.find(|db| db.id == DATABASE);
6968

70-
let database_client = CosmosClient::with_pipeline(
71-
http_client,
72-
account,
73-
authorization_token,
74-
CosmosOptions::with_client(Arc::new(reqwest::Client::new())),
75-
);
7669
// If the requested database is not found we create it.
7770
let database = match db {
7871
Some(db) => db,
7972
None => {
80-
database_client
73+
client
8174
.create_database(Context::new(), DATABASE, CreateDatabaseOptions::new())
8275
.await?
8376
.database

sdk/cosmos/examples/get_database.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use azure_core::prelude::*;
12
use azure_cosmos::prelude::*;
23
use std::error::Error;
34

@@ -20,7 +21,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
2021

2122
let database_client = client.into_database_client(database_name.clone());
2223

23-
let response = database_client.get_database().execute().await?;
24+
let response = database_client
25+
.get_database(Context::new(), GetDatabaseOptions::new())
26+
.await?;
2427
println!("response == {:?}", response);
2528

2629
Ok(())

sdk/cosmos/examples/permission_00.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use azure_core::prelude::*;
12
use azure_cosmos::prelude::*;
23
use std::error::Error;
34

@@ -36,7 +37,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
3637
.into_collection_client(collection_name2);
3738
let user_client = database_client.clone().into_user_client(user_name);
3839

39-
let get_database_response = database_client.get_database().execute().await?;
40+
let get_database_response = database_client
41+
.get_database(Context::new(), GetDatabaseOptions::new())
42+
.await?;
4043
println!("get_database_response == {:#?}", get_database_response);
4144

4245
let get_collection_response = collection_client.get_collection().execute().await?;

sdk/cosmos/src/clients/cosmos_client.rs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub struct CosmosClient {
3232
auth_token: AuthorizationToken,
3333
cloud_location: CloudLocation,
3434
}
35+
3536
/// Options for specifying how a Cosmos client will behave
3637
pub struct CosmosOptions {
3738
options: ClientOptions,
@@ -142,22 +143,6 @@ impl CosmosClient {
142143
}
143144
}
144145

145-
/// Construct a pipeline with explicit options
146-
pub fn with_pipeline(
147-
http_client: Arc<dyn HttpClient>,
148-
account: String, // TODO: this will eventually be a URL
149-
auth_token: AuthorizationToken,
150-
options: CosmosOptions,
151-
) -> Self {
152-
let pipeline = new_pipeline_from_options(options);
153-
Self {
154-
http_client,
155-
pipeline,
156-
auth_token,
157-
cloud_location: CloudLocation::Public(account),
158-
}
159-
}
160-
161146
/// Set the auth token used
162147
pub fn auth_token(&mut self, auth_token: AuthorizationToken) {
163148
self.auth_token = auth_token;
@@ -177,7 +162,9 @@ impl CosmosClient {
177162
.pipeline()
178163
.send(&mut ctx, &mut request)
179164
.await
180-
.map_err(crate::Error::PolicyError)?;
165+
.map_err(crate::Error::PolicyError)?
166+
.validate(http::StatusCode::CREATED)
167+
.await?;
181168

182169
Ok(CreateDatabaseResponse::try_from(response).await?)
183170
}
@@ -196,6 +183,11 @@ impl CosmosClient {
196183
DatabaseClient::new(self, database_name)
197184
}
198185

186+
/// Prepares an `http::RequestBuilder`.
187+
///
188+
/// TODO: Remove once all operations have been moved to pipeline architecture. This is used by
189+
/// legacy operations that have not moved to the use of the pipeline architecture. Once
190+
/// that is complete, this will be superceded by `prepare_request2`.
199191
pub(crate) fn prepare_request(
200192
&self,
201193
uri_path: &str,
@@ -217,7 +209,9 @@ impl CosmosClient {
217209
self.prepare_request_with_signature(uri_path, http_method, &time, &auth)
218210
}
219211

220-
// Eventually this method will replace `prepare_request` fully
212+
/// Prepares' an `azure_core::Request`.
213+
///
214+
/// Note: Eventually this method will replace `prepare_request` fully
221215
pub(crate) fn prepare_request2(
222216
&self,
223217
uri_path: &str,

sdk/cosmos/src/clients/database_client.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use super::*;
22
use crate::operations::*;
3-
use crate::requests;
43
use crate::resources::ResourceType;
5-
use crate::ReadonlyString;
4+
use crate::{requests, ReadonlyString};
5+
66
use azure_core::pipeline::Pipeline;
7-
use azure_core::{Context, HttpClient, Request};
7+
use azure_core::{Context, HttpClient};
88

99
/// A client for Cosmos database resources.
1010
#[derive(Debug, Clone)]
@@ -35,8 +35,26 @@ impl DatabaseClient {
3535
}
3636

3737
/// Get the database
38-
pub fn get_database(&self) -> requests::GetDatabaseBuilder<'_, '_> {
39-
requests::GetDatabaseBuilder::new(self)
38+
pub async fn get_database(
39+
&self,
40+
mut ctx: Context,
41+
options: GetDatabaseOptions,
42+
) -> Result<GetDatabaseResponse, crate::Error> {
43+
let mut request = self
44+
.prepare_request_with_database_name(http::Method::GET)
45+
.body(bytes::Bytes::new())
46+
.unwrap()
47+
.into();
48+
options.decorate_request(&mut request)?;
49+
let response = self
50+
.pipeline()
51+
.send(&mut ctx, &mut request)
52+
.await
53+
.map_err(crate::Error::PolicyError)?
54+
.validate(http::StatusCode::OK)
55+
.await?;
56+
57+
Ok(GetDatabaseResponse::try_from(response).await?)
4058
}
4159

4260
/// List collections in the database
@@ -52,24 +70,23 @@ impl DatabaseClient {
5270
/// Create a collection
5371
pub async fn create_collection<S: AsRef<str>>(
5472
&self,
55-
ctx: Context,
73+
mut ctx: Context,
5674
collection_name: S,
5775
options: CreateCollectionOptions,
5876
) -> Result<CreateCollectionResponse, crate::Error> {
59-
let request = self.cosmos_client().prepare_request(
77+
let mut request = self.cosmos_client().prepare_request2(
6078
&format!("dbs/{}/colls", self.database_name()),
6179
http::Method::POST,
6280
ResourceType::Collections,
6381
);
64-
let mut request: Request = request.body(bytes::Bytes::new()).unwrap().into();
65-
66-
let mut ctx = ctx.clone();
6782
options.decorate_request(&mut request, collection_name.as_ref())?;
6883
let response = self
6984
.pipeline()
7085
.send(&mut ctx, &mut request)
7186
.await
72-
.map_err(crate::Error::PolicyError)?;
87+
.map_err(crate::Error::PolicyError)?
88+
.validate(http::StatusCode::CREATED)
89+
.await?;
7390

7491
Ok(CreateCollectionResponse::try_from(response).await?)
7592
}

sdk/cosmos/src/consistency_level.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ macro_rules! implement_from {
6363
}
6464

6565
implement_from!(CreateSlugAttachmentResponse);
66-
implement_from!(GetDatabaseResponse);
6766
implement_from!(GetCollectionResponse);
6867
implement_from!(CreateUserResponse);
6968
implement_from!(DeleteAttachmentResponse);

sdk/cosmos/src/operations/create_collection.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ impl CreateCollectionOptions {
2828
indexing_policy: IndexingPolicy => Some(indexing_policy),
2929
offer: Offer => Some(offer),
3030
}
31-
}
3231

33-
impl CreateCollectionOptions {
3432
pub(crate) fn decorate_request(
3533
&self,
3634
request: &mut HttpRequest,
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use crate::headers::from_headers::*;
2+
use crate::prelude::*;
3+
use crate::ResourceQuota;
4+
5+
use azure_core::headers::{etag_from_headers, session_token_from_headers};
6+
use azure_core::{collect_pinned_stream, Request as HttpRequest, Response as HttpResponse};
7+
use chrono::{DateTime, Utc};
8+
9+
#[derive(Debug, Clone, Default)]
10+
pub struct GetDatabaseOptions {
11+
consistency_level: Option<ConsistencyLevel>,
12+
}
13+
14+
impl GetDatabaseOptions {
15+
pub fn new() -> Self {
16+
Self {
17+
consistency_level: None,
18+
}
19+
}
20+
21+
setters! {
22+
consistency_level: ConsistencyLevel => Some(consistency_level),
23+
}
24+
25+
pub(crate) fn decorate_request(&self, request: &mut HttpRequest) -> Result<(), crate::Error> {
26+
azure_core::headers::add_optional_header2(&self.consistency_level, request);
27+
request.set_body(bytes::Bytes::from_static(&[]).into());
28+
29+
Ok(())
30+
}
31+
}
32+
33+
#[derive(Debug, Clone)]
34+
pub struct GetDatabaseResponse {
35+
pub database: Database,
36+
pub charge: f64,
37+
pub activity_id: uuid::Uuid,
38+
pub session_token: String,
39+
pub etag: String,
40+
pub last_state_change: DateTime<Utc>,
41+
pub resource_quota: Vec<ResourceQuota>,
42+
pub resource_usage: Vec<ResourceQuota>,
43+
pub schema_version: String,
44+
pub service_version: String,
45+
pub gateway_version: String,
46+
}
47+
48+
impl GetDatabaseResponse {
49+
pub async fn try_from(response: HttpResponse) -> Result<Self, crate::Error> {
50+
let (_status_code, headers, pinned_stream) = response.deconstruct();
51+
let body = collect_pinned_stream(pinned_stream).await?;
52+
53+
Ok(Self {
54+
database: serde_json::from_slice(&body)?,
55+
charge: request_charge_from_headers(&headers)?,
56+
activity_id: activity_id_from_headers(&headers)?,
57+
session_token: session_token_from_headers(&headers)?,
58+
etag: etag_from_headers(&headers)?,
59+
last_state_change: last_state_change_from_headers(&headers)?,
60+
resource_quota: resource_quota_from_headers(&headers)?,
61+
resource_usage: resource_usage_from_headers(&headers)?,
62+
schema_version: schema_version_from_headers(&headers)?.to_owned(),
63+
service_version: service_version_from_headers(&headers)?.to_owned(),
64+
gateway_version: gateway_version_from_headers(&headers)?.to_owned(),
65+
})
66+
}
67+
}

sdk/cosmos/src/operations/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
mod create_collection;
66
mod create_database;
7+
mod get_database;
78

89
pub use create_collection::*;
910
pub use create_database::*;
11+
pub use get_database::*;

0 commit comments

Comments
 (0)