diff --git a/Cargo.lock b/Cargo.lock index 4411365b0..cc7b4edf0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3341,6 +3341,7 @@ dependencies = [ "itertools 0.13.0", "mockito", "port_scanner", + "reqsign", "reqwest", "serde", "serde_derive", @@ -5009,16 +5010,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" -[[package]] -name = "quick-xml" -version = "0.35.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86e446ed58cef1bbfe847bc2fda0e2e4ea9f0e57b90c507d4781292590d72a4e" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "quick-xml" version = "0.36.2" @@ -5315,9 +5306,9 @@ checksum = "a35e8a6bf28cd121053a66aa2e6a2e3eaffad4a60012179f0e864aa5ffeff215" [[package]] name = "reqsign" -version = "0.16.1" +version = "0.16.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149" +checksum = "9323c0afb30e54f793f4705b10c890395bccc87c6e6ea62c4e7e82d09a380dc6" dependencies = [ "anyhow", "async-trait", @@ -5333,7 +5324,7 @@ dependencies = [ "log", "once_cell", "percent-encoding", - "quick-xml 0.35.0", + "quick-xml 0.37.4", "rand 0.8.5", "reqwest", "rsa", @@ -5342,6 +5333,7 @@ dependencies = [ "serde_json", "sha1", "sha2", + "toml", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c38706d7f..a0433f935 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,8 +64,8 @@ datafusion-cli = "45" datafusion-sqllogictest = "45" derive_builder = "0.20" dirs = "6" -expect-test = "1" enum-ordinalize = "4.3.0" +expect-test = "1" faststr = "0.2.31" fnv = "1.0.7" fs-err = "3.1.0" @@ -93,6 +93,7 @@ port_scanner = "0.1.5" pretty_assertions = "1.4" rand = "0.8.5" regex = "1.10.5" +reqsign = { version = "0.16.3" } reqwest = { version = "0.12.12", default-features = false, features = ["json"] } roaring = { version = "0.10", git = "https://github.com/RoaringBitmap/roaring-rs.git" } rust_decimal = "1.36" @@ -108,7 +109,7 @@ tempfile = "3.18" tera = "1" thrift = "0.17.0" tokio = { version = "1.44", default-features = false } -toml = "0.8" +toml = "0.8.9" tracing = "0.1.37" tracing-subscriber = "0.3.8" typed-builder = "0.20" diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml index 70a1e8998..1c6629fc6 100644 --- a/crates/catalog/rest/Cargo.toml +++ b/crates/catalog/rest/Cargo.toml @@ -43,6 +43,7 @@ tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } +reqsign = { workspace = true } [dev-dependencies] ctor = { workspace = true } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 7bf54d4d8..e4aff8a6a 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -21,13 +21,14 @@ use std::collections::HashMap; use std::str::FromStr; use async_trait::async_trait; -use iceberg::io::FileIO; +use iceberg::io::{FileIO, FileIOBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; use itertools::Itertools; +use reqsign::{AwsConfig, AwsDefaultLoader, AwsV4Signer}; use reqwest::header::{ HeaderMap, HeaderName, HeaderValue, {self}, }; @@ -84,6 +85,29 @@ impl RestCatalogConfig { } } + pub(crate) fn get_signer(&self) -> Result> { + if let Some("true") = self.props.get("rest.sigv4-enabled").map(|s| s.as_str()) { + let Some(signing_region) = self.props.get("rest.signing-region") else { + return Err(Error::new( + ErrorKind::Unexpected, + "rest.signing-region is not set when rest.sigv4-enabled is true", + )); + }; + let Some(signing_name) = self.props.get("rest.signing-name") else { + return Err(Error::new( + ErrorKind::Unexpected, + "rest.signing-name is not set when rest.sigv4-enabled is true", + )); + }; + + let config = AwsConfig::default().from_profile().from_env(); + let loader = AwsDefaultLoader::new(self.client().unwrap_or_default(), config); + let signer = AwsV4Signer::new(signing_name, signing_region); + Ok(Some((loader, signer))) + } else { + Ok(None) + } + } fn namespaces_endpoint(&self) -> String { self.url_prefixed(&["namespaces"]) } @@ -306,6 +330,13 @@ impl RestCatalog { None => None, }; + if let Some(warehouse_path) = warehouse_path { + if warehouse_path.starts_with("arn:aws:") { + let file_io = FileIOBuilder::new("s3").with_props(&props).build()?; + return Ok(file_io); + } + } + let file_io = match warehouse_path.or(metadata_location) { Some(url) => FileIO::from_path(url)?.with_props(props).build()?, None => { diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index 36069c3ef..f24886443 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -18,8 +18,9 @@ use std::collections::HashMap; use std::fmt::{Debug, Formatter}; -use http::StatusCode; +use http::{HeaderValue, StatusCode}; use iceberg::{Error, ErrorKind, Result}; +use reqsign::{AwsDefaultLoader, AwsV4Signer}; use reqwest::header::HeaderMap; use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response}; use serde::de::DeserializeOwned; @@ -43,6 +44,8 @@ pub(crate) struct HttpClient { extra_headers: HeaderMap, /// Extra oauth parameters to be added to each authentication request. extra_oauth_params: HashMap, + + signer: Option<(AwsDefaultLoader, AwsV4Signer)>, } impl Debug for HttpClient { @@ -65,6 +68,7 @@ impl HttpClient { credential: cfg.credential(), extra_headers, extra_oauth_params: cfg.extra_oauth_params(), + signer: cfg.get_signer()?, }) } @@ -88,6 +92,7 @@ impl HttpClient { extra_oauth_params: (!cfg.extra_oauth_params().is_empty()) .then(|| cfg.extra_oauth_params()) .unwrap_or(self.extra_oauth_params), + signer: cfg.get_signer()?, }) } @@ -220,6 +225,39 @@ impl HttpClient { /// Executes the given `Request` and returns a `Response`. pub async fn execute(&self, mut request: Request) -> Result { request.headers_mut().extend(self.extra_headers.clone()); + + if let Some((loader, signer)) = &self.signer { + match loader.load().await { + Ok(Some(credential)) => { + const EMPTY_STRING_SHA256: &str = + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + request.headers_mut().insert( + "x-amz-content-sha256", + HeaderValue::from_str(EMPTY_STRING_SHA256).unwrap(), + ); + if let Err(e) = signer.sign(&mut request, &credential) { + return Err(Error::new( + ErrorKind::Unexpected, + "Failed to sign request for sigv4 signing", + ) + .with_source(e)); + } + } + Ok(None) => { + return Err(Error::new( + ErrorKind::Unexpected, + "Credential not found for sigv4 signing", + )); + } + Err(e) => { + return Err(Error::new( + ErrorKind::Unexpected, + "Failed to load credential for sigv4 signing", + ) + .with_source(e)); + } + } + } Ok(self.client.execute(request).await?) } @@ -255,6 +293,7 @@ pub(crate) async fn deserialize_catalog_response( /// codes that all endpoints share (400, 404, etc.). pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) -> Error { let (status, headers) = (response.status(), response.headers().clone()); + let url = response.url().to_string(); let bytes = match response.bytes().await { Ok(bytes) => bytes, Err(err) => return err.into(), @@ -264,4 +303,5 @@ pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) -> .with_context("status", status.to_string()) .with_context("headers", format!("{:?}", headers)) .with_context("json", String::from_utf8_lossy(&bytes)) + .with_context("url", url) }