Skip to content

Commit 86446ea

Browse files
authored
ObjectStore cleanup (#2587) (#2590)
* ObjectStore cleanup (#2587)
* Fix CI
1 parent 744412f commit 86446ea

File tree

10 files changed

+141
-118
lines changed

10 files changed

+141
-118
lines changed

.github/workflows/object_store.yml

Lines changed: 16 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -42,9 +42,22 @@ jobs:
4242
rustup toolchain install stable
4343
rustup default stable
4444
rustup component add clippy
45-
- name: Run clippy
46-
run: |
47-
cargo clippy -p object_store --all-features --all-targets -- -D warnings
45+
# Run different tests for the library on its own as well as
46+
# all targets to ensure that it still works in the absence of
47+
# features that might be enabled by dev-dependencies of other
48+
# targets.
49+
- name: Run clippy with default features
50+
run: cargo clippy -p object_store -- -D warnings
51+
- name: Run clippy with aws feature
52+
run: cargo clippy -p object_store --features aws -- -D warnings
53+
- name: Run clippy with gcp feature
54+
run: cargo clippy -p object_store --features gcp -- -D warnings
55+
- name: Run clippy with azure feature
56+
run: cargo clippy -p object_store --features azure -- -D warnings
57+
- name: Run clippy with all features
58+
run: cargo clippy -p object_store --all-features -- -D warnings
59+
- name: Run clippy with all features and all targets
60+
run: cargo clippy -p object_store --all-features --all-targets -- -D warnings
4861

4962
# test the crate
5063
linux-test:

.github/workflows/parquet.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,7 @@ jobs:
5858
cargo test -p parquet --all-features
5959
6060
61-
# test compilaton
61+
# test compilation
6262
linux-features:
6363
name: Check Compilation
6464
runs-on: ubuntu-latest

object_store/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -53,9 +53,9 @@ ring = { version = "0.16", default-features = false, features = ["std"], optiona
5353
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
5454

5555
[features]
56-
cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand", "ring"]
56+
cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
5757
azure = ["cloud"]
58-
gcp = ["cloud"]
58+
gcp = ["cloud", "rustls-pemfile"]
5959
aws = ["cloud"]
6060

6161
[dev-dependencies] # In alphabetical order

object_store/src/azure/client.rs

Lines changed: 3 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -75,20 +75,13 @@ pub(crate) enum Error {
7575
#[snafu(display("Error getting list response body: {}", source))]
7676
ListResponseBody { source: reqwest::Error },
7777

78-
#[snafu(display("Error performing create multipart request: {}", source))]
79-
CreateMultipartRequest { source: reqwest::Error },
80-
81-
#[snafu(display("Error performing complete multipart request: {}", source))]
82-
CompleteMultipartRequest { source: reqwest::Error },
83-
8478
#[snafu(display("Got invalid list response: {}", source))]
8579
InvalidListResponse { source: quick_xml::de::DeError },
8680

87-
#[snafu(display("Got invalid multipart response: {}", source))]
88-
InvalidMultipartResponse { source: quick_xml::de::DeError },
89-
9081
#[snafu(display("Error authorizing request: {}", source))]
91-
Authorization { source: crate::client::oauth::Error },
82+
Authorization {
83+
source: crate::azure::credential::Error,
84+
},
9285
}
9386

9487
impl From<Error> for crate::Error {

object_store/src/azure/credential.rs

Lines changed: 97 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,19 +15,24 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::client::oauth::ClientSecretOAuthProvider;
18+
use crate::client::retry::RetryExt;
19+
use crate::client::token::{TemporaryToken, TokenCache};
1920
use crate::util::hmac_sha256;
21+
use crate::RetryConfig;
2022
use chrono::Utc;
23+
use reqwest::header::ACCEPT;
2124
use reqwest::{
2225
header::{
2326
HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING,
2427
CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH,
2528
IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_UNMODIFIED_SINCE, RANGE,
2629
},
27-
Method, RequestBuilder,
30+
Client, Method, RequestBuilder,
2831
};
32+
use snafu::{ResultExt, Snafu};
2933
use std::borrow::Cow;
3034
use std::str;
35+
use std::time::{Duration, Instant};
3136
use url::Url;
3237

3338
static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2021-08-06");
@@ -38,6 +43,18 @@ pub(crate) static DELETE_SNAPSHOTS: HeaderName =
3843
pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source");
3944
static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
4045
pub(crate) static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";
46+
const CONTENT_TYPE_JSON: &str = "application/json";
47+
48+
#[derive(Debug, Snafu)]
49+
pub enum Error {
50+
#[snafu(display("Error performing token request: {}", source))]
51+
TokenRequest { source: crate::client::retry::Error },
52+
53+
#[snafu(display("Error getting token response body: {}", source))]
54+
TokenResponseBody { source: reqwest::Error },
55+
}
56+
57+
pub type Result<T, E = Error> = std::result::Result<T, E>;
4158

4259
/// Provides credentials for use when signing requests
4360
#[derive(Debug)]
@@ -253,3 +270,81 @@ fn lexy_sort<'a>(
253270
values.sort_unstable();
254271
values
255272
}
273+
274+
#[derive(serde::Deserialize, Debug)]
275+
struct TokenResponse {
276+
access_token: String,
277+
expires_in: u64,
278+
}
279+
280+
/// Encapsulates the logic to perform an OAuth token challenge
281+
#[derive(Debug)]
282+
pub struct ClientSecretOAuthProvider {
283+
scope: String,
284+
token_url: String,
285+
client_id: String,
286+
client_secret: String,
287+
cache: TokenCache<String>,
288+
}
289+
290+
impl ClientSecretOAuthProvider {
291+
/// Create a new [`ClientSecretOAuthProvider`] for an azure backed store
292+
pub fn new(
293+
client_id: String,
294+
client_secret: String,
295+
tenant_id: String,
296+
authority_host: Option<String>,
297+
) -> Self {
298+
let authority_host = authority_host
299+
.unwrap_or_else(|| authority_hosts::AZURE_PUBLIC_CLOUD.to_owned());
300+
301+
Self {
302+
scope: "https://storage.azure.com/.default".to_owned(),
303+
token_url: format!("{}/{}/oauth2/v2.0/token", authority_host, tenant_id),
304+
client_id,
305+
client_secret,
306+
cache: TokenCache::default(),
307+
}
308+
}
309+
310+
/// Fetch a token
311+
pub async fn fetch_token(
312+
&self,
313+
client: &Client,
314+
retry: &RetryConfig,
315+
) -> Result<String> {
316+
self.cache
317+
.get_or_insert_with(|| self.fetch_token_inner(client, retry))
318+
.await
319+
}
320+
321+
/// Fetch a fresh token
322+
async fn fetch_token_inner(
323+
&self,
324+
client: &Client,
325+
retry: &RetryConfig,
326+
) -> Result<TemporaryToken<String>> {
327+
let response: TokenResponse = client
328+
.request(Method::POST, &self.token_url)
329+
.header(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON))
330+
.form(&[
331+
("client_id", self.client_id.as_str()),
332+
("client_secret", self.client_secret.as_str()),
333+
("scope", self.scope.as_str()),
334+
("grant_type", "client_credentials"),
335+
])
336+
.send_retry(retry)
337+
.await
338+
.context(TokenRequestSnafu)?
339+
.json()
340+
.await
341+
.context(TokenResponseBodySnafu)?;
342+
343+
let token = TemporaryToken {
344+
token: response.access_token,
345+
expiry: Instant::now() + Duration::from_secs(response.expires_in),
346+
};
347+
348+
Ok(token)
349+
}
350+
}

object_store/src/azure/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -110,6 +110,9 @@ enum Error {
110110

111111
#[snafu(display("At least one authorization option must be specified"))]
112112
MissingCredentials {},
113+
114+
#[snafu(display("Azure credential error: {}", source), context(false))]
115+
Credential { source: credential::Error },
113116
}
114117

115118
impl From<Error> for super::Error {
@@ -539,13 +542,12 @@ impl MicrosoftAzureBuilder {
539542
} else if let (Some(client_id), Some(client_secret), Some(tenant_id)) =
540543
(client_id, client_secret, tenant_id)
541544
{
542-
let client_credential =
543-
crate::client::oauth::ClientSecretOAuthProvider::new_azure(
544-
client_id,
545-
client_secret,
546-
tenant_id,
547-
authority_host,
548-
);
545+
let client_credential = credential::ClientSecretOAuthProvider::new(
546+
client_id,
547+
client_secret,
548+
tenant_id,
549+
authority_host,
550+
);
549551
Ok(credential::CredentialProvider::ClientSecret(
550552
client_credential,
551553
))

object_store/src/client/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,6 @@
1818
//! Generic utilities reqwest based ObjectStore implementations
1919
2020
pub mod backoff;
21-
#[cfg(any(feature = "gcp", feature = "azure"))]
22-
pub mod oauth;
2321
pub mod pagination;
2422
pub mod retry;
2523
pub mod token;

object_store/src/gcp.rs

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -48,13 +48,17 @@ use tokio::io::AsyncWrite;
4848
use crate::client::pagination::stream_paginated;
4949
use crate::client::retry::RetryExt;
5050
use crate::{
51-
client::{oauth::OAuthProvider, token::TokenCache},
51+
client::token::TokenCache,
5252
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
5353
path::{Path, DELIMITER},
5454
util::{format_http_range, format_prefix},
5555
GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, RetryConfig,
5656
};
5757

58+
use credential::OAuthProvider;
59+
60+
mod credential;
61+
5862
#[derive(Debug, Snafu)]
5963
enum Error {
6064
#[snafu(display("Unable to open service account file: {}", source))]
@@ -115,6 +119,9 @@ enum Error {
115119

116120
#[snafu(display("Missing service account path"))]
117121
MissingServiceAccountPath,
122+
123+
#[snafu(display("GCP credential error: {}", source))]
124+
Credential { source: credential::Error },
118125
}
119126

120127
impl From<Error> for super::Error {
@@ -240,7 +247,8 @@ impl GoogleCloudStorageClient {
240247
.get_or_insert_with(|| {
241248
oauth_provider.fetch_token(&self.client, &self.retry_config)
242249
})
243-
.await?)
250+
.await
251+
.context(CredentialSnafu)?)
244252
} else {
245253
Ok("".to_owned())
246254
}
@@ -818,7 +826,8 @@ impl GoogleCloudStorageBuilder {
818826
audience,
819827
)
820828
})
821-
.transpose()?;
829+
.transpose()
830+
.context(CredentialSnafu)?;
822831

823832
let encoded_bucket_name =
824833
percent_encode(bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();

object_store/src/client/oauth.rs renamed to object_store/src/gcp/credential.rs

Lines changed: 1 addition & 84 deletions
Original file line number | Diff line number | Diff line change
@@ -16,17 +16,13 @@
1616
// under the License.
1717

1818
use crate::client::retry::RetryExt;
19-
use crate::client::token::{TemporaryToken, TokenCache};
19+
use crate::client::token::TemporaryToken;
2020
use crate::RetryConfig;
21-
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT};
2221
use reqwest::{Client, Method};
2322
use ring::signature::RsaKeyPair;
2423
use snafu::{ResultExt, Snafu};
2524
use std::time::{Duration, Instant};
2625

27-
const CONTENT_TYPE_JSON: &str = "application/json";
28-
const AZURE_STORAGE_TOKEN_SCOPE: &str = "https://storage.azure.com/.default";
29-
3026
#[derive(Debug, Snafu)]
3127
pub enum Error {
3228
#[snafu(display("No RSA key found in pem file"))]
@@ -224,82 +220,3 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
224220
let string = serde_json::to_string(obj).context(EncodeSnafu)?;
225221
Ok(base64::encode_config(string, base64::URL_SAFE_NO_PAD))
226222
}
227-
228-
/// Encapsulates the logic to perform an OAuth token challenge
229-
#[derive(Debug)]
230-
pub struct ClientSecretOAuthProvider {
231-
scope: String,
232-
token_url: String,
233-
client_id: String,
234-
client_secret: String,
235-
cache: TokenCache<String>,
236-
}
237-
238-
impl ClientSecretOAuthProvider {
239-
/// Create a new [`ClientSecretOAuthProvider`] for an azure backed store
240-
pub fn new_azure(
241-
client_id: String,
242-
client_secret: String,
243-
tenant_id: String,
244-
authority_host: Option<String>,
245-
) -> Self {
246-
let authority_host = authority_host.unwrap_or_else(|| {
247-
crate::azure::authority_hosts::AZURE_PUBLIC_CLOUD.to_owned()
248-
});
249-
250-
Self {
251-
scope: AZURE_STORAGE_TOKEN_SCOPE.to_owned(),
252-
token_url: format!("{}/{}/oauth2/v2.0/token", authority_host, tenant_id),
253-
client_id,
254-
client_secret,
255-
cache: TokenCache::default(),
256-
}
257-
}
258-
259-
/// Fetch a token
260-
pub async fn fetch_token(
261-
&self,
262-
client: &Client,
263-
retry: &RetryConfig,
264-
) -> Result<String> {
265-
self.cache
266-
.get_or_insert_with(|| self.fetch_token_inner(client, retry))
267-
.await
268-
}
269-
270-
/// Fetch a fresh token
271-
async fn fetch_token_inner(
272-
&self,
273-
client: &Client,
274-
retry: &RetryConfig,
275-
) -> Result<TemporaryToken<String>> {
276-
let mut headers = HeaderMap::new();
277-
headers.append(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON));
278-
279-
let mut params = std::collections::HashMap::new();
280-
params.insert("client_id", self.client_id.as_str());
281-
params.insert("client_secret", self.client_secret.as_str());
282-
params.insert("scope", self.scope.as_str());
283-
params.insert("grant_type", "client_credentials");
284-
285-
let response: TokenResponse = client
286-
.request(Method::POST, &self.token_url)
287-
.headers(headers)
288-
.form(&params)
289-
.send_retry(retry)
290-
.await
291-
.context(TokenRequestSnafu)?
292-
.error_for_status()
293-
.context(TokenResponseBodySnafu)?
294-
.json()
295-
.await
296-
.context(TokenResponseBodySnafu)?;
297-
298-
let token = TemporaryToken {
299-
token: response.access_token,
300-
expiry: Instant::now() + Duration::from_secs(response.expires_in),
301-
};
302-
303-
Ok(token)
304-
}
305-
}

object_store/src/lib.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -465,10 +465,6 @@ pub enum Error {
465465

466466
#[snafu(display("Operation not yet implemented."))]
467467
NotImplemented,
468-
469-
#[cfg(feature = "gcp")]
470-
#[snafu(display("OAuth error: {}", source), context(false))]
471-
OAuth { source: client::oauth::Error },
472468
}
473469

474470
impl From<Error> for std::io::Error {

0 commit comments

Comments
 (0)