Skip to content

Commit c2f15c5

Browse files
authored
[datalake] Migrate file system operations to pipeline architecture (#597)
1 parent 3e92851 commit c2f15c5

30 files changed

+583
-718
lines changed

sdk/data_tables/src/responses/submit_transaction_response.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl TryFrom<&Response<Bytes>> for SubmitTransactionResponse {
3838

3939
let mut operation_response = OperationResponse::default();
4040

41-
for line in change_set_response.lines().into_iter() {
41+
for line in change_set_response.lines() {
4242
if line.starts_with("HTTP/1.1") {
4343
operation_response.status_code = line
4444
.split_whitespace()

sdk/storage/src/core/errors.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ impl From<azure_core::HttpError> for Error {
8181
}
8282
}
8383

84+
impl From<azure_core::StreamError> for Error {
85+
fn from(error: azure_core::StreamError) -> Self {
86+
Self::CoreError(azure_core::Error::Stream(error))
87+
}
88+
}
89+
8490
#[non_exhaustive]
8591
#[derive(Debug, PartialEq, thiserror::Error)]
8692
pub enum AzurePathParseError {

sdk/storage_datalake/examples/data_lake_00_file_system.rs

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use azure_storage::core::prelude::*;
21
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
32
use azure_storage_datalake::prelude::*;
43
use chrono::Utc;
@@ -21,24 +20,22 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
2120
println!("creating file system '{}'...", &file_system_name);
2221
let create_fs_response = file_system_client
2322
.create()
24-
.properties(&fs_properties)
25-
.execute()
23+
.properties(fs_properties.clone())
24+
.into_future()
2625
.await?;
2726
println!("create file system response == {:?}\n", create_fs_response);
2827

2928
println!("listing file systems...");
30-
let mut stream = Box::pin(
31-
data_lake_client
32-
.list()
33-
.max_results(NonZeroU32::new(3).unwrap())
34-
.stream(),
35-
);
29+
let mut stream = data_lake_client
30+
.list_file_systems()
31+
.max_results(NonZeroU32::new(3).unwrap())
32+
.into_stream();
3633
while let Some(list_fs_response) = stream.next().await {
3734
println!("list file system response == {:?}\n", list_fs_response);
3835
}
3936

4037
println!("getting file system properties...");
41-
let get_fs_props_response = file_system_client.get_properties().execute().await?;
38+
let get_fs_props_response = file_system_client.get_properties().into_future().await?;
4239
println!(
4340
"get file system properties response == {:?}\n",
4441
get_fs_props_response
@@ -47,23 +44,23 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
4744
println!("setting file system properties...");
4845
fs_properties.insert("ModifiedBy", "Iota");
4946
let set_fs_props_response = file_system_client
50-
.set_properties(Some(&fs_properties))
51-
.execute()
47+
.set_properties(Some(fs_properties))
48+
.into_future()
5249
.await?;
5350
println!(
5451
"set file system properties response == {:?}\n",
5552
set_fs_props_response
5653
);
5754

5855
println!("getting file system properties...");
59-
let get_fs_props_response = file_system_client.get_properties().execute().await?;
56+
let get_fs_props_response = file_system_client.get_properties().into_future().await?;
6057
println!(
6158
"get file system properties response == {:?}\n",
6259
get_fs_props_response
6360
);
6461

6562
println!("deleting file system...");
66-
let delete_fs_response = file_system_client.delete().execute().await?;
63+
let delete_fs_response = file_system_client.delete().into_future().await?;
6764
println!("delete file system response == {:?}\n", delete_fs_response);
6865

6966
Ok(())
@@ -75,15 +72,7 @@ async fn create_data_lake_client() -> Result<DataLakeClient, Box<dyn Error + Sen
7572
let account_key = std::env::var("ADLSGEN2_STORAGE_MASTER_KEY")
7673
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");
7774

78-
let http_client = azure_core::new_http_client();
79-
80-
let storage_account_client =
81-
StorageAccountClient::new_access_key(http_client.clone(), &account_name, &account_key);
82-
83-
let storage_client = storage_account_client.as_storage_client();
84-
8575
Ok(DataLakeClient::new(
86-
storage_client,
8776
StorageSharedKeyCredential::new(account_name, account_key),
8877
None,
8978
))

sdk/storage_datalake/examples/data_lake_01_file_create_delete.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use azure_core::prelude::*;
2-
use azure_storage::{core::prelude::*, storage_shared_key_credential::StorageSharedKeyCredential};
2+
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
33
use azure_storage_datalake::prelude::*;
44
use chrono::Utc;
55
use std::error::Error;
@@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1414
.into_file_system_client(file_system_name.to_string());
1515

1616
println!("creating file system '{}'...", &file_system_name);
17-
let create_fs_response = file_system_client.create().execute().await?;
17+
let create_fs_response = file_system_client.create().into_future().await?;
1818
println!("create file system response == {:?}\n", create_fs_response);
1919

2020
let file_path = "some/path/example-file.txt";
@@ -47,7 +47,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
4747
println!("delete_file file response == {:?}\n", delete_file_response);
4848

4949
println!("deleting file system...");
50-
let delete_fs_response = file_system_client.delete().execute().await?;
50+
let delete_fs_response = file_system_client.delete().into_future().await?;
5151
println!("delete file system response == {:?}\n", delete_fs_response);
5252

5353
Ok(())
@@ -59,15 +59,7 @@ async fn create_data_lake_client() -> Result<DataLakeClient, Box<dyn Error + Sen
5959
let account_key = std::env::var("ADLSGEN2_STORAGE_MASTER_KEY")
6060
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");
6161

62-
let http_client = azure_core::new_http_client();
63-
64-
let storage_account_client =
65-
StorageAccountClient::new_access_key(http_client.clone(), &account_name, &account_key);
66-
67-
let storage_client = storage_account_client.as_storage_client();
68-
6962
Ok(DataLakeClient::new(
70-
storage_client,
7163
StorageSharedKeyCredential::new(account_name, account_key),
7264
None,
7365
))

sdk/storage_datalake/examples/data_lake_02_file_upload.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use azure_core::prelude::*;
2-
use azure_storage::{core::prelude::*, storage_shared_key_credential::StorageSharedKeyCredential};
2+
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
33
use azure_storage_datalake::prelude::*;
44
use chrono::Utc;
55
use std::error::Error;
@@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1414
.into_file_system_client(file_system_name.to_string());
1515

1616
println!("creating file system '{}'...", &file_system_name);
17-
let create_fs_response = file_system_client.create().execute().await?;
17+
let create_fs_response = file_system_client.create().into_future().await?;
1818
println!("create file system response == {:?}\n", create_fs_response);
1919

2020
let file_path = "some/path/example-file.txt";
@@ -70,7 +70,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
7070
println!("flush file response == {:?}\n", flush_file_response);
7171

7272
println!("deleting file system...");
73-
let delete_fs_response = file_system_client.delete().execute().await?;
73+
let delete_fs_response = file_system_client.delete().into_future().await?;
7474
println!("delete file system response == {:?}\n", delete_fs_response);
7575

7676
Ok(())
@@ -82,15 +82,7 @@ async fn create_data_lake_client() -> Result<DataLakeClient, Box<dyn Error + Sen
8282
let account_key = std::env::var("ADLSGEN2_STORAGE_MASTER_KEY")
8383
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");
8484

85-
let http_client = azure_core::new_http_client();
86-
87-
let storage_account_client =
88-
StorageAccountClient::new_access_key(http_client.clone(), &account_name, &account_key);
89-
90-
let storage_client = storage_account_client.as_storage_client();
91-
9285
Ok(DataLakeClient::new(
93-
storage_client,
9486
StorageSharedKeyCredential::new(account_name, account_key),
9587
None,
9688
))

sdk/storage_datalake/examples/data_lake_03_file_rename.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use azure_core::prelude::*;
2-
use azure_storage::{core::prelude::*, storage_shared_key_credential::StorageSharedKeyCredential};
2+
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
33
use azure_storage_datalake::prelude::*;
44
use chrono::Utc;
55
use std::error::Error;
@@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1414
.into_file_system_client(file_system_name.to_string());
1515

1616
println!("creating file system '{}'...", &file_system_name);
17-
let create_fs_response = file_system_client.create().execute().await?;
17+
let create_fs_response = file_system_client.create().into_future().await?;
1818
println!("create file system response == {:?}\n", create_fs_response);
1919

2020
let file_path1 = "some/path/example-file1.txt";
@@ -56,7 +56,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
5656
println!("rename file response == {:?}\n", rename_file_response);
5757

5858
println!("deleting file system...");
59-
let delete_fs_response = file_system_client.delete().execute().await?;
59+
let delete_fs_response = file_system_client.delete().into_future().await?;
6060
println!("delete file system response == {:?}\n", delete_fs_response);
6161

6262
Ok(())
@@ -68,15 +68,7 @@ async fn create_data_lake_client() -> Result<DataLakeClient, Box<dyn Error + Sen
6868
let account_key = std::env::var("ADLSGEN2_STORAGE_MASTER_KEY")
6969
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");
7070

71-
let http_client = azure_core::new_http_client();
72-
73-
let storage_account_client =
74-
StorageAccountClient::new_access_key(http_client.clone(), &account_name, &account_key);
75-
76-
let storage_client = storage_account_client.as_storage_client();
77-
7871
Ok(DataLakeClient::new(
79-
storage_client,
8072
StorageSharedKeyCredential::new(account_name, account_key),
8173
None,
8274
))
Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,24 @@
11
use crate::clients::FileSystemClient;
2-
use crate::requests::*;
2+
use crate::operations::ListFileSystems;
33
use crate::shared_key_authorization_policy::SharedKeyAuthorizationPolicy;
4-
use azure_core::{ClientOptions, HttpClient, Pipeline};
5-
use azure_storage::core::prelude::*;
6-
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
7-
use bytes::Bytes;
8-
use http::method::Method;
9-
use http::request::{Builder, Request};
4+
use azure_core::{ClientOptions, Context, HttpClient, Pipeline};
5+
use azure_storage::core::clients::ServiceType;
6+
use azure_storage::core::storage_shared_key_credential::StorageSharedKeyCredential;
7+
use http::request::Builder;
108
use std::sync::Arc;
119

1210
const DEFAULT_DNS_SUFFIX: &str = "dfs.core.windows.net";
1311

1412
#[derive(Debug, Clone)]
1513
pub struct DataLakeClient {
1614
pipeline: Pipeline,
17-
storage_client: Arc<StorageClient>,
1815
custom_dns_suffix: Option<String>,
1916
url: String, // TODO: Use CloudLocation similar to CosmosClient
17+
pub(crate) context: Context,
2018
}
2119

2220
impl DataLakeClient {
2321
pub(crate) fn new_with_options(
24-
storage_client: Arc<StorageClient>,
2522
credential: StorageSharedKeyCredential,
2623
custom_dns_suffix: Option<String>,
2724
options: ClientOptions,
@@ -40,8 +37,8 @@ impl DataLakeClient {
4037
let per_call_policies = Vec::new();
4138
let auth_policy: Arc<dyn azure_core::Policy> =
4239
// TODO: Allow caller to choose auth policy, follow pattern of other clients
43-
// Arc::new(BearerTokenAuthorizationPolicy::new(bearer_token));
44-
Arc::new(SharedKeyAuthorizationPolicy::new(url.to_owned(), credential));
40+
// Arc::new(BearerTokenAuthorizationPolicy::new(bearer_token));
41+
Arc::new(SharedKeyAuthorizationPolicy::new(url.to_owned(), credential));
4542

4643
// take care of adding the AuthorizationPolicy as **last** retry policy.
4744
// Policies can change the url and/or the headers and the AuthorizationPolicy
@@ -56,59 +53,55 @@ impl DataLakeClient {
5653
per_retry_policies,
5754
);
5855

56+
let mut context = Context::new();
57+
context.insert(ServiceType::Blob);
58+
5959
Self {
6060
pipeline,
61-
storage_client,
6261
custom_dns_suffix,
6362
url,
63+
context,
6464
}
6565
}
6666

67-
pub fn new(
68-
storage_client: Arc<StorageClient>,
69-
credential: StorageSharedKeyCredential,
70-
custom_dns_suffix: Option<String>,
71-
) -> DataLakeClient {
72-
Self::new_with_options(
73-
storage_client,
74-
credential,
75-
custom_dns_suffix,
76-
ClientOptions::default(),
77-
)
67+
pub fn new(credential: StorageSharedKeyCredential, custom_dns_suffix: Option<String>) -> Self {
68+
Self::new_with_options(credential, custom_dns_suffix, ClientOptions::default())
7869
}
7970

8071
pub fn custom_dns_suffix(&self) -> Option<&str> {
8172
self.custom_dns_suffix.as_deref()
8273
}
8374

84-
pub(crate) fn http_client(&self) -> &dyn HttpClient {
85-
self.storage_client.storage_account_client().http_client()
86-
}
87-
8875
pub(crate) fn url(&self) -> &str {
8976
&self.url
9077
}
9178

92-
pub fn list(&self) -> ListFileSystemsBuilder {
93-
ListFileSystemsBuilder::new(self)
79+
pub fn list_file_systems(&self) -> ListFileSystems {
80+
ListFileSystems::new(self.clone(), Some(self.context.clone()))
9481
}
9582

9683
pub fn into_file_system_client(self, file_system_name: String) -> FileSystemClient {
9784
FileSystemClient::new(self, file_system_name)
9885
}
9986

100-
pub(crate) fn prepare_request(
87+
pub(crate) fn prepare_request_pipeline(
10188
&self,
102-
url: &str,
103-
method: &Method,
104-
http_header_adder: &dyn Fn(Builder) -> Builder,
105-
request_body: Option<Bytes>,
106-
) -> crate::Result<(Request<Bytes>, url::Url)> {
107-
self.storage_client
108-
.prepare_request(url, method, http_header_adder, request_body)
89+
uri: &str,
90+
http_method: http::Method,
91+
) -> azure_core::Request {
92+
Builder::new()
93+
.method(http_method)
94+
.uri(uri)
95+
.body(bytes::Bytes::new())
96+
.unwrap()
97+
.into()
10998
}
11099

111-
pub(crate) fn pipeline(&self) -> &Pipeline {
100+
pub fn pipeline(&self) -> &Pipeline {
112101
&self.pipeline
113102
}
103+
104+
pub fn http_client(&self) -> &dyn HttpClient {
105+
self.pipeline.http_client()
106+
}
114107
}

0 commit comments

Comments
 (0)