Skip to content

Use ClientOptions to create Pipeline #288

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
target
target/
Cargo.lock
*.rs.bk
.vscode/
Expand All @@ -9,6 +9,7 @@ sample/
\#*\#
\.#*
*~
*.code-workspace
*.swp
*.swo
*.vim
Expand Down
11 changes: 0 additions & 11 deletions sdk/core/src/client_options.rs

This file was deleted.

7 changes: 4 additions & 3 deletions sdk/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ extern crate serde_derive;
mod macros;

mod bytes_stream;
pub mod client_options;
mod constants;
mod context;
mod errors;
pub mod headers;
mod http_client;
pub mod incompletevector;
mod models;
mod options;
pub mod parsing;
pub mod pipeline;
pub mod policies;
mod policies;
pub mod prelude;
mod request;
mod request_options;
Expand All @@ -36,13 +36,14 @@ use std::fmt::Debug;
use uuid::Uuid;

pub use bytes_stream::*;
pub use client_options::ClientOptions;
pub use constants::*;
pub use context::Context;
pub use errors::*;
pub use headers::AddAsHeader;
pub use http_client::{new_http_client, to_json, HttpClient};
pub use models::*;
pub use options::*;
pub use policies::{Policy, PolicyResult};
pub use request::*;
pub use response::*;
pub use seekable_stream::*;
Expand Down
30 changes: 29 additions & 1 deletion sdk/core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
/// ```
/// struct MyStruct<'a> { foo: Option<&'a str> };
/// impl <'a> MyStruct<'a> {
/// fn with_foo(self, foo: &'a str) -> Self {
/// fn foo(self, foo: &'a str) -> Self {
/// Self {
/// foo: Some(foo),
/// ..self
Expand All @@ -26,6 +26,7 @@
#[macro_export]
macro_rules! setters {
(@single $name:ident : $typ:ty => $transform:expr) => {
// TODO: Declare using idiomatic with_$name when https://github.com/Azure/azure-sdk-for-rust/issues/292 is resolved.
pub fn $name<T: ::std::convert::Into<$typ>>(self, $name: T) -> Self {
let $name: $typ = $name.into();
Self {
Expand Down Expand Up @@ -170,6 +171,25 @@ mod test {
create_enum!(Colors, (Black, "Black"), (White, "White"), (Red, "Red"));
create_enum!(ColorsMonochrome, (Black, "Black"), (White, "White"));

struct Options {
a: Option<String>,
b: u32,
}

#[allow(dead_code)]
impl Options {
setters! {
a: String => Some(a),
b: u32 => b,
}
}

impl Default for Options {
fn default() -> Self {
Options { a: None, b: 1 }
}
}

#[test]
fn test_color_parse_1() {
let color = "Black".parse::<Colors>().unwrap();
Expand All @@ -187,4 +207,12 @@ mod test {
fn test_color_parse_err_1() {
"Red".parse::<ColorsMonochrome>().unwrap();
}

#[test]
fn test_setters() {
let options = Options::default().a("test".to_owned());

assert_eq!(Some("test".to_owned()), options.a);
assert_eq!(1, options.b);
}
}
175 changes: 175 additions & 0 deletions sdk/core/src/options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use crate::policies::{ExponentialRetryPolicy, FixedRetryPolicy, NoRetryPolicy, Policy};
use crate::{new_http_client, HttpClient};
use std::sync::Arc;
use std::time::Duration;

/// Client options allow customization of policies, retry options, and more.
///
/// # Examples
///
/// You can override default options and even add your own per-call or per-retry policies:
///
/// ```
/// use azure_core::{ClientOptions, RetryOptions, TelemetryOptions};
/// let options = ClientOptions::default()
/// .retry(RetryOptions::default().max_retries(10u32))
/// .telemetry(TelemetryOptions::default().application_id("my-application"));
/// ```
#[derive(Clone, Debug, Default)]
pub struct ClientOptions {
// TODO: Expose transport override.
/// Policies called per call.
pub(crate) per_call_policies: Vec<Arc<dyn Policy>>,

/// Policies called per retry.
pub(crate) per_retry_policies: Vec<Arc<dyn Policy>>,

/// Retry options.
pub(crate) retry: RetryOptions,

/// Telemetry options.
pub(crate) telemetry: TelemetryOptions,

/// Transport options.
pub(crate) transport: TransportOptions,
}

impl ClientOptions {
/// A mutable reference to per-call policies.
pub fn mut_per_call_policies(&mut self) -> &mut Vec<Arc<dyn Policy>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idiomatic form is per_call_policies_mut

&mut self.per_call_policies
}

/// A mutable reference to per-retry policies.
pub fn mut_per_retry_policies(&mut self) -> &mut Vec<Arc<dyn Policy>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

&mut self.per_retry_policies
}

setters! {
per_call_policies: Vec<Arc<dyn Policy>> => per_call_policies,
per_retry_policies: Vec<Arc<dyn Policy>> => per_retry_policies,
retry: RetryOptions => retry,
telemetry: TelemetryOptions => telemetry,
transport: TransportOptions => transport,
}
}

/// The algorithm to apply when calculating the delay between retry attempts.
#[derive(Clone, Debug, PartialEq)]
pub enum RetryMode {
/// Retry attempts will delay based on a back-off strategy,
/// where each attempt will increase the duration that it waits before retrying.
///
/// This is the default.
Exponential,

/// Retry attempts happen at fixed intervals; each delay is a consistent duration.
Fixed,

/// Do not retry attempts.
None,
}

impl Default for RetryMode {
fn default() -> Self {
RetryMode::Exponential
}
}

/// The set of options that can be specified to influence how retry attempts are made,
/// and a failure is eligible to be retried.
#[derive(Clone, Debug)]
pub struct RetryOptions {
/// The algorithm to use for calculating retry delays.
mode: RetryMode,

/// The delay between retry attempts for a fixed algorithm
/// or the delay on which to base calculations for a back-off-based approach.
///
/// The default is 800 milliseconds.
delay: Duration,

/// The maximum number of retry attempts before giving up.
///
/// The default is 3.
max_retries: u32,

/// The maximum permissible delay between retry attempts.
///
/// The default is 1 minute.
max_delay: Duration,
}

impl RetryOptions {
setters! {
mode: RetryMode => mode,
delay: Duration => delay,
max_retries: u32 => max_retries,
max_delay: Duration => max_delay,
}
}

impl Default for RetryOptions {
fn default() -> Self {
RetryOptions {
mode: RetryMode::default(),
delay: Duration::from_millis(800),
max_retries: 3,
max_delay: Duration::from_secs(60),
}
}
}

impl RetryOptions {
pub(crate) fn to_policy(&self) -> Arc<dyn Policy> {
match self.mode {
RetryMode::Exponential => Arc::new(ExponentialRetryPolicy::new(
self.delay,
self.max_retries,
self.max_delay,
)),
RetryMode::Fixed => Arc::new(FixedRetryPolicy::new(
self.delay,
self.max_retries,
self.max_delay,
)),
RetryMode::None => Arc::new(NoRetryPolicy::default()),
}
}
}

/// Telemetry options.
#[derive(Clone, Debug, Default)]
pub struct TelemetryOptions {
/// Optional application ID to telemeter.
pub(crate) application_id: Option<String>,
}

impl TelemetryOptions {
setters! {
application_id: String => Some(application_id),
}
}

/// Transport options.
#[derive(Clone, Debug)]
pub struct TransportOptions {
/// The HTTP client implementation to use for requests.
pub(crate) http_client: Arc<dyn HttpClient>,
}

impl TransportOptions {
/// Creates a new `TransportOptions` using the given `HttpClient`.
pub fn new(http_client: Arc<dyn HttpClient>) -> Self {
Self { http_client }
}
}

impl Default for TransportOptions {
/// Creates an instance of the `TransportOptions` using the default `HttpClient`.
fn default() -> Self {
TransportOptions {
http_client: new_http_client(),
}
}
}
39 changes: 28 additions & 11 deletions sdk/core/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[cfg(not(target_arch = "wasm32"))]
use crate::policies::TransportPolicy;
use crate::policies::{Policy, PolicyResult, TelemetryPolicy};
use crate::{ClientOptions, Context, Request, Response};
use crate::{ClientOptions, Context, HttpClient, Request, Response};
use std::sync::Arc;

/// Execution pipeline.
Expand All @@ -23,6 +25,7 @@ use std::sync::Arc;
/// self.pipeline[0] is always valid).
#[derive(Debug, Clone)]
pub struct Pipeline {
http_client: Arc<dyn HttpClient>,
pipeline: Vec<Arc<dyn Policy>>,
}

Expand All @@ -37,9 +40,7 @@ impl Pipeline {
crate_version: Option<&'static str>,
options: &ClientOptions,
per_call_policies: Vec<Arc<dyn Policy>>,
retry: Arc<dyn Policy>,
per_retry_policies: Vec<Arc<dyn Policy>>,
transport_policy: Arc<dyn Policy>,
) -> Self {
let mut pipeline: Vec<Arc<dyn Policy>> = Vec::with_capacity(
options.per_call_policies.len()
Expand All @@ -51,17 +52,33 @@ impl Pipeline {

pipeline.extend_from_slice(&per_call_policies);
pipeline.extend_from_slice(&options.per_call_policies);
pipeline.push(Arc::new(TelemetryPolicy::new(
crate_name,
crate_version,
&options.telemetry,
)));
pipeline.push(retry);

let telemetry_policy = TelemetryPolicy::new(crate_name, crate_version, &options.telemetry);
pipeline.push(Arc::new(telemetry_policy));

let retry_policy = options.retry.to_policy();
pipeline.push(retry_policy);

pipeline.extend_from_slice(&per_retry_policies);
pipeline.extend_from_slice(&options.per_retry_policies);
pipeline.push(transport_policy);

Self { pipeline }
// TODO: Add transport policy for WASM once https://github.com/Azure/azure-sdk-for-rust/issues/293 is resolved.
#[cfg(not(target_arch = "wasm32"))]
{
let transport_policy = TransportPolicy::new(&options.transport);
pipeline.push(Arc::new(transport_policy));
}

Self {
http_client: options.transport.http_client.clone(),
pipeline,
}
}

/// Gets the `HttpClient` used by the pipeline.
pub fn http_client(&self) -> &dyn HttpClient {
// TODO: Request methods should be defined directly on the pipeline instead of exposing the HttpClient.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you create an issue for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see that there's this: #294. Might be worth in the future having all todos link to an issue

self.http_client.as_ref()
}

pub async fn send(&self, ctx: &mut Context, request: &mut Request) -> PolicyResult<Response> {
Expand Down
18 changes: 8 additions & 10 deletions sdk/core/src/policies/retry_policies/exponential_retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use chrono::{DateTime, Local};
use std::sync::Arc;
use std::time::Duration;

/// Retry policy with exponential backoff.
/// Retry policy with exponential back-off.
///
/// Retry policy with exponential backoff (with an added random delay up to 256 ms). Each retry
/// Retry policy with exponential back-off (with an added random delay up to 256 ms). Each retry
/// will happen at least after an exponential wait time. So if x is the first retry wait, the
/// second will be x*2, the third x*4 and so on. The policy will retry until the maximum number of
/// retries have been reached or the maximum allowed delay has passed (whichever comes first). The
Expand All @@ -18,17 +18,15 @@ pub struct ExponentialRetryPolicy {
max_delay: Duration,
}

impl Default for ExponentialRetryPolicy {
fn default() -> Self {
Self {
delay: Duration::from_secs(3),
max_retries: 3,
max_delay: Duration::from_secs(30),
impl ExponentialRetryPolicy {
pub(crate) fn new(delay: Duration, max_retries: u32, max_delay: Duration) -> Self {
ExponentialRetryPolicy {
delay,
max_retries,
max_delay,
}
}
}

impl ExponentialRetryPolicy {
fn is_expired(
&self,
first_retry_time: &mut Option<DateTime<Local>>,
Expand Down
Loading