Skip to content

Commit 9c8b546

Browse files
committed
Windows compatibility
1 parent 18ceed5 commit 9c8b546

File tree

3 files changed

+120
-48
lines changed

3 files changed

+120
-48
lines changed

datafusion/core/src/datasource/listing/path.rs

Lines changed: 66 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use datafusion_data_access::FileMeta;
2121
use futures::stream::BoxStream;
2222
use futures::{StreamExt, TryStreamExt};
2323
use glob::Pattern;
24+
use std::borrow::Cow;
25+
use std::path::{is_separator, MAIN_SEPARATOR};
2426
use url::Url;
2527

2628
/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
@@ -99,17 +101,25 @@ impl ListingTableUrl {
99101

100102
/// Returns the path as expected by [`ObjectStore`]
101103
///
102-
/// In particular for file scheme URLs, this has a leading `/`
103-
/// and describes an absolute path on the local filesystem
104+
/// In particular for file scheme URLs, this is an absolute path
105+
/// on the local filesystem in the OS-specific path representation
104106
///
105-
/// For other URLs, this also contains the host component
106-
/// and lacks a leading `/`
107+
/// For other URLs, this is the host and path of the URL,
108+
/// delimited by `/`, and with no leading `/`
107109
///
108110
/// TODO: Handle paths consistently (#2489)
109-
fn prefix(&self) -> &str {
111+
fn prefix(&self) -> Cow<'_, str> {
110112
match self.scheme() {
111-
"file" => self.url.path(),
112-
_ => &self.url[url::Position::BeforeHost..url::Position::AfterPath],
113+
"file" => match MAIN_SEPARATOR {
114+
'/' => Cow::Borrowed(self.url.path()),
115+
_ => {
116+
let path = self.url.to_file_path().unwrap();
117+
Cow::Owned(path.to_string_lossy().to_string())
118+
}
119+
},
120+
_ => Cow::Borrowed(
121+
&self.url[url::Position::BeforeHost..url::Position::AfterPath],
122+
),
113123
}
114124
}
115125

@@ -119,10 +129,12 @@ impl ListingTableUrl {
119129
&'a self,
120130
path: &'b str,
121131
) -> Option<impl Iterator<Item = &'b str> + 'a> {
132+
let prefix = self.prefix();
122133
// Ignore empty path segments
123134
let diff = itertools::diff_with(
124-
path.split('/').filter(|s| !s.is_empty()),
125-
self.prefix().split('/').filter(|s| !s.is_empty()),
135+
// TODO: Handle paths consistently (#2489)
136+
path.split(is_separator).filter(|s| !s.is_empty()),
137+
prefix.split(is_separator).filter(|s| !s.is_empty()),
126138
|a, b| a == b,
127139
);
128140

@@ -139,24 +151,27 @@ impl ListingTableUrl {
139151
store: &'a dyn ObjectStore,
140152
file_extension: &'a str,
141153
) -> BoxStream<'a, Result<FileMeta>> {
142-
futures::stream::once(store.list_file(self.prefix()))
143-
.try_flatten()
144-
.map_err(DataFusionError::IoError)
145-
.try_filter(move |meta| {
146-
let path = meta.path();
147-
148-
let extension_match = path.ends_with(file_extension);
149-
let glob_match = match &self.glob {
150-
Some(glob) => match path.strip_prefix(self.url.path()) {
151-
Some(stripped) => glob.matches(stripped),
152-
None => false,
153-
},
154-
None => true,
155-
};
154+
futures::stream::once(async move {
155+
let prefix = self.prefix();
156+
store.list_file(prefix.as_ref()).await
157+
})
158+
.try_flatten()
159+
.map_err(DataFusionError::IoError)
160+
.try_filter(move |meta| {
161+
let path = meta.path();
162+
163+
let extension_match = path.ends_with(file_extension);
164+
let glob_match = match &self.glob {
165+
Some(glob) => match path.strip_prefix(self.url.path()) {
166+
Some(stripped) => glob.matches(stripped),
167+
None => false,
168+
},
169+
None => true,
170+
};
156171

157-
futures::future::ready(extension_match && glob_match)
158-
})
159-
.boxed()
172+
futures::future::ready(extension_match && glob_match)
173+
})
174+
.boxed()
160175
}
161176
}
162177

@@ -194,10 +209,32 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
194209

195210
#[cfg(test)]
196211
mod tests {
197-
use crate::datasource::listing::path::split_glob_expression;
212+
use super::*;
213+
use std::path::Path;
214+
215+
#[test]
216+
fn test_prefix_path() {
217+
let parent = Path::new("../").canonicalize().unwrap();
218+
let url = ListingTableUrl::parse(parent.to_string_lossy()).unwrap();
219+
220+
let path = Path::new(".").canonicalize().unwrap();
221+
let path = path.to_string_lossy();
222+
223+
assert_eq!(url.strip_prefix(path.as_ref()).unwrap().count(), 1);
224+
}
225+
226+
#[test]
227+
fn test_prefix_s3() {
228+
let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
229+
assert_eq!(url.prefix(), "bucket/foo/bar");
230+
231+
let path = "bucket/foo/bar/partition/foo.parquet";
232+
let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
233+
assert_eq!(prefix, vec!["partition", "foo.parquet"]);
234+
}
198235

199-
#[tokio::test]
200-
async fn test_split_glob() {
236+
#[test]
237+
fn test_split_glob() {
201238
fn test(input: &str, expected: Option<(&str, &str)>) {
202239
assert_eq!(
203240
split_glob_expression(input),

datafusion/core/src/test_util.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn
203203
if !trimmed.is_empty() {
204204
let pb = PathBuf::from(trimmed);
205205
if pb.is_dir() {
206-
return Ok(pb.canonicalize().unwrap());
206+
return Ok(pb);
207207
} else {
208208
return Err(format!(
209209
"the data dir `{}` defined by env {} not found",
@@ -224,7 +224,7 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn
224224

225225
let pb = PathBuf::from(dir).join(submodule_data);
226226
if pb.is_dir() {
227-
Ok(pb.canonicalize().unwrap())
227+
Ok(pb)
228228
} else {
229229
Err(format!(
230230
"env `{}` is undefined or has empty value, and the pre-defined data dir `{}` not found\n\
@@ -305,16 +305,18 @@ pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
305305
mod tests {
306306
use super::*;
307307
use std::env;
308-
use std::path::Path;
309308

310309
#[test]
311310
fn test_data_dir() {
312311
let udf_env = "get_data_dir";
312+
let cwd = env::current_dir().unwrap();
313313

314-
let existing_str = "..";
315-
let existing_pb = Path::new(existing_str).canonicalize().unwrap();
314+
let existing_pb = cwd.join("..");
315+
let existing = existing_pb.display().to_string();
316+
let existing_str = existing.as_str();
316317

317-
let non_existing_str = "non-existing-dir";
318+
let non_existing = cwd.join("non-existing-dir").display().to_string();
319+
let non_existing_str = non_existing.as_str();
318320

319321
env::set_var(udf_env, non_existing_str);
320322
let res = get_data_dir(udf_env, existing_str);

datafusion/core/tests/sql/mod.rs

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ use datafusion::{execution::context::SessionContext, physical_plan::displayable}
4848
use datafusion_expr::Volatility;
4949
use std::fs::File;
5050
use std::io::Write;
51+
use std::path::PathBuf;
5152
use tempfile::TempDir;
53+
use url::Url;
5254

5355
/// A macro to assert that some particular line contains two substrings
5456
///
@@ -811,28 +813,59 @@ pub fn table_with_sequence(
811813
Ok(Arc::new(MemTable::try_new(schema, partitions)?))
812814
}
813815

814-
// Normalizes parts of an explain plan that vary from run to run (such as path)
815-
fn normalize_for_explain(s: &str) -> String {
816-
// Convert things like /Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv
817-
// to ARROW_TEST_DATA/csv/aggregate_test_100.csv
818-
let data_path = datafusion::test_util::arrow_test_data();
819-
let s = s.replace(&data_path, "ARROW_TEST_DATA");
816+
pub struct ExplainNormalizer {
817+
replacements: Vec<(String, String)>,
818+
}
819+
820+
impl ExplainNormalizer {
821+
fn new() -> Self {
822+
let mut replacements = vec![];
823+
824+
let mut push_path = |path: PathBuf, key: &str| {
825+
// Push path as is
826+
replacements.push((path.to_string_lossy().to_string(), key.to_string()));
827+
828+
// Push canonical version of path
829+
let canonical = path.canonicalize().unwrap();
830+
replacements.push((canonical.to_string_lossy().to_string(), key.to_string()));
831+
832+
// Push URL representation of path, to handle Windows
833+
let url = Url::from_directory_path(canonical).unwrap();
834+
replacements.push((url.path().to_string(), key.to_string()));
835+
};
836+
837+
push_path(test_util::arrow_test_data().into(), "ARROW_TEST_DATA");
838+
push_path(std::env::current_dir().unwrap(), "WORKING_DIR");
820839

821-
let path = std::env::current_dir().unwrap();
822-
let s = s.replace(path.to_string_lossy().as_ref(), "WORKING_DIR");
840+
// convert things like partitioning=RoundRobinBatch(16)
841+
// to partitioning=RoundRobinBatch(NUM_CORES)
842+
let needle = format!("RoundRobinBatch({})", num_cpus::get());
843+
replacements.push((needle, "RoundRobinBatch(NUM_CORES)".to_string()));
823844

824-
// convert things like partitioning=RoundRobinBatch(16)
825-
// to partitioning=RoundRobinBatch(NUM_CORES)
826-
let needle = format!("RoundRobinBatch({})", num_cpus::get());
827-
s.replace(&needle, "RoundRobinBatch(NUM_CORES)")
845+
Self { replacements }
846+
}
847+
848+
fn normalize(&self, s: impl Into<String>) -> String {
849+
let mut s = s.into();
850+
for (from, to) in &self.replacements {
851+
s = s.replace(from, to);
852+
}
853+
s
854+
}
855+
}
856+
857+
// Normalizes parts of an explain plan that vary from run to run (such as path)
858+
fn normalize_for_explain(s: &str) -> String {
859+
ExplainNormalizer::new().normalize(s)
828860
}
829861

830862
/// Applies normalize_for_explain to every line
831863
fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
864+
let normalizer = ExplainNormalizer::new();
832865
v.into_iter()
833866
.map(|l| {
834867
l.into_iter()
835-
.map(|s| normalize_for_explain(&s))
868+
.map(|s| normalizer.normalize(s))
836869
.collect::<Vec<_>>()
837870
})
838871
.collect::<Vec<_>>()

0 commit comments

Comments
 (0)