Skip to content

Commit eeb3176

Browse files
authored
Migrate iothub to pipelines (#972)
* migrate iothub to pipelines * add token authorization and harmonize constructir names * pr comments
1 parent ec2738f commit eeb3176

34 files changed

+333
-262
lines changed

sdk/core/src/policies/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
mod custom_headers_policy;
22
mod retry_policies;
33
mod telemetry_policy;
4+
mod timeout_policy;
45
mod transport;
56

67
pub use custom_headers_policy::{CustomHeaders, CustomHeadersPolicy};
78
pub use retry_policies::*;
89
pub use telemetry_policy::*;
10+
pub use timeout_policy::*;
911
pub use transport::*;
1012

1113
use crate::{Context, Request, Response};

sdk/storage/src/core/timeout_policy.rs renamed to sdk/core/src/policies/timeout_policy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
use crate::request_options::Timeout;
2+
use crate::{AppendToUrlQuery, Context, Policy, PolicyResult, Request};
13
use std::sync::Arc;
24

3-
use azure_core::{prelude::*, Context, Policy, PolicyResult, Request};
4-
55
#[derive(Debug, Clone, Default)]
66
pub struct TimeoutPolicy {
77
default_timeout: Option<Timeout>,

sdk/iot_hub/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ description = "Azure IoT Hub"
77
license = "MIT"
88

99
[dependencies]
10+
async-trait = "0.1"
1011
azure_core = { path = "../core", version = "0.3", default_features = false }
1112
base64 = "0.13"
1213
bytes = "1.0"
@@ -28,4 +29,4 @@ reqwest = "0.11.0"
2829
tokio = { version = "1.0", features = ["macros"] }
2930

3031
[features]
31-
default = ["azure_core/default"]
32+
default = ["azure_core/default"]

sdk/iot_hub/examples/apply_configuration_on_edge_device.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
5656
}
5757
});
5858

59-
let http_client = azure_core::new_http_client();
60-
let service_client =
61-
ServiceClient::from_connection_string(http_client, iot_hub_connection_string, 3600)?;
59+
let service_client = ServiceClient::new_connection_string(iot_hub_connection_string, 3600)?;
6260
service_client
6361
.apply_on_edge_device(device_id)
6462
.modules_content(modules_content)

sdk/iot_hub/examples/configuration.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use azure_iot_hub::service::{resources::Configuration, ServiceClient};
1+
use azure_iot_hub::service::ServiceClient;
22
use std::error::Error;
33

44
#[tokio::main]
@@ -10,9 +10,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1010
.nth(1)
1111
.expect("Please pass the configuration id as the first parameter");
1212

13-
let http_client = azure_core::new_http_client();
14-
let service_client =
15-
ServiceClient::from_connection_string(http_client, iot_hub_connection_string, 3600)?;
13+
let service_client = ServiceClient::new_connection_string(iot_hub_connection_string, 3600)?;
1614

1715
println!("Creating a new configuration with id: {}", configuration_id);
1816

@@ -42,7 +40,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
4240
.get_configuration(configuration_id)
4341
.into_future()
4442
.await?;
45-
let configuration: Configuration = configuration.try_into()?;
4643

4744
println!(
4845
"Successfully retrieved the new configuration '{:?}'",

sdk/iot_hub/examples/device_identity.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1313
.expect("Please pass the device id as the first parameter");
1414

1515
println!("Getting device twin for device '{}'", device_id);
16-
let http_client = azure_core::new_http_client();
17-
let service_client =
18-
ServiceClient::from_connection_string(http_client, iot_hub_connection_string, 3600)?;
16+
let service_client = ServiceClient::new_connection_string(iot_hub_connection_string, 3600)?;
1917
let device = service_client
2018
.create_device_identity(
2119
&device_id,
@@ -27,6 +25,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
2725
)
2826
.into_future()
2927
.await?;
28+
let device: DeviceIdentityResponse = device.try_into()?;
3029

3130
println!("Successfully created a new device '{}'", device.device_id);
3231

sdk/iot_hub/examples/directmethod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
2323
.nth(4)
2424
.expect("Please pass the payload as the fourth parameter");
2525

26-
let http_client = azure_core::new_http_client();
27-
let service_client =
28-
ServiceClient::from_connection_string(http_client, iot_hub_connection_string, 3600)?;
26+
let service_client = ServiceClient::new_connection_string(iot_hub_connection_string, 3600)?;
2927
println!(
3028
"Sending direct method {} to {}:{} on: {}",
3129
method_name, device_id, module_id, service_client.iot_hub_name

sdk/iot_hub/examples/gettwin.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1212

1313
println!("Getting device twin for device: {}", device_id);
1414

15-
let http_client = azure_core::new_http_client();
16-
let service_client =
17-
ServiceClient::from_connection_string(http_client, iot_hub_connection_string, 3600)?;
15+
let service_client = ServiceClient::new_connection_string(iot_hub_connection_string, 3600)?;
1816
let twin = service_client
1917
.get_device_twin(device_id)
2018
.into_future()

sdk/iot_hub/examples/module_identity.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1616
.nth(2)
1717
.expect("Please pass the module id as the second parameter");
1818

19-
let http_client = azure_core::new_http_client();
20-
let service_client =
21-
ServiceClient::from_connection_string(http_client, iot_hub_connection_string, 3600)?;
19+
let service_client = ServiceClient::new_connection_string(iot_hub_connection_string, 3600)?;
2220
let module = service_client
2321
.create_module_identity(
2422
&device_id,

sdk/iot_hub/examples/query_iothub.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1111
let query = "SELECT * FROM devices";
1212
println!("Invoking query '{}' on the IoT Hub", query);
1313

14-
let http_client = azure_core::new_http_client();
15-
let service_client =
16-
ServiceClient::from_connection_string(http_client, iot_hub_connection_string, 3600)?;
14+
let service_client = ServiceClient::new_connection_string(iot_hub_connection_string, 3600)?;
1715

1816
let response = service_client
1917
.query(query)

sdk/iot_hub/examples/updatetwin.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1616
.expect("Please pass the payload as the second parameter");
1717

1818
println!("Updating device twin for device: {}", device_id);
19-
let http_client = azure_core::new_http_client();
2019

21-
let service_client =
22-
ServiceClient::from_connection_string(http_client, iot_hub_connection_string, 3600)?;
20+
let service_client = ServiceClient::new_connection_string(iot_hub_connection_string, 3600)?;
2321
let updated_twin = service_client
2422
.update_device_twin(device_id)
2523
.desired_properties(serde_json::from_str(&payload)?)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use crate::service::IoTHubCredentials;
2+
use azure_core::error::{ErrorKind, ResultExt};
3+
use azure_core::{
4+
headers::{self, *},
5+
Context, Policy, PolicyResult, Request,
6+
};
7+
use std::sync::Arc;
8+
9+
const IOTHUB_TOKEN_SCOPE: &str = "https://iothubs.azure.net";
10+
11+
#[derive(Debug, Clone)]
12+
pub struct AuthorizationPolicy {
13+
credentials: IoTHubCredentials,
14+
}
15+
16+
impl AuthorizationPolicy {
17+
pub(crate) fn new(credentials: IoTHubCredentials) -> Self {
18+
Self { credentials }
19+
}
20+
}
21+
22+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
23+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
24+
impl Policy for AuthorizationPolicy {
25+
async fn send(
26+
&self,
27+
ctx: &Context,
28+
request: &mut Request,
29+
next: &[Arc<dyn Policy>],
30+
) -> PolicyResult {
31+
assert!(
32+
!next.is_empty(),
33+
"Authorization policies cannot be the last policy of a pipeline"
34+
);
35+
let request = match &self.credentials {
36+
IoTHubCredentials::SASToken(sas_token) => {
37+
request.insert_header(headers::AUTHORIZATION, sas_token);
38+
request
39+
}
40+
IoTHubCredentials::BearerToken(token) => {
41+
request.insert_header(AUTHORIZATION, format!("Bearer {}", token));
42+
request
43+
}
44+
IoTHubCredentials::TokenCredential(token_credential) => {
45+
let bearer_token = token_credential
46+
.get_token(IOTHUB_TOKEN_SCOPE)
47+
.await
48+
.context(ErrorKind::Credential, "failed to get bearer token")?;
49+
50+
request.insert_header(
51+
AUTHORIZATION,
52+
format!("Bearer {}", bearer_token.token.secret()),
53+
);
54+
request
55+
}
56+
};
57+
58+
next[0].send(ctx, request, &next[1..]).await
59+
}
60+
}

sdk/iot_hub/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
#![deny(missing_docs)]
33
//! The IoT Hub crate contains a client that can be used to manage the IoT Hub.
44
5+
mod authorization_policy;
56
/// The service module contains the IoT Hub Service Client that can be used to manage the IoT Hub.
67
pub mod service;

0 commit comments

Comments
 (0)