diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index bddfdbcc06d1..189d0c19f086 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -242,25 +242,51 @@ impl ListingTableUrl { ) -> Result>> { let exec_options = &ctx.config_options().execution; let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory; - // If the prefix is a file, use a head request, otherwise list - let list = match self.is_collection() { - true => match ctx.runtime_env().cache_manager.get_list_files_cache() { - None => store.list(Some(&self.prefix)), + + async fn list_with_cache<'b>( + ctx: &'b dyn Session, + store: &'b dyn ObjectStore, + prefix: &'b Path, + ) -> Result>> { + match ctx.runtime_env().cache_manager.get_list_files_cache() { + None => Ok(store + .list(Some(prefix)) + .map(|res| res.map_err(DataFusionError::ObjectStore)) + .boxed()), Some(cache) => { - if let Some(res) = cache.get(&self.prefix) { + if let Some(res) = cache.get(prefix) { debug!("Hit list all files cache"); - futures::stream::iter(res.as_ref().clone().into_iter().map(Ok)) - .boxed() + Ok(futures::stream::iter( + res.as_ref().clone().into_iter().map(Ok), + ) + .map(|res| res.map_err(DataFusionError::ObjectStore)) + .boxed()) } else { - let list_res = store.list(Some(&self.prefix)); - let vec = list_res.try_collect::>().await?; - cache.put(&self.prefix, Arc::new(vec.clone())); - futures::stream::iter(vec.into_iter().map(Ok)).boxed() + let vec = store + .list(Some(prefix)) + .map(|res| res.map_err(DataFusionError::ObjectStore)) + .try_collect::>() + .await?; + cache.put(prefix, Arc::new(vec.clone())); + Ok(futures::stream::iter(vec.into_iter().map(Ok)) + .map(|res| res.map_err(DataFusionError::ObjectStore)) + .boxed()) } } - }, - false => futures::stream::once(store.head(&self.prefix)).boxed(), + } + } + + let list: BoxStream<'a, Result> = if self.is_collection() { + list_with_cache(ctx, store, &self.prefix).await? + } else { + match store.head(&self.prefix).await { + Ok(meta) => futures::stream::once(async { Ok(meta) }) + .map(|res| res.map_err(DataFusionError::ObjectStore)) + .boxed(), + Err(_) => list_with_cache(ctx, store, &self.prefix).await?, + } }; + Ok(list .try_filter(move |meta| { let path = &meta.location; @@ -268,7 +294,6 @@ impl ListingTableUrl { let glob_match = self.contains(path, ignore_subdirectory); futures::future::ready(extension_match && glob_match) }) - .map_err(DataFusionError::ObjectStore) .boxed()) }