-
Notifications
You must be signed in to change notification settings - Fork 241
feat(rest): support AWS SIGv4 #1241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,13 +21,14 @@ use std::collections::HashMap; | |
use std::str::FromStr; | ||
|
||
use async_trait::async_trait; | ||
use iceberg::io::FileIO; | ||
use iceberg::io::{FileIO, FileIOBuilder}; | ||
use iceberg::table::Table; | ||
use iceberg::{ | ||
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, | ||
TableIdent, | ||
}; | ||
use itertools::Itertools; | ||
use reqsign::{AwsConfig, AwsDefaultLoader, AwsV4Signer}; | ||
use reqwest::header::{ | ||
HeaderMap, HeaderName, HeaderValue, {self}, | ||
}; | ||
|
@@ -84,6 +85,29 @@ impl RestCatalogConfig { | |
} | ||
} | ||
|
||
pub(crate) fn get_signer(&self) -> Result<Option<(AwsDefaultLoader, AwsV4Signer)>> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should credential store loader, why not store credential? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All the keys should use constants rather magic strings. |
||
if let Some("true") = self.props.get("rest.sigv4-enabled").map(|s| s.as_str()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use |
||
let Some(signing_region) = self.props.get("rest.signing-region") else { | ||
return Err(Error::new( | ||
ErrorKind::Unexpected, | ||
"rest.signing-region is not set when rest.sigv4-enabled is true", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This string should be formatted using constants |
||
)); | ||
}; | ||
let Some(signing_name) = self.props.get("rest.signing-name") else { | ||
return Err(Error::new( | ||
ErrorKind::Unexpected, | ||
"rest.signing-name is not set when rest.sigv4-enabled is true", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
)); | ||
}; | ||
|
||
let config = AwsConfig::default().from_profile().from_env(); | ||
let loader = AwsDefaultLoader::new(self.client().unwrap_or_default(), config); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not only from profile, we should also allow user to config using properties, for example aws_access_id |
||
let signer = AwsV4Signer::new(signing_name, signing_region); | ||
Ok(Some((loader, signer))) | ||
} else { | ||
Ok(None) | ||
} | ||
} | ||
fn namespaces_endpoint(&self) -> String { | ||
self.url_prefixed(&["namespaces"]) | ||
} | ||
|
@@ -306,6 +330,13 @@ impl RestCatalog { | |
None => None, | ||
}; | ||
|
||
if let Some(warehouse_path) = warehouse_path { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Look at below codes, we should also consider |
||
if warehouse_path.starts_with("arn:aws:") { | ||
let file_io = FileIOBuilder::new("s3").with_props(&props).build()?; | ||
return Ok(file_io); | ||
} | ||
Comment on lines
+334
to
+337
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a better way to know the rest catalog service e.g. using |
||
} | ||
|
||
let file_io = match warehouse_path.or(metadata_location) { | ||
Some(url) => FileIO::from_path(url)?.with_props(props).build()?, | ||
None => { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,9 @@ | |
use std::collections::HashMap; | ||
use std::fmt::{Debug, Formatter}; | ||
|
||
use http::StatusCode; | ||
use http::{HeaderValue, StatusCode}; | ||
use iceberg::{Error, ErrorKind, Result}; | ||
use reqsign::{AwsDefaultLoader, AwsV4Signer}; | ||
use reqwest::header::HeaderMap; | ||
use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response}; | ||
use serde::de::DeserializeOwned; | ||
|
@@ -43,6 +44,8 @@ pub(crate) struct HttpClient { | |
extra_headers: HeaderMap, | ||
/// Extra oauth parameters to be added to each authentication request. | ||
extra_oauth_params: HashMap<String, String>, | ||
|
||
signer: Option<(AwsDefaultLoader, AwsV4Signer)>, | ||
} | ||
|
||
impl Debug for HttpClient { | ||
|
@@ -65,6 +68,7 @@ impl HttpClient { | |
credential: cfg.credential(), | ||
extra_headers, | ||
extra_oauth_params: cfg.extra_oauth_params(), | ||
signer: cfg.get_signer()?, | ||
}) | ||
} | ||
|
||
|
@@ -88,6 +92,7 @@ impl HttpClient { | |
extra_oauth_params: (!cfg.extra_oauth_params().is_empty()) | ||
.then(|| cfg.extra_oauth_params()) | ||
.unwrap_or(self.extra_oauth_params), | ||
signer: cfg.get_signer()?, | ||
}) | ||
} | ||
|
||
|
@@ -220,6 +225,39 @@ impl HttpClient { | |
/// Executes the given `Request` and returns a `Response`. | ||
pub async fn execute(&self, mut request: Request) -> Result<Response> { | ||
request.headers_mut().extend(self.extra_headers.clone()); | ||
|
||
if let Some((loader, signer)) = &self.signer { | ||
match loader.load().await { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to load credentials with every request?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We also need to change |
||
Ok(Some(credential)) => { | ||
const EMPTY_STRING_SHA256: &str = | ||
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; | ||
request.headers_mut().insert( | ||
"x-amz-content-sha256", | ||
HeaderValue::from_str(EMPTY_STRING_SHA256).unwrap(), | ||
); | ||
Comment on lines
+232
to
+237
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we have to hardcode this here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm also quite confused. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By default,
I also not sure if this is the most correct way, whether we should support this natively in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this behavor should be hidden under |
||
if let Err(e) = signer.sign(&mut request, &credential) { | ||
return Err(Error::new( | ||
ErrorKind::Unexpected, | ||
"Failed to sign request for sigv4 signing", | ||
) | ||
.with_source(e)); | ||
} | ||
} | ||
Ok(None) => { | ||
return Err(Error::new( | ||
ErrorKind::Unexpected, | ||
"Credential not found for sigv4 signing", | ||
)); | ||
} | ||
Err(e) => { | ||
return Err(Error::new( | ||
ErrorKind::Unexpected, | ||
"Failed to load credential for sigv4 signing", | ||
) | ||
.with_source(e)); | ||
} | ||
} | ||
} | ||
Ok(self.client.execute(request).await?) | ||
} | ||
|
||
|
@@ -255,6 +293,7 @@ pub(crate) async fn deserialize_catalog_response<R: DeserializeOwned>( | |
/// codes that all endpoints share (400, 404, etc.). | ||
pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) -> Error { | ||
let (status, headers) = (response.status(), response.headers().clone()); | ||
let url = response.url().to_string(); | ||
let bytes = match response.bytes().await { | ||
Ok(bytes) => bytes, | ||
Err(err) => return err.into(), | ||
|
@@ -264,4 +303,5 @@ pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) -> | |
.with_context("status", status.to_string()) | ||
.with_context("headers", format!("{:?}", headers)) | ||
.with_context("json", String::from_utf8_lossy(&bytes)) | ||
.with_context("url", url) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason for not using https://crates.io/crates/aws-sigv4 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reqsign is lightweight, with minimal dependency footprints. And it's already depended by us (transitively via opendal ). So it's good not to introduce new heavy dependencies.
@Xuanwo could you share your opinions on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with
reqsign
as its api will be easier to get adapted to different vendors, and alsoaws-sigv4
itself says it's not designed for used directly, see