Skip to content

[datalake] Migrate file system operations to pipeline architecture #597

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jan 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl TryFrom<&Response<Bytes>> for SubmitTransactionResponse {

let mut operation_response = OperationResponse::default();

for line in change_set_response.lines().into_iter() {
for line in change_set_response.lines() {
if line.starts_with("HTTP/1.1") {
operation_response.status_code = line
.split_whitespace()
Expand Down
6 changes: 6 additions & 0 deletions sdk/storage/src/core/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ impl From<azure_core::HttpError> for Error {
}
}

impl From<azure_core::StreamError> for Error {
fn from(error: azure_core::StreamError) -> Self {
Self::CoreError(azure_core::Error::Stream(error))
Copy link
Member

@cataggar cataggar Jan 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make a pass over the storage errors after core & identity ship, but if the goal here is to put all azure_core::Error errors into Self::CoreError, then it can be done with a generic similar to:

impl<T: Into<azure_core::ParsingError>> From<T> for ParsingError {
fn from(error: T) -> Self {
Self::Core(error.into())
}
}

but with Into<azure_core::Error>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needed to be the variant, since other core variants are handled above in a more specific manner. This then leads to duplicate implementations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad - this is not even a variant. However, I tried your approach and there are conflicting implementations then, and I did not want to change the error around too much in this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

}
}

#[non_exhaustive]
#[derive(Debug, PartialEq, thiserror::Error)]
pub enum AzurePathParseError {
Expand Down
33 changes: 11 additions & 22 deletions sdk/storage_datalake/examples/data_lake_00_file_system.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use azure_storage::core::prelude::*;
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
use azure_storage_datalake::prelude::*;
use chrono::Utc;
Expand All @@ -21,24 +20,22 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("creating file system '{}'...", &file_system_name);
let create_fs_response = file_system_client
.create()
.properties(&fs_properties)
.execute()
.properties(fs_properties.clone())
.into_future()
.await?;
println!("create file system response == {:?}\n", create_fs_response);

println!("listing file systems...");
let mut stream = Box::pin(
data_lake_client
.list()
.max_results(NonZeroU32::new(3).unwrap())
.stream(),
);
let mut stream = data_lake_client
.list_file_systems()
.max_results(NonZeroU32::new(3).unwrap())
.into_stream();
while let Some(list_fs_response) = stream.next().await {
println!("list file system response == {:?}\n", list_fs_response);
}

println!("getting file system properties...");
let get_fs_props_response = file_system_client.get_properties().execute().await?;
let get_fs_props_response = file_system_client.get_properties().into_future().await?;
println!(
"get file system properties response == {:?}\n",
get_fs_props_response
Expand All @@ -47,23 +44,23 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("setting file system properties...");
fs_properties.insert("ModifiedBy", "Iota");
let set_fs_props_response = file_system_client
.set_properties(Some(&fs_properties))
.execute()
.set_properties(Some(fs_properties))
.into_future()
.await?;
println!(
"set file system properties response == {:?}\n",
set_fs_props_response
);

println!("getting file system properties...");
let get_fs_props_response = file_system_client.get_properties().execute().await?;
let get_fs_props_response = file_system_client.get_properties().into_future().await?;
println!(
"get file system properties response == {:?}\n",
get_fs_props_response
);

println!("deleting file system...");
let delete_fs_response = file_system_client.delete().execute().await?;
let delete_fs_response = file_system_client.delete().into_future().await?;
println!("delete file system response == {:?}\n", delete_fs_response);

Ok(())
Expand All @@ -75,15 +72,7 @@ async fn create_data_lake_client() -> Result<DataLakeClient, Box<dyn Error + Sen
let account_key = std::env::var("ADLSGEN2_STORAGE_MASTER_KEY")
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");

let http_client = azure_core::new_http_client();

let storage_account_client =
StorageAccountClient::new_access_key(http_client.clone(), &account_name, &account_key);

let storage_client = storage_account_client.as_storage_client();

Ok(DataLakeClient::new(
storage_client,
StorageSharedKeyCredential::new(account_name, account_key),
None,
))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use azure_core::prelude::*;
use azure_storage::{core::prelude::*, storage_shared_key_credential::StorageSharedKeyCredential};
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
use azure_storage_datalake::prelude::*;
use chrono::Utc;
use std::error::Error;
Expand All @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.into_file_system_client(file_system_name.to_string());

println!("creating file system '{}'...", &file_system_name);
let create_fs_response = file_system_client.create().execute().await?;
let create_fs_response = file_system_client.create().into_future().await?;
println!("create file system response == {:?}\n", create_fs_response);

let file_path = "some/path/example-file.txt";
Expand Down Expand Up @@ -47,7 +47,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("delete_file file response == {:?}\n", delete_file_response);

println!("deleting file system...");
let delete_fs_response = file_system_client.delete().execute().await?;
let delete_fs_response = file_system_client.delete().into_future().await?;
println!("delete file system response == {:?}\n", delete_fs_response);

Ok(())
Expand All @@ -59,15 +59,7 @@ async fn create_data_lake_client() -> Result<DataLakeClient, Box<dyn Error + Sen
let account_key = std::env::var("ADLSGEN2_STORAGE_MASTER_KEY")
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");

let http_client = azure_core::new_http_client();

let storage_account_client =
StorageAccountClient::new_access_key(http_client.clone(), &account_name, &account_key);

let storage_client = storage_account_client.as_storage_client();

Ok(DataLakeClient::new(
storage_client,
StorageSharedKeyCredential::new(account_name, account_key),
None,
))
Expand Down
14 changes: 3 additions & 11 deletions sdk/storage_datalake/examples/data_lake_02_file_upload.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use azure_core::prelude::*;
use azure_storage::{core::prelude::*, storage_shared_key_credential::StorageSharedKeyCredential};
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
use azure_storage_datalake::prelude::*;
use chrono::Utc;
use std::error::Error;
Expand All @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.into_file_system_client(file_system_name.to_string());

println!("creating file system '{}'...", &file_system_name);
let create_fs_response = file_system_client.create().execute().await?;
let create_fs_response = file_system_client.create().into_future().await?;
println!("create file system response == {:?}\n", create_fs_response);

let file_path = "some/path/example-file.txt";
Expand Down Expand Up @@ -70,7 +70,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("flush file response == {:?}\n", flush_file_response);

println!("deleting file system...");
let delete_fs_response = file_system_client.delete().execute().await?;
let delete_fs_response = file_system_client.delete().into_future().await?;
println!("delete file system response == {:?}\n", delete_fs_response);

Ok(())
Expand All @@ -82,15 +82,7 @@ async fn create_data_lake_client() -> Result<DataLakeClient, Box<dyn Error + Sen
let account_key = std::env::var("ADLSGEN2_STORAGE_MASTER_KEY")
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");

let http_client = azure_core::new_http_client();

let storage_account_client =
StorageAccountClient::new_access_key(http_client.clone(), &account_name, &account_key);

let storage_client = storage_account_client.as_storage_client();

Ok(DataLakeClient::new(
storage_client,
StorageSharedKeyCredential::new(account_name, account_key),
None,
))
Expand Down
14 changes: 3 additions & 11 deletions sdk/storage_datalake/examples/data_lake_03_file_rename.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use azure_core::prelude::*;
use azure_storage::{core::prelude::*, storage_shared_key_credential::StorageSharedKeyCredential};
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
use azure_storage_datalake::prelude::*;
use chrono::Utc;
use std::error::Error;
Expand All @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.into_file_system_client(file_system_name.to_string());

println!("creating file system '{}'...", &file_system_name);
let create_fs_response = file_system_client.create().execute().await?;
let create_fs_response = file_system_client.create().into_future().await?;
println!("create file system response == {:?}\n", create_fs_response);

let file_path1 = "some/path/example-file1.txt";
Expand Down Expand Up @@ -56,7 +56,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("rename file response == {:?}\n", rename_file_response);

println!("deleting file system...");
let delete_fs_response = file_system_client.delete().execute().await?;
let delete_fs_response = file_system_client.delete().into_future().await?;
println!("delete file system response == {:?}\n", delete_fs_response);

Ok(())
Expand All @@ -68,15 +68,7 @@ async fn create_data_lake_client() -> Result<DataLakeClient, Box<dyn Error + Sen
let account_key = std::env::var("ADLSGEN2_STORAGE_MASTER_KEY")
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");

let http_client = azure_core::new_http_client();

let storage_account_client =
StorageAccountClient::new_access_key(http_client.clone(), &account_name, &account_key);

let storage_client = storage_account_client.as_storage_client();

Ok(DataLakeClient::new(
storage_client,
StorageSharedKeyCredential::new(account_name, account_key),
None,
))
Expand Down
69 changes: 31 additions & 38 deletions sdk/storage_datalake/src/clients/data_lake_client.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
use crate::clients::FileSystemClient;
use crate::requests::*;
use crate::operations::ListFileSystems;
use crate::shared_key_authorization_policy::SharedKeyAuthorizationPolicy;
use azure_core::{ClientOptions, HttpClient, Pipeline};
use azure_storage::core::prelude::*;
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
use bytes::Bytes;
use http::method::Method;
use http::request::{Builder, Request};
use azure_core::{ClientOptions, Context, HttpClient, Pipeline};
use azure_storage::core::clients::ServiceType;
use azure_storage::core::storage_shared_key_credential::StorageSharedKeyCredential;
use http::request::Builder;
use std::sync::Arc;

const DEFAULT_DNS_SUFFIX: &str = "dfs.core.windows.net";

#[derive(Debug, Clone)]
pub struct DataLakeClient {
pipeline: Pipeline,
storage_client: Arc<StorageClient>,
custom_dns_suffix: Option<String>,
url: String, // TODO: Use CloudLocation similar to CosmosClient
pub(crate) context: Context,
}

impl DataLakeClient {
pub(crate) fn new_with_options(
storage_client: Arc<StorageClient>,
credential: StorageSharedKeyCredential,
custom_dns_suffix: Option<String>,
options: ClientOptions,
Expand All @@ -40,8 +37,8 @@ impl DataLakeClient {
let per_call_policies = Vec::new();
let auth_policy: Arc<dyn azure_core::Policy> =
// TODO: Allow caller to choose auth policy, follow pattern of other clients
// Arc::new(BearerTokenAuthorizationPolicy::new(bearer_token));
Arc::new(SharedKeyAuthorizationPolicy::new(url.to_owned(), credential));
// Arc::new(BearerTokenAuthorizationPolicy::new(bearer_token));
Arc::new(SharedKeyAuthorizationPolicy::new(url.to_owned(), credential));

// take care of adding the AuthorizationPolicy as **last** retry policy.
// Policies can change the url and/or the headers and the AuthorizationPolicy
Expand All @@ -56,59 +53,55 @@ impl DataLakeClient {
per_retry_policies,
);

let mut context = Context::new();
context.insert(ServiceType::Blob);

Self {
pipeline,
storage_client,
custom_dns_suffix,
url,
context,
}
}

pub fn new(
storage_client: Arc<StorageClient>,
credential: StorageSharedKeyCredential,
custom_dns_suffix: Option<String>,
) -> DataLakeClient {
Self::new_with_options(
storage_client,
credential,
custom_dns_suffix,
ClientOptions::default(),
)
pub fn new(credential: StorageSharedKeyCredential, custom_dns_suffix: Option<String>) -> Self {
Self::new_with_options(credential, custom_dns_suffix, ClientOptions::default())
}

pub fn custom_dns_suffix(&self) -> Option<&str> {
self.custom_dns_suffix.as_deref()
}

pub(crate) fn http_client(&self) -> &dyn HttpClient {
self.storage_client.storage_account_client().http_client()
}

pub(crate) fn url(&self) -> &str {
&self.url
}

pub fn list(&self) -> ListFileSystemsBuilder {
ListFileSystemsBuilder::new(self)
pub fn list_file_systems(&self) -> ListFileSystems {
ListFileSystems::new(self.clone(), Some(self.context.clone()))
}

pub fn into_file_system_client(self, file_system_name: String) -> FileSystemClient {
FileSystemClient::new(self, file_system_name)
}

pub(crate) fn prepare_request(
pub(crate) fn prepare_request_pipeline(
&self,
url: &str,
method: &Method,
http_header_adder: &dyn Fn(Builder) -> Builder,
request_body: Option<Bytes>,
) -> crate::Result<(Request<Bytes>, url::Url)> {
self.storage_client
.prepare_request(url, method, http_header_adder, request_body)
uri: &str,
http_method: http::Method,
) -> azure_core::Request {
Builder::new()
.method(http_method)
.uri(uri)
.body(bytes::Bytes::new())
.unwrap()
.into()
}

pub(crate) fn pipeline(&self) -> &Pipeline {
pub fn pipeline(&self) -> &Pipeline {
&self.pipeline
}

pub fn http_client(&self) -> &dyn HttpClient {
self.pipeline.http_client()
}
}
Loading