Skip to content

Commit e54f626

Browse files
committed
Use ClientOptions to create Pipeline
1 parent 58238de commit e54f626

File tree

11 files changed

+235
-85
lines changed

11 files changed

+235
-85
lines changed

sdk/core/src/client_options.rs

Lines changed: 0 additions & 11 deletions
This file was deleted.

sdk/core/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,17 @@ extern crate serde_derive;
1010
mod macros;
1111

1212
mod bytes_stream;
13-
pub mod client_options;
1413
mod constants;
1514
mod context;
1615
mod errors;
1716
pub mod headers;
1817
mod http_client;
1918
pub mod incompletevector;
2019
mod models;
20+
mod options;
2121
pub mod parsing;
2222
pub mod pipeline;
23-
pub mod policies;
23+
mod policies;
2424
pub mod prelude;
2525
mod request;
2626
mod request_options;
@@ -36,13 +36,14 @@ use std::fmt::Debug;
3636
use uuid::Uuid;
3737

3838
pub use bytes_stream::*;
39-
pub use client_options::ClientOptions;
4039
pub use constants::*;
4140
pub use context::Context;
4241
pub use errors::*;
4342
pub use headers::AddAsHeader;
4443
pub use http_client::{new_http_client, to_json, HttpClient};
4544
pub use models::*;
45+
pub use options::*;
46+
pub use policies::{Policy, PolicyResult};
4647
pub use request::*;
4748
pub use response::*;
4849
pub use seekable_stream::*;

sdk/core/src/macros.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
/// ```
1616
/// struct MyStruct<'a> { foo: Option<&'a str> };
1717
/// impl <'a> MyStruct<'a> {
18-
/// fn with_foo(self, foo: &'a str) -> Self {
18+
/// fn foo(self, foo: &'a str) -> Self {
1919
/// Self {
2020
/// foo: Some(foo),
2121
/// ..self
@@ -26,6 +26,7 @@
2626
#[macro_export]
2727
macro_rules! setters {
2828
(@single $name:ident : $typ:ty => $transform:expr) => {
29+
// TODO: Declare with_$name when https://github.com/rust-lang/rust/issues/29599 is fixed.
2930
pub fn $name<T: ::std::convert::Into<$typ>>(self, $name: T) -> Self {
3031
let $name: $typ = $name.into();
3132
Self {
@@ -170,6 +171,28 @@ mod test {
170171
create_enum!(Colors, (Black, "Black"), (White, "White"), (Red, "Red"));
171172
create_enum!(ColorsMonochrome, (Black, "Black"), (White, "White"));
172173

174+
struct Options {
175+
a: Option<String>,
176+
b: u32,
177+
}
178+
179+
#[allow(dead_code)]
180+
impl Options {
181+
setters! {
182+
a: String => Some(a),
183+
b: u32 => b,
184+
}
185+
}
186+
187+
impl Default for Options {
188+
fn default() -> Self {
189+
Options {
190+
a: None,
191+
b: 1,
192+
}
193+
}
194+
}
195+
173196
#[test]
174197
fn test_color_parse_1() {
175198
let color = "Black".parse::<Colors>().unwrap();
@@ -187,4 +210,13 @@ mod test {
187210
fn test_color_parse_err_1() {
188211
"Red".parse::<ColorsMonochrome>().unwrap();
189212
}
213+
214+
#[test]
215+
fn test_setters() {
216+
let options = Options::default()
217+
.a("test".to_owned());
218+
219+
assert_eq!(Some("test".to_owned()), options.a);
220+
assert_eq!(1, options.b);
221+
}
190222
}

sdk/core/src/options.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
use crate::policies::{ExponentialRetryPolicy, FixedRetryPolicy, NoRetryPolicy, Policy};
2+
use crate::{new_http_client, HttpClient};
3+
use std::sync::Arc;
4+
use std::time::Duration;
5+
6+
/// Client options allow customization of policies, retry options, and more.
7+
#[derive(Clone, Debug, Default)]
8+
pub struct ClientOptions {
9+
// TODO: Expose transport override.
10+
/// Policies called per call.
11+
pub(crate) per_call_policies: Vec<Arc<dyn Policy>>,
12+
13+
/// Policies called per retry.
14+
pub(crate) per_retry_policies: Vec<Arc<dyn Policy>>,
15+
16+
/// Retry options.
17+
pub(crate) retry: RetryOptions,
18+
19+
/// Telemetry options.
20+
pub(crate) telemetry: TelemetryOptions,
21+
22+
/// Transport options.
23+
pub(crate) transport: TransportOptions,
24+
}
25+
26+
impl ClientOptions {
27+
setters! {
28+
per_call_policies: Vec<Arc<dyn Policy>> => per_call_policies,
29+
per_retry_policies: Vec<Arc<dyn Policy>> => per_retry_policies,
30+
retry: RetryOptions => retry,
31+
telemetry: TelemetryOptions => telemetry,
32+
transport: TransportOptions => transport,
33+
}
34+
}
35+
36+
/// The algorithm to apply when calculating the delay between retry attempts.
37+
#[derive(Clone, Debug, PartialEq)]
38+
pub enum RetryMode {
39+
/// Retry attempts will delay based on a back-off strategy,
40+
/// where each attempt will increase the duration that it waits before retrying.
41+
///
42+
/// This is the default.
43+
Exponential,
44+
45+
/// Retry attempts happen at fixed intervals; each delay is a consistent duration.
46+
Fixed,
47+
48+
/// Do not retry attempts.
49+
None,
50+
}
51+
52+
impl Default for RetryMode {
53+
fn default() -> Self {
54+
RetryMode::Exponential
55+
}
56+
}
57+
58+
/// The set of options that can be specified to influence how retry attempts are made,
59+
/// and a failure is eligible to be retried.
60+
#[derive(Clone, Debug)]
61+
pub struct RetryOptions {
62+
/// The algorithm to use for calculating retry delays.
63+
pub(crate) mode: RetryMode,
64+
65+
/// The delay between retry attempts for a fixed algorithm
66+
/// or the delay on which to base calculations for a back-off-based approach.
67+
///
68+
/// The default is 800 milliseconds.
69+
pub(crate) delay: Duration,
70+
71+
/// The maximum number of retry attempts before giving up.
72+
///
73+
/// The default is 3.
74+
pub(crate) max_retries: u32,
75+
76+
/// The maximum permissible delay between retry attempts.
77+
///
78+
/// The default is 1 minute.
79+
pub(crate) max_delay: Duration,
80+
}
81+
82+
impl RetryOptions {
83+
setters! {
84+
mode: RetryMode => mode,
85+
delay: Duration => delay,
86+
max_retries: u32 => max_retries,
87+
max_delay: Duration => max_delay,
88+
}
89+
}
90+
91+
impl Default for RetryOptions {
92+
fn default() -> Self {
93+
RetryOptions {
94+
mode: RetryMode::default(),
95+
delay: Duration::from_millis(800),
96+
max_retries: 3,
97+
max_delay: Duration::from_secs(60),
98+
}
99+
}
100+
}
101+
102+
impl RetryOptions {
103+
pub(crate) fn to_policy(&self) -> Arc<dyn Policy> {
104+
match self.mode {
105+
RetryMode::Exponential => Arc::new(ExponentialRetryPolicy::new(
106+
self.delay,
107+
self.max_retries,
108+
self.max_delay,
109+
)),
110+
RetryMode::Fixed => Arc::new(FixedRetryPolicy::new(
111+
self.delay,
112+
self.max_retries,
113+
self.max_delay,
114+
)),
115+
RetryMode::None => Arc::new(NoRetryPolicy::default()),
116+
}
117+
}
118+
}
119+
120+
/// Telemetry options.
121+
#[derive(Clone, Debug, Default)]
122+
pub struct TelemetryOptions {
123+
/// Optional application ID to telemeter.
124+
pub(crate) application_id: Option<String>,
125+
}
126+
127+
impl TelemetryOptions {
128+
setters! {
129+
application_id: String => Some(application_id),
130+
}
131+
}
132+
133+
/// Transport options.
134+
#[derive(Clone, Debug)]
135+
pub struct TransportOptions {
136+
/// The HTTP client implementation to use for requests.
137+
pub(crate) http_client: Arc<dyn HttpClient>,
138+
}
139+
140+
impl TransportOptions {
141+
pub fn new(http_client: Arc<dyn HttpClient>) -> Self {
142+
Self { http_client }
143+
}
144+
}
145+
146+
impl Default for TransportOptions {
147+
/// Creates an instance of the ```TransportOptions``` using the enabled HTTP client.
148+
fn default() -> Self {
149+
TransportOptions {
150+
http_client: new_http_client(),
151+
}
152+
}
153+
}

sdk/core/src/pipeline.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::policies::{Policy, PolicyResult, TelemetryPolicy};
1+
use crate::policies::{Policy, PolicyResult, TelemetryPolicy, TransportPolicy};
22
use crate::{ClientOptions, Context, Request, Response};
33
use std::sync::Arc;
44

@@ -37,9 +37,7 @@ impl Pipeline {
3737
crate_version: Option<&'static str>,
3838
options: &ClientOptions,
3939
per_call_policies: Vec<Arc<dyn Policy>>,
40-
retry: Arc<dyn Policy>,
4140
per_retry_policies: Vec<Arc<dyn Policy>>,
42-
transport_policy: Arc<dyn Policy>,
4341
) -> Self {
4442
let mut pipeline: Vec<Arc<dyn Policy>> = Vec::with_capacity(
4543
options.per_call_policies.len()
@@ -51,15 +49,21 @@ impl Pipeline {
5149

5250
pipeline.extend_from_slice(&per_call_policies);
5351
pipeline.extend_from_slice(&options.per_call_policies);
52+
5453
pipeline.push(Arc::new(TelemetryPolicy::new(
5554
crate_name,
5655
crate_version,
5756
&options.telemetry,
5857
)));
59-
pipeline.push(retry);
58+
59+
let retry_policy = options.retry.to_policy();
60+
pipeline.push(retry_policy);
61+
6062
pipeline.extend_from_slice(&per_retry_policies);
6163
pipeline.extend_from_slice(&options.per_retry_policies);
62-
pipeline.push(transport_policy);
64+
65+
let transport_policy = TransportPolicy::new(&options.transport);
66+
pipeline.push(Arc::new(transport_policy));
6367

6468
Self { pipeline }
6569
}

sdk/core/src/policies/retry_policies/exponential_retry.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use chrono::{DateTime, Local};
44
use std::sync::Arc;
55
use std::time::Duration;
66

7-
/// Retry policy with exponential backoff.
7+
/// Retry policy with exponential back-off.
88
///
9-
/// Retry policy with exponential backoff (with an added random delay up to 256 ms). Each retry
9+
/// Retry policy with exponential back-off (with an added random delay up to 256 ms). Each retry
1010
/// will happen at least after an exponential wait time. So if x is the first retry wait, the
1111
/// second will be x*2, the third x*4 and so on. The policy will retry until the maximum number of
1212
/// retries have been reached or the maximum allowed delay has passed (whichever comes first). The
@@ -18,17 +18,15 @@ pub struct ExponentialRetryPolicy {
1818
max_delay: Duration,
1919
}
2020

21-
impl Default for ExponentialRetryPolicy {
22-
fn default() -> Self {
23-
Self {
24-
delay: Duration::from_secs(3),
25-
max_retries: 3,
26-
max_delay: Duration::from_secs(30),
21+
impl ExponentialRetryPolicy {
22+
pub(crate) fn new(delay: Duration, max_retries: u32, max_delay: Duration) -> Self {
23+
ExponentialRetryPolicy {
24+
delay: delay,
25+
max_retries: max_retries,
26+
max_delay: max_delay,
2727
}
2828
}
29-
}
3029

31-
impl ExponentialRetryPolicy {
3230
fn is_expired(
3331
&self,
3432
first_retry_time: &mut Option<DateTime<Local>>,

sdk/core/src/policies/retry_policies/linear_retry.rs renamed to sdk/core/src/policies/retry_policies/fixed_retry.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,28 @@ use chrono::{DateTime, Local};
44
use std::sync::Arc;
55
use std::time::Duration;
66

7-
/// Retry policy with linear backoff.
7+
/// Retry policy with fixed back-off.
88
///
9-
/// Retry policy with linear backoff (with an added random delay up to 256 ms). Each retry will
9+
/// Retry policy with fixed back-off (with an added random delay up to 256 ms). Each retry will
1010
/// happen at least after the same, configured sleep time. The policy will retry until the maximum number of
1111
/// retries have been reached or the maximum allowed delay has passed (whichever comes first). The
1212
/// wait time is not precise.
1313
#[derive(Debug, Clone, PartialEq, Eq)]
14-
pub struct LinearRetryPolicy {
14+
pub struct FixedRetryPolicy {
1515
delay: Duration,
1616
max_retries: u32,
1717
max_delay: Duration,
1818
}
1919

20-
impl Default for LinearRetryPolicy {
21-
fn default() -> Self {
22-
Self {
23-
delay: Duration::from_secs(3),
24-
max_retries: 3,
25-
max_delay: Duration::from_secs(10),
20+
impl FixedRetryPolicy {
21+
pub(crate) fn new(delay: Duration, max_retries: u32, max_delay: Duration) -> Self {
22+
FixedRetryPolicy {
23+
delay: delay,
24+
max_retries: max_retries,
25+
max_delay: max_delay,
2626
}
2727
}
28-
}
2928

30-
impl LinearRetryPolicy {
3129
fn is_expired(
3230
&self,
3331
first_retry_time: &mut Option<DateTime<Local>>,
@@ -47,7 +45,7 @@ impl LinearRetryPolicy {
4745
}
4846

4947
#[async_trait::async_trait]
50-
impl Policy for LinearRetryPolicy {
48+
impl Policy for FixedRetryPolicy {
5149
async fn send(
5250
&self,
5351
ctx: &mut Context,
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
mod exponential_retry;
2-
mod linear_retry;
2+
mod fixed_retry;
33
mod no_retry;
44

55
pub use exponential_retry::*;
6-
pub use linear_retry::*;
6+
pub use fixed_retry::*;
77
pub use no_retry::*;

0 commit comments

Comments
 (0)