diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index ade2cb40adb7..4d51c4e06a99 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -157,7 +157,7 @@ impl TryInto for &protobuf::LogicalPlanNode { LogicalPlanBuilder::scan_parquet_with_name( &scan.path, projection, - 24, + ExecutionContext::with_concurrency(24), &scan.table_name, )? //TODO concurrency .build() @@ -1114,10 +1114,11 @@ impl TryInto for &protobuf::Field { } } +use datafusion::physical_plan::datetime_expressions::to_timestamp; use datafusion::physical_plan::{aggregates, windows}; use datafusion::prelude::{ array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256, - sha384, sha512, trim, upper, + sha384, sha512, trim, upper, ExecutionContext, }; use std::convert::TryFrom; diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 46815db056a1..95862f9b4f75 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -34,6 +34,7 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion::catalog::catalog::{ CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider, }; +use datafusion::datasource::object_store::ObjectStoreRegistry; use datafusion::execution::context::{ ExecutionConfig, ExecutionContextState, ExecutionProps, }; @@ -70,7 +71,7 @@ use datafusion::physical_plan::{ Partitioning, }; use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr}; -use datafusion::prelude::CsvReadOptions; +use datafusion::prelude::{CsvReadOptions, ExecutionContext}; use log::debug; use protobuf::physical_expr_node::ExprType; use protobuf::physical_plan_node::PhysicalPlanType; @@ -130,14 +131,13 @@ impl TryInto> for 
&protobuf::PhysicalPlanNode { } PhysicalPlanType::ParquetScan(scan) => { let projection = scan.projection.iter().map(|i| *i as usize).collect(); - let filenames: Vec<&str> = - scan.filename.iter().map(|s| s.as_str()).collect(); - Ok(Arc::new(ParquetExec::try_from_files( - &filenames, + let path: &str = scan.filename[0].as_str(); + Ok(Arc::new(ParquetExec::try_from_path( + path, Some(projection), None, scan.batch_size as usize, - scan.num_partitions as usize, + ExecutionContext::with_concurrency(scan.num_partitions as usize), None, )?)) } @@ -621,6 +621,9 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc; + + let object_store_registry = Arc::new(ObjectStoreRegistry::new()); + let ctx_state = ExecutionContextState { catalog_list, scalar_functions: Default::default(), @@ -628,6 +631,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { aggregate_functions: Default::default(), config: ExecutionConfig::new(), execution_props: ExecutionProps::new(), + object_store_registry, }; let fun_expr = functions::create_physical_fun( diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 8d8f917461a9..438f871b346a 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -271,7 +271,7 @@ impl TryInto for Arc { let filenames = exec .partitions() .iter() - .flat_map(|part| part.filenames().to_owned()) + .flat_map(|part| part.filenames()) .collect(); Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::ParquetScan( diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index b476b77f36a1..389a5cf34ff1 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -82,7 +82,7 @@ use self::state::{ConfigBackendClient, SchedulerState}; use ballista_core::config::BallistaConfig; use 
ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; -use datafusion::physical_plan::parquet::ParquetExec; +use datafusion::datasource::parquet::ParquetRootDesc; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; use std::time::{Instant, SystemTime, UNIX_EPOCH}; @@ -282,24 +282,19 @@ impl SchedulerGrpc for SchedulerServer { match file_type { FileType::Parquet => { - let parquet_exec = - ParquetExec::try_from_path(&path, None, None, 1024, 1, None) - .map_err(|e| { - let msg = format!("Error opening parquet files: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; + let ctx = ExecutionContext::with_concurrency(1); + let parquet_desc = ParquetRootDesc::new(&path, ctx).map_err(|e| { + let msg = format!("Error opening parquet files: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; //TODO include statistics and any other info needed to reconstruct ParquetExec Ok(Response::new(GetFileMetadataResult { - schema: Some(parquet_exec.schema().as_ref().into()), - partitions: parquet_exec - .partitions() - .iter() - .map(|part| FilePartitionMetadata { - filename: part.filenames().to_vec(), - }) - .collect(), + schema: Some(parquet_desc.schema().as_ref().into()), + partitions: vec![FilePartitionMetadata { + filename: vec![path], + }], })) } //TODO implement for CSV diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 05025f282477..246a057fbf88 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -269,8 +269,8 @@ mod test { }; } - #[test] - fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> { + #[tokio::test] + async fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> { let mut ctx = datafusion_test_context("testdata")?; // simplified form of TPC-H query 1 @@ -352,8 +352,8 @@ mod test { Ok(()) } - #[test] - fn distributed_join_plan() -> 
Result<(), BallistaError> { + #[tokio::test] + async fn distributed_join_plan() -> Result<(), BallistaError> { let mut ctx = datafusion_test_context("testdata")?; // simplified form of TPC-H query 12 @@ -523,8 +523,8 @@ order by Ok(()) } - #[test] - fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> { + #[tokio::test] + async fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> { let mut ctx = datafusion_test_context("testdata")?; // simplified form of TPC-H query 1 diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 29835e266589..cb3ade6df047 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -478,8 +478,8 @@ fn get_table( let schema = get_schema(table); Ok(Arc::new(ParquetTable::try_new_with_schema( &path, + ExecutionContext::with_concurrency(max_concurrency), schema, - max_concurrency, false, )?)) } diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index 138434ea2482..aab647b86676 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -65,7 +65,11 @@ impl FlightService for FlightServiceImpl { ) -> Result, Status> { let request = request.into_inner(); - let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap(); + let table = ParquetTable::try_new( + &request.path[0], + ExecutionContext::with_concurrency(num_cpus::get()), + ) + .unwrap(); let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into(); diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index db950c4956ce..9541e6d5a57a 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -56,9 +56,9 @@ paste = "^1.0" num_cpus = "1.13.0" chrono = "0.4" async-trait = "0.1.41" -futures = "0.3" +futures = { version = "0.3", features = ["executor"] } pin-project-lite= "^0.2.0" -tokio = { version = "1.0", 
features = ["macros", "rt", "rt-multi-thread", "sync"] } +tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] } tokio-stream = "0.1" log = "^0.4" md-5 = { version = "^0.9.1", optional = true } diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs index 608f6dbcaf17..45727f4160f7 100644 --- a/datafusion/src/dataframe.rs +++ b/datafusion/src/dataframe.rs @@ -41,7 +41,8 @@ use async_trait::async_trait; /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; -/// # fn main() -> Result<()> { +/// #[tokio::main] +/// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let df = df.filter(col("a").lt_eq(col("b")))? @@ -59,7 +60,8 @@ pub trait DataFrame: Send + Sync { /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; - /// # fn main() -> Result<()> { + /// #[tokio::main] + /// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let df = df.select_columns(&["a", "b"])?; @@ -73,7 +75,8 @@ pub trait DataFrame: Send + Sync { /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; - /// # fn main() -> Result<()> { + /// #[tokio::main] + /// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let df = df.select(vec![col("a") * col("b"), col("c")])?; @@ -87,7 +90,8 @@ pub trait DataFrame: Send + Sync { /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; - /// # fn main() -> Result<()> { + /// #[tokio::main] + /// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let df = df.filter(col("a").lt_eq(col("b")))?; @@ -101,7 +105,8 @@ 
pub trait DataFrame: Send + Sync { /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; - /// # fn main() -> Result<()> { + /// #[tokio::main] + /// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// @@ -124,7 +129,8 @@ pub trait DataFrame: Send + Sync { /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; - /// # fn main() -> Result<()> { + /// #[tokio::main] + /// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let df = df.limit(100)?; @@ -138,7 +144,8 @@ pub trait DataFrame: Send + Sync { /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; - /// # fn main() -> Result<()> { + /// #[tokio::main] + /// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let df = df.union(df.clone())?; @@ -153,7 +160,8 @@ pub trait DataFrame: Send + Sync { /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; - /// # fn main() -> Result<()> { + /// #[tokio::main] + /// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?; @@ -196,7 +204,8 @@ pub trait DataFrame: Send + Sync { /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; - /// # fn main() -> Result<()> { + /// #[tokio::main] + /// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?; @@ -275,7 +284,8 @@ pub trait DataFrame: Send + Sync { /// ``` /// # use 
datafusion::prelude::*; /// # use datafusion::error::Result; - /// # fn main() -> Result<()> { + /// #[tokio::main] + /// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let schema = df.schema(); @@ -309,7 +319,8 @@ pub trait DataFrame: Send + Sync { /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; - /// # fn main() -> Result<()> { + /// #[tokio::main] + /// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let f = df.registry(); diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs index 987c4fdb079d..8e55180ea829 100644 --- a/datafusion/src/datasource/csv.rs +++ b/datafusion/src/datasource/csv.rs @@ -24,6 +24,8 @@ //! ``` //! use datafusion::datasource::TableProvider; //! use datafusion::datasource::csv::{CsvFile, CsvReadOptions}; +//! #[tokio::main] +//! # async fn main() { //! //! let testdata = datafusion::test_util::arrow_test_data(); //! let csvdata = CsvFile::try_new( @@ -31,6 +33,7 @@ //! CsvReadOptions::new().delimiter(b'|'), //! ).unwrap(); //! let schema = csvdata.schema(); +//! # } //! 
``` use arrow::datatypes::SchemaRef; @@ -40,14 +43,16 @@ use std::string::String; use std::sync::{Arc, Mutex}; use crate::datasource::datasource::Statistics; +use crate::datasource::object_store::local::LocalFileSystem; +use crate::datasource::object_store::ObjectStore; use crate::datasource::{Source, TableProvider}; use crate::error::{DataFusionError, Result}; use crate::logical_plan::Expr; use crate::physical_plan::csv::CsvExec; pub use crate::physical_plan::csv::CsvReadOptions; -use crate::physical_plan::{common, ExecutionPlan}; +use crate::physical_plan::ExecutionPlan; -/// Represents a CSV file with a provided schema +/// Represents a CSV file with a provided schema pub struct CsvFile { source: Source, schema: SchemaRef, @@ -64,7 +69,8 @@ impl CsvFile { let schema = Arc::new(match options.schema { Some(s) => s.clone(), None => { - let filenames = common::build_file_list(&path, options.file_extension)?; + let filenames = + LocalFileSystem.list(path.as_str(), options.file_extension)?; if filenames.is_empty() { return Err(DataFusionError::Plan(format!( "No files found at {path} with file extension {file_extension}", diff --git a/datafusion/src/datasource/json.rs b/datafusion/src/datasource/json.rs index 90fedfd6f528..e353cfe917c5 100644 --- a/datafusion/src/datasource/json.rs +++ b/datafusion/src/datasource/json.rs @@ -30,7 +30,6 @@ use crate::{ datasource::{Source, TableProvider}, error::{DataFusionError, Result}, physical_plan::{ - common, json::{NdJsonExec, NdJsonReadOptions}, ExecutionPlan, }, @@ -38,6 +37,8 @@ use crate::{ use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable}; use super::datasource::Statistics; +use crate::datasource::object_store::local::LocalFileSystem; +use crate::datasource::object_store::ObjectStore; trait SeekRead: Read + Seek {} @@ -57,7 +58,7 @@ impl NdJsonFile { let schema = if let Some(schema) = options.schema { schema } else { - let filenames = common::build_file_list(path, options.file_extension)?; +
let filenames = LocalFileSystem.list(path, options.file_extension)?; if filenames.is_empty() { return Err(DataFusionError::Plan(format!( "No files found at {path} with file extension {file_extension}", diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index 9699a997caa1..199444cd9cd6 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -22,12 +22,28 @@ pub mod datasource; pub mod empty; pub mod json; pub mod memory; +pub mod object_store; pub mod parquet; +pub mod parquet_io; pub use self::csv::{CsvFile, CsvReadOptions}; pub use self::datasource::{TableProvider, TableType}; pub use self::memory::MemTable; +use crate::arrow::datatypes::{Schema, SchemaRef}; +use crate::datasource::datasource::{ColumnStatistics, Statistics}; +use crate::datasource::object_store::{FileNameStream, ObjectStore}; +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; +use crate::physical_plan::Accumulator; +use async_trait::async_trait; +use futures::{Stream, StreamExt}; +use std::fmt::Debug; +use std::pin::Pin; +use std::sync::Arc; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio_stream::wrappers::ReceiverStream; + /// Source for table input data pub(crate) enum Source> { /// Path to a single file or a directory containing one of more files @@ -36,3 +52,304 @@ pub(crate) enum Source> { /// Read data from a reader Reader(std::sync::Mutex>), } + +#[derive(Debug, Clone)] +/// A single file that should be read, along with its schema, statistics +/// and partition column values that need to be appended to each row. +pub struct PartitionedFile { + /// Path for the file (e.g. 
URL, filesystem path, etc) + pub file_path: String, + /// Schema of the file + pub schema: Schema, + /// Statistics of the file + pub statistics: Statistics, + // Values of partition columns to be appended to each row + // pub partition_value: Option>, + // Schema of partition columns + // pub partition_schema: Option, + // We may include row group range here for a more fine-grained parallel execution +} + +impl From for PartitionedFile { + fn from(file_path: String) -> Self { + Self { + file_path, + schema: Schema::empty(), + statistics: Default::default(), + } + } +} + +impl std::fmt::Display for PartitionedFile { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.file_path) + } +} + +#[derive(Debug, Clone)] +/// A collection of files that should be read in a single task +pub struct FilePartition { + /// The index of the partition among all partitions + pub index: usize, + /// The contained files of the partition + pub files: Vec, +} + +impl std::fmt::Display for FilePartition { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let files: Vec = self.files.iter().map(|f| f.to_string()).collect(); + write!(f, "{}", files.join(", ")) + } +} + +#[derive(Debug, Clone)] +/// All source files with same schema exists in a path +pub struct SourceRootDescriptor { + /// All source files in the path + pub partition_files: Vec, + /// The schema of the files + pub schema: SchemaRef, +} + +/// Stream of +pub type PartitionedFileStream = + Pin> + Send + Sync + 'static>>; + +/// Builder for ['SourceRootDescriptor'] inside given path +#[async_trait] +pub trait SourceRootDescBuilder: Sync + Send + Debug { + /// Construct a ['SourceRootDescriptor'] from the provided path + fn get_source_desc( + path: &str, + object_store: Arc, + ext: &str, + provided_schema: Option, + collect_statistics: bool, + ) -> Result { + let mut results: Vec> = Vec::new(); + futures::executor::block_on(async { + match Self::get_source_desc_async( + 
path, + object_store, + ext, + provided_schema, + collect_statistics, + ) + .await + { + Ok(mut stream) => { + while let Some(pf) = stream.next().await { + results.push(pf); + } + } + Err(e) => { + results.push(Err(e)); + } + } + }); + + let partition_results: Result> = + results.into_iter().collect(); + let partition_files = partition_results?; + + // build a list of Parquet partitions with statistics and gather all unique schemas + // used in this data set + let mut schemas: Vec = vec![]; + + for pf in &partition_files { + let schema = pf.schema.clone(); + if schemas.is_empty() { + schemas.push(schema); + } else if schema != schemas[0] { + // we currently get the schema information from the first file rather than do + // schema merging and this is a limitation. + // See https://issues.apache.org/jira/browse/ARROW-11017 + return Err(DataFusionError::Plan(format!( + "The file {} have different schema from the first file and DataFusion does \ + not yet support schema merging", + pf.file_path + ))); + } + } + + Ok(SourceRootDescriptor { + partition_files, + schema: Arc::new(schemas.pop().unwrap()), + }) + } + + /// Construct a ['SourceRootDescriptor'] from the provided path asynchronously + async fn get_source_desc_async( + path: &str, + object_store: Arc, + ext: &str, + provided_schema: Option, + collect_statistics: bool, + ) -> Result { + let mut list_result: FileNameStream = object_store.list_async(path, ext).await?; + + let (tx, rx): ( + Sender>, + Receiver>, + ) = channel(2); + + let mut contains_file = false; + while let Some(item) = list_result.next().await { + contains_file = true; + match item { + Ok(file_path) => { + if collect_statistics { + let tx = tx.clone(); + let object_store = object_store.clone(); + let path = file_path.clone(); + tokio::spawn(async move { + let file_meta = Self::get_file_meta(path, object_store).await; + tx.send(file_meta).await.unwrap(); + }); + } else { + tx.send(Ok(PartitionedFile { + file_path, + schema: 
provided_schema.clone().unwrap(), + statistics: Statistics::default(), + })) + .await + .unwrap(); + } + } + Err(e) => { + tx.send(Err(e)).await.unwrap(); + } + } + } + + if !contains_file { + return Err(DataFusionError::Plan(format!( + "No file (with .{} extension) found at path {}", + ext, path + ))); + } + + Ok(Box::pin(ReceiverStream::new(rx))) + } + + /// Get all metadata for a source file, including schema, statistics, partitions, etc. + async fn get_file_meta( + file_path: String, + object_store: Arc, + ) -> Result; +} + +/// Get all files as well as the summary statistics when a limit is provided +pub fn get_statistics_with_limit( + source_desc: &SourceRootDescriptor, + limit: Option, +) -> (Vec, Statistics) { + let mut all_files = source_desc.partition_files.clone(); + let schema = source_desc.schema.clone(); + + let mut total_byte_size = 0; + let mut null_counts = vec![0; schema.fields().len()]; + let mut has_statistics = false; + let (mut max_values, mut min_values) = create_max_min_accs(&schema); + + let mut num_rows = 0; + let mut num_files = 0; + for file in &all_files { + num_files += 1; + let file_stats = &file.statistics; + num_rows += file_stats.num_rows.unwrap_or(0); + total_byte_size += file_stats.total_byte_size.unwrap_or(0); + if let Some(vec) = &file_stats.column_statistics { + has_statistics = true; + for (i, cs) in vec.iter().enumerate() { + null_counts[i] += cs.null_count.unwrap_or(0); + + if let Some(max_value) = &mut max_values[i] { + if let Some(file_max) = cs.max_value.clone() { + match max_value.update(&[file_max]) { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + } + + if let Some(min_value) = &mut min_values[i] { + if let Some(file_min) = cs.min_value.clone() { + match min_value.update(&[file_min]) { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + if num_rows > limit.unwrap_or(usize::MAX) { + break; + } + } + all_files.truncate(num_files); + + let column_stats = if has_statistics { + 
Some(get_col_stats( + &*schema, + null_counts, + &mut max_values, + &mut min_values, + )) + } else { + None + }; + + let statistics = Statistics { + num_rows: Some(num_rows as usize), + total_byte_size: Some(total_byte_size as usize), + column_statistics: column_stats, + }; + (all_files, statistics) +} + +fn create_max_min_accs( + schema: &Schema, +) -> (Vec>, Vec>) { + let max_values: Vec> = schema + .fields() + .iter() + .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) + .collect::>(); + let min_values: Vec> = schema + .fields() + .iter() + .map(|field| MinAccumulator::try_new(field.data_type()).ok()) + .collect::>(); + (max_values, min_values) +} + +fn get_col_stats( + schema: &Schema, + null_counts: Vec, + max_values: &mut Vec>, + min_values: &mut Vec>, +) -> Vec { + (0..schema.fields().len()) + .map(|i| { + let max_value = match &max_values[i] { + Some(max_value) => max_value.evaluate().ok(), + None => None, + }; + let min_value = match &min_values[i] { + Some(min_value) => min_value.evaluate().ok(), + None => None, + }; + ColumnStatistics { + null_count: Some(null_counts[i] as usize), + max_value, + min_value, + distinct_count: None, + } + }) + .collect() +} diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs new file mode 100644 index 000000000000..47f455280d5b --- /dev/null +++ b/datafusion/src/datasource/object_store/local.rs @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Object store that represents the Local File System. +use crate::datasource::object_store::{ + FileNameStream, ObjectReader, ObjectStore, ThreadSafeRead, +}; +use crate::datasource::parquet_io::FileSource2; +use crate::error::DataFusionError; +use crate::error::Result; +use crate::parquet::file::reader::Length; +use async_trait::async_trait; +use futures::{stream, StreamExt}; +use std::any::Any; +use std::fs::File; +use std::sync::Arc; +use tokio::task; + +#[derive(Debug)] +/// Local File System as Object Store. +pub struct LocalFileSystem; + +#[async_trait] +impl ObjectStore for LocalFileSystem { + fn as_any(&self) -> &dyn Any { + self + } + + fn list(&self, path: &str, ext: &str) -> Result> { + let filenames: Vec = list_all(path.to_string(), ext.to_string())?; + Ok(filenames) + } + + async fn list_async(&self, path: &str, ext: &str) -> Result { + list_all_async(path.to_string(), ext.to_string()).await + } + + fn get_reader(&self, file_path: &str) -> Result> { + let file = File::open(file_path)?; + let reader = LocalFSObjectReader::new(file)?; + Ok(Arc::new(reader)) + } +} + +struct LocalFSObjectReader { + file: File, +} + +impl LocalFSObjectReader { + fn new(file: File) -> Result { + Ok(Self { file }) + } +} + +#[async_trait] +impl ObjectReader for LocalFSObjectReader { + fn get_reader(&self, start: u64, length: usize) -> Result> { + Ok(Box::new(FileSource2::::new( + &self.file, start, length, + ))) + } + + async fn get_reader_async( + &self, + start: u64, + length: usize, + ) -> Result> { + let file = self.file.try_clone()?; + match 
task::spawn_blocking(move || { + let read: Result> = + Ok(Box::new(FileSource2::::new(&file, start, length))); + read + }) + .await + { + Ok(r) => r, + Err(e) => Err(DataFusionError::Internal(e.to_string())), + } + } + + fn length(&self) -> Result { + Ok(self.file.len()) + } + + async fn length_async(&self) -> Result { + let file = self.file.try_clone()?; + match task::spawn_blocking(move || Ok(file.len())).await { + Ok(r) => r, + Err(e) => Err(DataFusionError::Internal(e.to_string())), + } + } +} + +fn list_all(root_path: String, ext: String) -> Result> { + let mut file_results: Vec> = Vec::new(); + futures::executor::block_on(async { + match list_all_async(root_path, ext).await { + Ok(mut stream) => { + while let Some(result) = stream.next().await { + file_results.push(result); + } + } + Err(_) => { + file_results.push(Err(DataFusionError::Plan("Invalid path".to_string()))); + } + } + }); + file_results.into_iter().collect() +} + +async fn list_all_async(root_path: String, ext: String) -> Result { + async fn find_files_in_dir( + path: String, + to_visit: &mut Vec, + ext: String, + ) -> Result> { + let mut dir = tokio::fs::read_dir(path).await?; + let mut files = Vec::new(); + + while let Some(child) = dir.next_entry().await? 
{ + if let Some(child_path) = child.path().to_str() { + if child.metadata().await?.is_dir() { + to_visit.push(child_path.to_string()); + } else if child_path.ends_with(&ext.clone()) { + files.push(child_path.to_string()) + } + } else { + return Err(DataFusionError::Plan("Invalid path".to_string())); + } + } + Ok(files) + } + + if tokio::fs::metadata(&root_path).await?.is_file() { + Ok(Box::pin(stream::once(async { Ok(root_path) }))) + } else { + let result = stream::unfold(vec![root_path], move |mut to_visit| { + let ext = ext.clone(); + async move { + match to_visit.pop() { + None => None, + Some(path) => { + let file_stream = + match find_files_in_dir(path, &mut to_visit, ext).await { + Ok(files) => stream::iter(files).map(Ok).left_stream(), + Err(e) => stream::once(async { Err(e) }).right_stream(), + }; + + Some((file_stream, to_visit)) + } + } + } + }) + .flatten(); + Ok(Box::pin(result)) + } +} diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs new file mode 100644 index 000000000000..03f51ce42592 --- /dev/null +++ b/datafusion/src/datasource/object_store/mod.rs @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Object Store abstracts access to an underlying file/object storage. + +pub mod local; + +use std::any::Any; +use std::collections::HashMap; +use std::fmt::Debug; +use std::io::Read; +use std::pin::Pin; +use std::sync::{Arc, RwLock}; + +use async_trait::async_trait; +use futures::Stream; + +use local::LocalFileSystem; + +use crate::error::Result; + +/// Thread safe read +pub trait ThreadSafeRead: Read + Send + Sync + 'static {} + +/// Object Reader for one file in an object store +#[async_trait] +pub trait ObjectReader { + /// Get reader for a part [start, start + length] in the file + fn get_reader(&self, start: u64, length: usize) -> Result> { + futures::executor::block_on(self.get_reader_async(start, length)) + } + + /// Get reader for a part [start, start + length] in the file asynchronously + async fn get_reader_async( + &self, + start: u64, + length: usize, + ) -> Result>; + + /// Get length for the file + fn length(&self) -> Result { + futures::executor::block_on(self.length_async()) + } + + /// Get length for the file asynchronously + async fn length_async(&self) -> Result; +} + +/// Stream of files listed from an object store. Currently, we only +/// return file paths, but for many object stores, object listing will actually give us more +/// information than just the file path, for example, last updated time and file size are +/// often returned as part of the api/sys call. +/// These extra metadata might be useful for other purposes. +pub type FileNameStream = + Pin> + Send + Sync + 'static>>; + +/// An ObjectStore abstracts access to an underlying file/object storage. +/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes +#[async_trait] +pub trait ObjectStore: Sync + Send + Debug { + /// Returns the object store as [`Any`](std::any::Any) + /// so that it can be downcast to a specific implementation. 
+ fn as_any(&self) -> &dyn Any; + + /// Returns all the files with filename extension `ext` in path `prefix` + fn list(&self, prefix: &str, ext: &str) -> Result>; + + /// Returns all the files with filename extension `ext` in path `prefix` asynchronously + async fn list_async(&self, prefix: &str, ext: &str) -> Result; + + /// Get object reader for one file + fn get_reader(&self, file_path: &str) -> Result>; +} + +static LOCAL_SCHEME: &str = "file"; + +/// A Registry holds all the object stores at runtime with a scheme for each store. +/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS +/// and query data inside these systems. +pub struct ObjectStoreRegistry { + /// A map from scheme to object store that serve list / read operations for the store + pub object_stores: RwLock>>, +} + +impl ObjectStoreRegistry { + /// Create the registry that object stores can be registered into. + /// [`LocalFileSystem`] store is registered in by default to support read from localfs natively. + pub fn new() -> Self { + let mut map: HashMap> = HashMap::new(); + map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); + + Self { + object_stores: RwLock::new(map), + } + } + + /// Adds a new store to this registry. + /// If a store of the same prefix existed before, it is replaced in the registry and returned. + pub fn register_store( + &self, + scheme: String, + store: Arc, + ) -> Option> { + let mut stores = self.object_stores.write().unwrap(); + stores.insert(scheme, store) + } + + /// Get the store registered for scheme + pub fn get(&self, scheme: &str) -> Option> { + let stores = self.object_stores.read().unwrap(); + stores.get(scheme).cloned() + } + + /// Get a suitable store for the path based on its scheme. 
For example: + /// path with prefix file:/// or no prefix will return the default LocalFS store, + /// path with prefix s3:/// will return the S3 store if it's registered, + /// and will always return LocalFS store when a prefix is not registered in the path. + pub fn get_by_path(&self, path: &str) -> Arc { + if let Some((scheme, _)) = path.split_once(':') { + let stores = self.object_stores.read().unwrap(); + if let Some(store) = stores.get(&*scheme.to_lowercase()) { + return store.clone(); + } + } + self.object_stores + .read() + .unwrap() + .get(LOCAL_SCHEME) + .unwrap() + .clone() + } +} diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index d312c7724563..1a8733c0ac97 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -18,39 +18,51 @@ //! Parquet data source use std::any::Any; -use std::string::String; +use std::io::Read; use std::sync::Arc; -use arrow::datatypes::*; +use async_trait::async_trait; +use parquet::arrow::ArrowReader; +use parquet::arrow::ParquetFileArrowReader; +use parquet::errors::ParquetError; +use parquet::file::reader::ChunkReader; +use parquet::file::serialized_reader::SerializedFileReader; +use parquet::file::statistics::Statistics as ParquetStatistics; +use super::datasource::TableProviderFilterPushDown; +use crate::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use crate::datasource::datasource::Statistics; -use crate::datasource::TableProvider; +use crate::datasource::object_store::{ObjectReader, ObjectStore, ThreadSafeRead}; +use crate::datasource::{ + create_max_min_accs, get_col_stats, get_statistics_with_limit, PartitionedFile, + SourceRootDescBuilder, SourceRootDescriptor, TableProvider, +}; use crate::error::Result; use crate::logical_plan::{combine_filters, Expr}; +use crate::parquet::file::reader::Length; +use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::parquet::ParquetExec; -use 
crate::physical_plan::ExecutionPlan; - -use super::datasource::TableProviderFilterPushDown; +use crate::physical_plan::{Accumulator, ExecutionPlan}; +use crate::prelude::ExecutionContext; +use crate::scalar::ScalarValue; /// Table-based representation of a `ParquetFile`. pub struct ParquetTable { path: String, - schema: SchemaRef, - statistics: Statistics, + desc: Arc, max_concurrency: usize, enable_pruning: bool, } impl ParquetTable { /// Attempt to initialize a new `ParquetTable` from a file path. - pub fn try_new(path: impl Into, max_concurrency: usize) -> Result { + pub fn try_new(path: impl Into, context: ExecutionContext) -> Result { let path = path.into(); - let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?; - let schema = parquet_exec.schema(); + let max_concurrency = context.state.lock().unwrap().config.concurrency; + let root_desc = ParquetRootDesc::new(path.as_str(), context); Ok(Self { path, - schema, - statistics: parquet_exec.statistics().to_owned(), + desc: Arc::new(root_desc?), max_concurrency, enable_pruning: true, }) @@ -60,29 +72,24 @@ impl ParquetTable { /// If collect_statistics is `false`, doesn't read files until necessary by scan pub fn try_new_with_schema( path: impl Into, + context: ExecutionContext, schema: Schema, - max_concurrency: usize, collect_statistics: bool, ) -> Result { let path = path.into(); - if collect_statistics { - let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?; - Ok(Self { - path, - schema: Arc::new(schema), - statistics: parquet_exec.statistics().to_owned(), - max_concurrency, - enable_pruning: true, - }) - } else { - Ok(Self { - path, - schema: Arc::new(schema), - statistics: Statistics::default(), - max_concurrency, - enable_pruning: true, - }) - } + let max_concurrency = context.state.lock().unwrap().config.concurrency; + let root_desc = ParquetRootDesc::new_with_schema( + path.as_str(), + context, + Some(schema), + collect_statistics, + ); + Ok(Self { + path, 
+ desc: Arc::new(root_desc?), + max_concurrency, + enable_pruning: true, + }) } /// Get the path for the Parquet file(s) represented by this ParquetTable instance @@ -109,7 +116,7 @@ impl TableProvider for ParquetTable { /// Get the schema for this parquet file. fn schema(&self) -> SchemaRef { - self.schema.clone() + self.desc.schema() } fn supports_filter_pushdown( @@ -136,8 +143,8 @@ impl TableProvider for ParquetTable { } else { None }; - Ok(Arc::new(ParquetExec::try_from_path( - &self.path, + Ok(Arc::new(ParquetExec::try_new( + self.desc.clone(), projection.clone(), predicate, limit @@ -149,7 +156,7 @@ impl TableProvider for ParquetTable { } fn statistics(&self) -> Statistics { - self.statistics.clone() + self.desc.statistics() } fn has_exact_statistics(&self) -> bool { @@ -157,6 +164,325 @@ impl TableProvider for ParquetTable { } } +#[derive(Debug)] +/// Descriptor for a parquet root path +pub struct ParquetRootDesc { + /// object store for reading files inside the root path + pub object_store: Arc, + /// metadata for files inside the root path + pub descriptor: SourceRootDescriptor, +} + +impl ParquetRootDesc { + /// Construct a new parquet descriptor for a root path + pub fn new(root_path: &str, context: ExecutionContext) -> Result { + let object_store = context + .state + .lock() + .unwrap() + .object_store_registry + .get_by_path(root_path); + let root_desc = + Self::get_source_desc(root_path, object_store.clone(), "parquet", None, true); + Ok(Self { + object_store, + descriptor: root_desc?, + }) + } + + /// Construct a new parquet descriptor for a root path with known schema + pub fn new_with_schema( + root_path: &str, + context: ExecutionContext, + schema: Option, + collect_statistics: bool, + ) -> Result { + let object_store = context + .state + .lock() + .unwrap() + .object_store_registry + .get_by_path(root_path); + let root_desc = Self::get_source_desc( + root_path, + object_store.clone(), + "parquet", + schema, + collect_statistics, + ); + Ok(Self { 
+ object_store, + descriptor: root_desc?, + }) + } + + /// Get file schema for all parquet files + pub fn schema(&self) -> SchemaRef { + self.descriptor.schema.clone() + } + + /// Get the summary statistics for all parquet files + pub fn statistics(&self) -> Statistics { + get_statistics_with_limit(&self.descriptor, None).1 + } + + fn summarize_min_max( + max_values: &mut Vec>, + min_values: &mut Vec>, + fields: &[Field], + i: usize, + stat: &ParquetStatistics, + ) { + match stat { + ParquetStatistics::Boolean(s) => { + if let DataType::Boolean = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Boolean(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Boolean(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Int32(s) => { + if let DataType::Int32 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value.update(&[ScalarValue::Int32(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value.update(&[ScalarValue::Int32(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Int64(s) => { + if let DataType::Int64 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value.update(&[ScalarValue::Int64(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value.update(&[ScalarValue::Int64(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Float(s) => { + if let 
DataType::Float32 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Float32(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Float32(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Double(s) => { + if let DataType::Float64 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Float64(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Float64(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + _ => {} + } + } +} + +#[async_trait] +impl SourceRootDescBuilder for ParquetRootDesc { + async fn get_file_meta( + file_path: String, + object_store: Arc, + ) -> Result { + let reader = object_store.get_reader(file_path.as_str())?; + let file_reader = + Arc::new(SerializedFileReader::new(ObjectReaderWrapper::new(reader))?); + let mut arrow_reader = ParquetFileArrowReader::new(file_reader); + let file_path = file_path.to_string(); + let schema = arrow_reader.get_schema()?; + let num_fields = schema.fields().len(); + let fields = schema.fields().to_vec(); + let meta_data = arrow_reader.get_metadata(); + + let mut num_rows = 0; + let mut total_byte_size = 0; + let mut null_counts = vec![0; num_fields]; + let mut has_statistics = false; + + let (mut max_values, mut min_values) = create_max_min_accs(&schema); + + for row_group_meta in meta_data.row_groups() { + num_rows += row_group_meta.num_rows(); + total_byte_size += row_group_meta.total_byte_size(); + + let columns_null_counts = row_group_meta + .columns() + .iter() + .flat_map(|c| 
c.statistics().map(|stats| stats.null_count())); + + for (i, cnt) in columns_null_counts.enumerate() { + null_counts[i] += cnt as usize + } + + for (i, column) in row_group_meta.columns().iter().enumerate() { + if let Some(stat) = column.statistics() { + has_statistics = true; + ParquetRootDesc::summarize_min_max( + &mut max_values, + &mut min_values, + &fields, + i, + stat, + ) + } + } + } + + let column_stats = if has_statistics { + Some(get_col_stats( + &schema, + null_counts, + &mut max_values, + &mut min_values, + )) + } else { + None + }; + + let statistics = Statistics { + num_rows: Some(num_rows as usize), + total_byte_size: Some(total_byte_size as usize), + column_statistics: column_stats, + }; + + Ok(PartitionedFile { + file_path, + schema, + statistics, + }) + } +} + +/// Thin wrapper over object wrapper to work with parquet file read +pub struct ObjectReaderWrapper { + reader: Arc, +} + +impl ObjectReaderWrapper { + /// Construct a wrapper over the provided object reader + pub fn new(reader: Arc) -> Self { + Self { reader } + } +} + +impl ChunkReader for ObjectReaderWrapper { + type T = InnerReaderWrapper; + + fn get_read(&self, start: u64, length: usize) -> parquet::errors::Result { + match self.reader.get_reader(start, length) { + Ok(reader) => Ok(InnerReaderWrapper { + inner_reader: reader, + }), + Err(e) => Err(ParquetError::General(e.to_string())), + } + } +} + +impl Length for ObjectReaderWrapper { + fn len(&self) -> u64 { + self.reader.length().unwrap_or(0u64) + } +} + +/// Thin wrapper over reader for a parquet file. 
+/// To be removed once rust-lang/rfcs#1598 is stabilized +pub struct InnerReaderWrapper { + inner_reader: Box, +} + +impl Read for InnerReaderWrapper { + fn read(&mut self, buf: &mut [u8]) -> std::result::Result { + self.inner_reader.read(buf) + } +} + #[cfg(test)] mod tests { use super::*; @@ -167,7 +493,7 @@ mod tests { use arrow::record_batch::RecordBatch; use futures::StreamExt; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn read_small_batches() -> Result<()> { let table = load_table("alltypes_plain.parquet")?; let projection = None; @@ -190,7 +516,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn read_alltypes_plain_parquet() -> Result<()> { let table = load_table("alltypes_plain.parquet")?; @@ -225,7 +551,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn read_bool_alltypes_plain_parquet() -> Result<()> { let table = load_table("alltypes_plain.parquet")?; let projection = Some(vec![1]); @@ -252,7 +578,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn read_i32_alltypes_plain_parquet() -> Result<()> { let table = load_table("alltypes_plain.parquet")?; let projection = Some(vec![0]); @@ -276,7 +602,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn read_i96_alltypes_plain_parquet() -> Result<()> { let table = load_table("alltypes_plain.parquet")?; let projection = Some(vec![10]); @@ -300,7 +626,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn read_f32_alltypes_plain_parquet() -> Result<()> { let table = load_table("alltypes_plain.parquet")?; let projection = Some(vec![6]); @@ -327,7 +653,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn 
read_f64_alltypes_plain_parquet() -> Result<()> { let table = load_table("alltypes_plain.parquet")?; let projection = Some(vec![7]); @@ -354,7 +680,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn read_binary_alltypes_plain_parquet() -> Result<()> { let table = load_table("alltypes_plain.parquet")?; let projection = Some(vec![9]); @@ -384,7 +710,8 @@ mod tests { fn load_table(name: &str) -> Result> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, name); - let table = ParquetTable::try_new(&filename, 2)?; + let table = + ParquetTable::try_new(&filename, ExecutionContext::with_concurrency(2))?; Ok(Arc::new(table)) } diff --git a/datafusion/src/datasource/parquet_io.rs b/datafusion/src/datasource/parquet_io.rs new file mode 100644 index 000000000000..a74fd70f25d5 --- /dev/null +++ b/datafusion/src/datasource/parquet_io.rs @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Copy of parquet::util::io::FileSource for thread safe parquet reader + +use std::sync::Mutex; +use std::{cmp, fmt, io::*}; + +use crate::datasource::object_store::ThreadSafeRead; +use crate::parquet::file::reader::Length; +use crate::parquet::util::io::{ParquetReader, Position}; + +const DEFAULT_BUF_SIZE: usize = 8 * 1024; + +// ---------------------------------------------------------------------- + +/// ParquetReader is the interface which needs to be fulfilled to be able to parse a +/// parquet source. +pub trait ThreadSafeParquetReader: ParquetReader + Send + Sync + 'static {} +impl ThreadSafeParquetReader for T {} + +/// Struct that represents a slice of a file data with independent start position and +/// length. Internally clones provided file handle, wraps with a custom implementation +/// of BufReader that resets position before any read. +/// +/// This is workaround and alternative for `file.try_clone()` method. It clones `File` +/// while preserving independent position, which is not available with `try_clone()`. 
+/// +/// Designed after `arrow::io::RandomAccessFile` and `std::io::BufReader` +pub struct FileSource2 { + reader: Mutex, + start: u64, // start position in a file + end: u64, // end position in a file + buf: Vec, // buffer where bytes read in advance are stored + buf_pos: usize, // current position of the reader in the buffer + buf_cap: usize, // current number of bytes read into the buffer +} + +impl fmt::Debug for FileSource2 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FileSource") + .field("reader", &"OPAQUE") + .field("start", &self.start) + .field("end", &self.end) + .field("buf.len", &self.buf.len()) + .field("buf_pos", &self.buf_pos) + .field("buf_cap", &self.buf_cap) + .finish() + } +} + +impl FileSource2 { + /// Creates new file reader with start and length from a file handle + pub fn new(fd: &R, start: u64, length: usize) -> Self { + let reader = Mutex::new(fd.try_clone().unwrap()); + Self { + reader, + start, + end: start + length as u64, + buf: vec![0_u8; DEFAULT_BUF_SIZE], + buf_pos: 0, + buf_cap: 0, + } + } + + fn fill_inner_buf(&mut self) -> Result<&[u8]> { + if self.buf_pos >= self.buf_cap { + // If we've reached the end of our internal buffer then we need to fetch + // some more data from the underlying reader. + // Branch using `>=` instead of the more correct `==` + // to tell the compiler that the pos..cap slice is always valid. 
+ debug_assert!(self.buf_pos == self.buf_cap); + let mut reader = self.reader.lock().unwrap(); + reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading + self.buf_cap = reader.read(&mut self.buf)?; + self.buf_pos = 0; + } + Ok(&self.buf[self.buf_pos..self.buf_cap]) + } + + fn skip_inner_buf(&mut self, buf: &mut [u8]) -> Result { + // discard buffer + self.buf_pos = 0; + self.buf_cap = 0; + // read directly into param buffer + let mut reader = self.reader.lock().unwrap(); + reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading + let nread = reader.read(buf)?; + self.start += nread as u64; + Ok(nread) + } +} + +impl Read for FileSource2 { + fn read(&mut self, buf: &mut [u8]) -> Result { + let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize); + let buf = &mut buf[0..bytes_to_read]; + + // If we don't have any buffered data and we're doing a massive read + // (larger than our internal buffer), bypass our internal buffer + // entirely. + if self.buf_pos == self.buf_cap && buf.len() >= self.buf.len() { + return self.skip_inner_buf(buf); + } + let nread = { + let mut rem = self.fill_inner_buf()?; + // copy the data from the inner buffer to the param buffer + rem.read(buf)? 
+ }; + // consume from buffer + self.buf_pos = cmp::min(self.buf_pos + nread, self.buf_cap); + + self.start += nread as u64; + Ok(nread) + } +} + +impl Position for FileSource2 { + fn pos(&self) -> u64 { + self.start + } +} + +impl Length for FileSource2 { + fn len(&self) -> u64 { + self.end - self.start + } +} + +impl ThreadSafeRead for FileSource2 {} diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 7a54beac6d86..6a41f50ab0a1 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -49,6 +49,8 @@ use crate::catalog::{ ResolvedTableReference, TableReference, }; use crate::datasource::csv::CsvFile; +use crate::datasource::object_store::ObjectStore; +use crate::datasource::object_store::ObjectStoreRegistry; use crate::datasource::parquet::ParquetTable; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; @@ -95,7 +97,8 @@ use parquet::file::properties::WriterProperties; /// ``` /// use datafusion::prelude::*; /// # use datafusion::error::Result; -/// # fn main() -> Result<()> { +/// #[tokio::main] +/// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; /// let df = df.filter(col("a").lt_eq(col("b")))? 
@@ -112,7 +115,8 @@ use parquet::file::properties::WriterProperties; /// use datafusion::prelude::*; /// /// # use datafusion::error::Result; -/// # fn main() -> Result<()> { +/// #[tokio::main] +/// # async fn main() -> Result<()> { /// let mut ctx = ExecutionContext::new(); /// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?; /// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?; @@ -164,10 +168,17 @@ impl ExecutionContext { aggregate_functions: HashMap::new(), config, execution_props: ExecutionProps::new(), + object_store_registry: Arc::new(ObjectStoreRegistry::new()), })), } } + /// Creates a new execution context using the provided concurrency. + pub fn with_concurrency(concurrency: usize) -> ExecutionContext { + let config = ExecutionConfig::new().with_concurrency(concurrency); + ExecutionContext::with_config(config) + } + /// Creates a dataframe that will execute a SQL query. pub fn sql(&mut self, sql: &str) -> Result> { let plan = self.create_logical_plan(sql)?; @@ -288,12 +299,7 @@ impl ExecutionContext { ) -> Result> { Ok(Arc::new(DataFrameImpl::new( self.state.clone(), - &LogicalPlanBuilder::scan_parquet( - filename, - None, - self.state.lock().unwrap().config.concurrency, - )? - .build()?, + &LogicalPlanBuilder::scan_parquet(filename, None, self.clone())?.build()?, ))) } @@ -324,9 +330,9 @@ impl ExecutionContext { /// executed against this context. pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> { let table = { - let m = self.state.lock().unwrap(); - ParquetTable::try_new(filename, m.config.concurrency)? - .with_enable_pruning(m.config.parquet_pruning) + let enable_pruning = self.state.lock().unwrap().config.parquet_pruning; + ParquetTable::try_new(filename, self.clone())? 
+ .with_enable_pruning(enable_pruning) }; self.register_table(name, Arc::new(table))?; Ok(()) @@ -358,6 +364,25 @@ impl ExecutionContext { state.catalog_list.register_catalog(name, catalog) } + /// Registers a object store with scheme using a custom `ObjectStore` so that + /// an external file system or object storage system could be used against this context. + /// + /// Returns the `ObjectStore` previously registered for this + /// scheme, if any + pub fn register_object_store( + &self, + scheme: impl Into, + object_store: Arc, + ) -> Option> { + let scheme = scheme.into(); + + self.state + .lock() + .unwrap() + .object_store_registry + .register_store(scheme, object_store) + } + /// Retrieves a `CatalogProvider` instance by name pub fn catalog(&self, name: &str) -> Option> { self.state.lock().unwrap().catalog_list.catalog(name) @@ -840,6 +865,8 @@ pub struct ExecutionContextState { pub config: ExecutionConfig, /// Execution properties pub execution_props: ExecutionProps, + /// Object Store that are registered with the context + pub object_store_registry: Arc, } impl ExecutionProps { @@ -867,6 +894,7 @@ impl ExecutionContextState { aggregate_functions: HashMap::new(), config: ExecutionConfig::new(), execution_props: ExecutionProps::new(), + object_store_registry: Arc::new(ObjectStoreRegistry::new()), } } @@ -2682,7 +2710,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn write_parquet_results() -> Result<()> { // create partitioned input file and context let tmp_dir = TempDir::new()?; diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index d9afe2e01f38..5f9d55df570b 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -40,6 +40,7 @@ use crate::logical_plan::{ columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema, DFSchemaRef, Partitioning, }; +use crate::prelude::ExecutionContext; /// Default 
table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -137,20 +138,20 @@ impl LogicalPlanBuilder { pub fn scan_parquet( path: impl Into, projection: Option>, - max_concurrency: usize, + context: ExecutionContext, ) -> Result { let path = path.into(); - Self::scan_parquet_with_name(path.clone(), projection, max_concurrency, path) + Self::scan_parquet_with_name(path.clone(), projection, context, path) } /// Scan a Parquet data source and register it with a given table name pub fn scan_parquet_with_name( path: impl Into, projection: Option>, - max_concurrency: usize, + context: ExecutionContext, table_name: impl Into, ) -> Result { - let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?); + let provider = Arc::new(ParquetTable::try_new(path, context)?); Self::scan(table_name, provider, projection) } diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 4504c81daa06..42986d5c4dcc 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -110,6 +110,8 @@ mod tests { use super::*; use crate::datasource::datasource::Statistics; + use crate::datasource::object_store::local::LocalFileSystem; + use crate::datasource::PartitionedFile; use crate::physical_plan::parquet::{ ParquetExec, ParquetExecMetrics, ParquetPartition, }; @@ -122,11 +124,13 @@ mod tests { vec![], Arc::new(ParquetExec::new( vec![ParquetPartition::new( - vec!["x".to_string()], - Statistics::default(), + vec![PartitionedFile::from("x".to_string())], + 0, )], + Arc::new(LocalFileSystem), schema, None, + Statistics::default(), ParquetExecMetrics::new(), None, 2048, @@ -160,11 +164,13 @@ mod tests { vec![], Arc::new(ParquetExec::new( vec![ParquetPartition::new( - vec!["x".to_string()], - Statistics::default(), + vec![PartitionedFile::from("x".to_string())], + 0, )], + Arc::new(LocalFileSystem), schema, None, + Statistics::default(), 
ParquetExecMetrics::new(), None, 2048, diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs index 2482bfc0872c..628095c6640c 100644 --- a/datafusion/src/physical_plan/common.rs +++ b/datafusion/src/physical_plan/common.rs @@ -27,8 +27,6 @@ use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use futures::channel::mpsc; use futures::{SinkExt, Stream, StreamExt, TryStreamExt}; -use std::fs; -use std::fs::metadata; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::task::JoinHandle; @@ -107,42 +105,6 @@ pub(crate) fn combine_batches( } } -/// Recursively builds a list of files in a directory with a given extension -pub fn build_file_list(dir: &str, ext: &str) -> Result> { - let mut filenames: Vec = Vec::new(); - build_file_list_recurse(dir, &mut filenames, ext)?; - Ok(filenames) -} - -/// Recursively build a list of files in a directory with a given extension with an accumulator list -fn build_file_list_recurse( - dir: &str, - filenames: &mut Vec, - ext: &str, -) -> Result<()> { - let metadata = metadata(dir)?; - if metadata.is_file() { - if dir.ends_with(ext) { - filenames.push(dir.to_string()); - } - } else { - for entry in fs::read_dir(dir)? { - let entry = entry?; - let path = entry.path(); - if let Some(path_name) = path.to_str() { - if path.is_dir() { - build_file_list_recurse(path_name, filenames, ext)?; - } else if path_name.ends_with(ext) { - filenames.push(path_name.to_string()); - } - } else { - return Err(DataFusionError::Plan("Invalid path".to_string())); - } - } - } - Ok(()) -} - /// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender pub(crate) fn spawn_execution( input: Arc, diff --git a/datafusion/src/physical_plan/csv.rs b/datafusion/src/physical_plan/csv.rs index 544f98cba0c6..3f2214c57750 100644 --- a/datafusion/src/physical_plan/csv.rs +++ b/datafusion/src/physical_plan/csv.rs @@ -17,9 +17,11 @@ //! 
Execution plan for reading CSV files +use crate::datasource::object_store::local::LocalFileSystem; +use crate::datasource::object_store::ObjectStore; use crate::error::{DataFusionError, Result}; use crate::physical_plan::ExecutionPlan; -use crate::physical_plan::{common, source::Source, Partitioning}; +use crate::physical_plan::{source::Source, Partitioning}; use arrow::csv; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; @@ -141,7 +143,7 @@ impl CsvExec { ) -> Result { let file_extension = String::from(options.file_extension); - let filenames = common::build_file_list(path, file_extension.as_str())?; + let filenames = LocalFileSystem.list(path, options.file_extension)?; if filenames.is_empty() { return Err(DataFusionError::Execution(format!( "No files found at {path} with file extension {file_extension}", diff --git a/datafusion/src/physical_plan/json.rs b/datafusion/src/physical_plan/json.rs index ed9b0b03a38e..e70af74fb7e9 100644 --- a/datafusion/src/physical_plan/json.rs +++ b/datafusion/src/physical_plan/json.rs @@ -19,7 +19,9 @@ use async_trait::async_trait; use futures::Stream; -use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream}; +use super::{source::Source, ExecutionPlan, Partitioning, RecordBatchStream}; +use crate::datasource::object_store::local::LocalFileSystem; +use crate::datasource::object_store::ObjectStore; use crate::error::{DataFusionError, Result}; use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter}; use arrow::{ @@ -87,7 +89,7 @@ impl NdJsonExec { ) -> Result { let file_extension = options.file_extension.to_string(); - let filenames = common::build_file_list(path, &file_extension)?; + let filenames = LocalFileSystem.list(path, options.file_extension)?; if filenames.is_empty() { return Err(DataFusionError::Execution(format!( diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 8f7db72484c9..ecbcd054b105 100644 --- 
a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -217,6 +217,9 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// use datafusion::prelude::*; /// use datafusion::physical_plan::displayable; /// +/// #[tokio::main] +/// # async fn main() { +/// /// // Hard code concurrency as it appears in the RepartitionExec output /// let config = ExecutionConfig::new() /// .with_concurrency(3); @@ -242,6 +245,7 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// \n RepartitionExec: partitioning=RoundRobinBatch(3)\ /// \n CsvExec: source=Path(tests/example.csv: [tests/example.csv]), has_header=true", /// plan_string.trim()); +/// # } /// ``` /// pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> { diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index ff8bb5b32678..4b205fab0b6c 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -18,7 +18,6 @@ //! 
Execution plan for reading Parquet files use std::fmt; -use std::fs::File; use std::sync::Arc; use std::{any::Any, convert::TryInto}; @@ -27,14 +26,14 @@ use crate::{ logical_plan::{Column, Expr}, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, physical_plan::{ - common, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, }, scalar::ScalarValue, }; use arrow::{ array::ArrayRef, - datatypes::{DataType, Schema, SchemaRef}, + datatypes::{Schema, SchemaRef}, error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; @@ -54,19 +53,23 @@ use tokio::{ task, }; -use crate::datasource::datasource::{ColumnStatistics, Statistics}; +use crate::datasource::datasource::Statistics; use async_trait::async_trait; use super::stream::RecordBatchReceiverStream; use super::SQLMetric; -use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; -use crate::physical_plan::Accumulator; +use crate::datasource::object_store::ObjectStore; +use crate::datasource::parquet::{ObjectReaderWrapper, ParquetRootDesc}; +use crate::datasource::{get_statistics_with_limit, FilePartition, PartitionedFile}; +use crate::prelude::ExecutionContext; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] pub struct ParquetExec { /// Parquet partitions to read partitions: Vec, + /// Source used for get reader for partitions + object_store: Arc, /// Schema after projection is applied schema: SchemaRef, /// Projection for which columns to load @@ -95,9 +98,7 @@ pub struct ParquetExec { #[derive(Debug, Clone)] pub struct ParquetPartition { /// The Parquet filename for this partition - pub filenames: Vec, - /// Statistics for this partition - pub statistics: Statistics, + pub file_partition: FilePartition, /// Execution metrics metrics: ParquetPartitionMetrics, } @@ -126,290 +127,44 @@ impl ParquetExec { projection: Option>, predicate: 
Option, batch_size: usize, - max_concurrency: usize, + context: ExecutionContext, limit: Option, ) -> Result { + let max_concurrency = context.state.lock().unwrap().config.concurrency; // build a list of filenames from the specified path, which could be a single file or // a directory containing one or more parquet files - let filenames = common::build_file_list(path, ".parquet")?; - if filenames.is_empty() { - Err(DataFusionError::Plan(format!( - "No Parquet files (with .parquet extension) found at path {}", - path - ))) - } else { - let filenames = filenames - .iter() - .map(|filename| filename.as_str()) - .collect::>(); - Self::try_from_files( - &filenames, - projection, - predicate, - batch_size, - max_concurrency, - limit, - ) - } + let root_desc = ParquetRootDesc::new(path, context)?; + Self::try_new( + Arc::new(root_desc), + projection, + predicate, + batch_size, + max_concurrency, + limit, + ) } - /// Create a new Parquet reader execution plan based on the specified list of Parquet - /// files - pub fn try_from_files( - filenames: &[&str], + /// Create a new Parquet reader execution plan with root descriptor, provided partitions and schema + pub fn try_new( + desc: Arc, projection: Option>, predicate: Option, batch_size: usize, max_concurrency: usize, limit: Option, ) -> Result { - debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", - filenames, projection, predicate, limit); - // build a list of Parquet partitions with statistics and gather all unique schemas - // used in this data set - let mut schemas: Vec = vec![]; - let mut partitions = Vec::with_capacity(max_concurrency); - let filenames: Vec = filenames.iter().map(|s| s.to_string()).collect(); - let chunks = split_files(&filenames, max_concurrency); - let mut num_rows = 0; - let mut num_fields = 0; - let mut fields = Vec::new(); - let mut total_byte_size = 0; - let mut null_counts = Vec::new(); - let mut max_values: Vec> = Vec::new(); - let mut min_values: 
Vec> = Vec::new(); - let mut limit_exhausted = false; - for chunk in chunks { - let mut filenames: Vec = - chunk.iter().map(|x| x.to_string()).collect(); - let mut total_files = 0; - for filename in &filenames { - total_files += 1; - let file = File::open(filename)?; - let file_reader = Arc::new(SerializedFileReader::new(file)?); - let mut arrow_reader = ParquetFileArrowReader::new(file_reader); - let meta_data = arrow_reader.get_metadata(); - // collect all the unique schemas in this data set - let schema = arrow_reader.get_schema()?; - if schemas.is_empty() || schema != schemas[0] { - fields = schema.fields().to_vec(); - num_fields = schema.fields().len(); - null_counts = vec![0; num_fields]; - max_values = schema - .fields() - .iter() - .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - min_values = schema - .fields() - .iter() - .map(|field| MinAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - schemas.push(schema); - } + debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", + desc, projection, predicate, limit); - for row_group_meta in meta_data.row_groups() { - num_rows += row_group_meta.num_rows(); - total_byte_size += row_group_meta.total_byte_size(); + let (all_files, statistics) = get_statistics_with_limit(&desc.descriptor, limit); + let schema = desc.schema(); - // Currently assumes every Parquet file has same schema - // https://issues.apache.org/jira/browse/ARROW-11017 - let columns_null_counts = row_group_meta - .columns() - .iter() - .flat_map(|c| c.statistics().map(|stats| stats.null_count())); - - for (i, cnt) in columns_null_counts.enumerate() { - null_counts[i] += cnt - } - - for (i, column) in row_group_meta.columns().iter().enumerate() { - if let Some(stat) = column.statistics() { - match stat { - ParquetStatistics::Boolean(s) => { - if let DataType::Boolean = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match 
max_value.update(&[ - ScalarValue::Boolean(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Boolean(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Int32(s) => { - if let DataType::Int32 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Int32(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Int32(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Int64(s) => { - if let DataType::Int64 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Int64(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Int64(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Float(s) => { - if let DataType::Float32 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Float32(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Float32(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Double(s) => { - if let DataType::Float64 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match 
max_value.update(&[ - ScalarValue::Float64(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Float64(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - _ => {} - } - } - } - - if limit.map(|x| num_rows >= x as i64).unwrap_or(false) { - limit_exhausted = true; - break; - } - } - } - let column_stats = (0..num_fields) - .map(|i| { - let max_value = match &max_values[i] { - Some(max_value) => max_value.evaluate().ok(), - None => None, - }; - let min_value = match &min_values[i] { - Some(min_value) => min_value.evaluate().ok(), - None => None, - }; - ColumnStatistics { - null_count: Some(null_counts[i] as usize), - max_value, - min_value, - distinct_count: None, - } - }) - .collect(); - - let statistics = Statistics { - num_rows: Some(num_rows as usize), - total_byte_size: Some(total_byte_size as usize), - column_statistics: Some(column_stats), - }; - // remove files that are not needed in case of limit - filenames.truncate(total_files); - partitions.push(ParquetPartition::new(filenames, statistics)); - if limit_exhausted { - break; - } + let mut partitions = Vec::with_capacity(max_concurrency); + let chunked_files = split_files(&all_files, max_concurrency); + for (index, group) in chunked_files.iter().enumerate() { + partitions.push(ParquetPartition::new(Vec::from(*group), index)); } - // we currently get the schema information from the first file rather than do - // schema merging and this is a limitation. 
- // See https://issues.apache.org/jira/browse/ARROW-11017 - if schemas.len() > 1 { - return Err(DataFusionError::Plan(format!( - "The Parquet files have {} different schemas and DataFusion does \ - not yet support schema merging", - schemas.len() - ))); - } - let schema = Arc::new(schemas.pop().unwrap()); let metrics = ParquetExecMetrics::new(); let predicate_builder = predicate.and_then(|predicate_expr| { @@ -428,8 +183,10 @@ impl ParquetExec { Ok(Self::new( partitions, + desc.object_store.clone(), schema, projection, + statistics, metrics, predicate_builder, batch_size, @@ -438,10 +195,13 @@ impl ParquetExec { } /// Create a new Parquet reader execution plan with provided partitions and schema + #[allow(clippy::too_many_arguments)] pub fn new( partitions: Vec, + object_store: Arc, schema: SchemaRef, projection: Option>, + statistics: Statistics, metrics: ParquetExecMetrics, predicate_builder: Option, batch_size: usize, @@ -459,96 +219,23 @@ impl ParquetExec { .collect(), ); - // sum the statistics - let mut num_rows: Option = None; - let mut total_byte_size: Option = None; - let mut null_counts: Vec = vec![0; schema.fields().len()]; - let mut has_statistics = false; - let mut max_values = schema - .fields() - .iter() - .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - let mut min_values = schema - .fields() - .iter() - .map(|field| MinAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - for part in &partitions { - if let Some(n) = part.statistics.num_rows { - num_rows = Some(num_rows.unwrap_or(0) + n) - } - if let Some(n) = part.statistics.total_byte_size { - total_byte_size = Some(total_byte_size.unwrap_or(0) + n) + let new_column_statistics = statistics.column_statistics.map(|stats| { + let mut projected_stats = Vec::with_capacity(projection.len()); + for proj in &projection { + projected_stats.push(stats[*proj].clone()); } - if let Some(x) = &part.statistics.column_statistics { - let part_nulls: Vec> = - 
x.iter().map(|c| c.null_count).collect(); - has_statistics = true; - - let part_max_values: Vec> = - x.iter().map(|c| c.max_value.clone()).collect(); - let part_min_values: Vec> = - x.iter().map(|c| c.min_value.clone()).collect(); - - for &i in projection.iter() { - null_counts[i] = part_nulls[i].unwrap_or(0); - if let Some(part_max_value) = part_max_values[i].clone() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[part_max_value]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - } - if let Some(part_min_value) = part_min_values[i].clone() { - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[part_min_value]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - } - - let column_stats = if has_statistics { - Some( - (0..schema.fields().len()) - .map(|i| { - let max_value = match &max_values[i] { - Some(max_value) => max_value.evaluate().ok(), - None => None, - }; - let min_value = match &min_values[i] { - Some(min_value) => min_value.evaluate().ok(), - None => None, - }; - ColumnStatistics { - null_count: Some(null_counts[i] as usize), - max_value, - min_value, - distinct_count: None, - } - }) - .collect(), - ) - } else { - None - }; + projected_stats + }); let statistics = Statistics { - num_rows, - total_byte_size, - column_statistics: column_stats, + num_rows: statistics.num_rows, + total_byte_size: statistics.total_byte_size, + column_statistics: new_column_statistics, }; + Self { partitions, + object_store, schema: Arc::new(projected_schema), projection, metrics, @@ -582,22 +269,20 @@ impl ParquetExec { impl ParquetPartition { /// Create a new parquet partition - pub fn new(filenames: Vec, statistics: Statistics) -> Self { + pub fn new(files: Vec, index: usize) -> Self { Self { - filenames, - statistics, + file_partition: FilePartition { index, files }, metrics: ParquetPartitionMetrics::new(), } } /// The Parquet filename for this partition - pub fn 
filenames(&self) -> &[String] { - &self.filenames - } - - /// Statistics for this partition - pub fn statistics(&self) -> &Statistics { - &self.statistics + pub fn filenames(&self) -> Vec { + self.file_partition + .files + .iter() + .map(|f| f.file_path.clone()) + .collect() } } @@ -663,8 +348,8 @@ impl ExecutionPlan for ParquetExec { Receiver>, ) = channel(2); - let partition = &self.partitions[partition]; - let filenames = partition.filenames.clone(); + let object_store = self.object_store.clone(); + let partition = self.partitions[partition].clone(); let metrics = partition.metrics.clone(); let projection = self.projection.clone(); let predicate_builder = self.predicate_builder.clone(); @@ -672,8 +357,9 @@ impl ExecutionPlan for ParquetExec { let limit = self.limit; task::spawn_blocking(move || { - if let Err(e) = read_files( - &filenames, + if let Err(e) = read_partition( + object_store, + partition, metrics, &projection, &predicate_builder, @@ -698,9 +384,7 @@ impl ExecutionPlan for ParquetExec { let files: Vec<_> = self .partitions .iter() - .map(|pp| pp.filenames.iter()) - .flatten() - .map(|s| s.as_str()) + .map(|pp| format!("{}", pp.file_partition)) .collect(); write!( @@ -720,14 +404,11 @@ impl ExecutionPlan for ParquetExec { .flat_map(|p| { vec![ ( - format!( - "numPredicateEvaluationErrors for {}", - p.filenames.join(",") - ), + format!("numPredicateEvaluationErrors for {}", p.file_partition), p.metrics.predicate_evaluation_errors.as_ref().clone(), ), ( - format!("numRowGroupsPruned for {}", p.filenames.join(",")), + format!("numRowGroupsPruned for {}", p.file_partition), p.metrics.row_groups_pruned.as_ref().clone(), ), ] @@ -851,7 +532,7 @@ fn build_row_group_predicate( match predicate_values { Ok(values) => { // NB: false means don't scan row group - let num_pruned = values.iter().filter(|&v| !v).count(); + let num_pruned = values.iter().filter(|&v| !*v).count(); metrics.row_groups_pruned.add(num_pruned); Box::new(move |_, i| values[i]) } @@ -865,8 
+546,10 @@ fn build_row_group_predicate( } } -fn read_files( - filenames: &[String], +#[allow(clippy::too_many_arguments)] +fn read_partition( + object_store: Arc, + partition: ParquetPartition, metrics: ParquetPartitionMetrics, projection: &[usize], predicate_builder: &Option, @@ -875,9 +558,11 @@ fn read_files( limit: Option, ) -> Result<()> { let mut total_rows = 0; - 'outer: for filename in filenames { - let file = File::open(&filename)?; - let mut file_reader = SerializedFileReader::new(file)?; + let all_files = partition.file_partition.files; + 'outer: for partitioned_file in all_files { + let reader = object_store.get_reader(partitioned_file.file_path.as_str())?; + let mut file_reader = + SerializedFileReader::new(ObjectReaderWrapper::new(reader))?; if let Some(predicate_builder) = predicate_builder { let row_group_predicate = build_row_group_predicate( predicate_builder, @@ -904,7 +589,7 @@ fn read_files( Some(Err(e)) => { let err_msg = format!( "Error reading batch from {}: {}", - filename, + partitioned_file, e.to_string() ); // send error to operator @@ -924,12 +609,15 @@ fn read_files( Ok(()) } -fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> { - let mut chunk_size = filenames.len() / n; - if filenames.len() % n > 0 { +fn split_files( + partitioned_files: &[PartitionedFile], + n: usize, +) -> Vec<&[PartitionedFile]> { + let mut chunk_size = partitioned_files.len() / n; + if partitioned_files.len() % n > 0 { chunk_size += 1; } - filenames.chunks(chunk_size).collect() + partitioned_files.chunks(chunk_size).collect() } #[cfg(test)] @@ -945,24 +633,24 @@ mod tests { #[test] fn test_split_files() { - let filenames = vec![ - "a".to_string(), - "b".to_string(), - "c".to_string(), - "d".to_string(), - "e".to_string(), + let files = vec![ + PartitionedFile::from("a".to_string()), + PartitionedFile::from("b".to_string()), + PartitionedFile::from("c".to_string()), + PartitionedFile::from("d".to_string()), + PartitionedFile::from("e".to_string()), 
]; - let chunks = split_files(&filenames, 1); + let chunks = split_files(&files, 1); assert_eq!(1, chunks.len()); assert_eq!(5, chunks[0].len()); - let chunks = split_files(&filenames, 2); + let chunks = split_files(&files, 2); assert_eq!(2, chunks.len()); assert_eq!(3, chunks[0].len()); assert_eq!(2, chunks[1].len()); - let chunks = split_files(&filenames, 5); + let chunks = split_files(&files, 5); assert_eq!(5, chunks.len()); assert_eq!(1, chunks[0].len()); assert_eq!(1, chunks[1].len()); @@ -970,7 +658,7 @@ mod tests { assert_eq!(1, chunks[3].len()); assert_eq!(1, chunks[4].len()); - let chunks = split_files(&filenames, 123); + let chunks = split_files(&files, 123); assert_eq!(5, chunks.len()); assert_eq!(1, chunks[0].len()); assert_eq!(1, chunks[1].len()); @@ -979,7 +667,7 @@ mod tests { assert_eq!(1, chunks[4].len()); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test() -> Result<()> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/alltypes_plain.parquet", testdata); @@ -988,7 +676,7 @@ mod tests { Some(vec![0, 1, 2]), None, 1024, - 4, + ExecutionContext::with_concurrency(4), None, )?; assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 02ab15d1a652..9d4673c15ff8 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -1399,8 +1399,8 @@ mod tests { planner.create_physical_plan(logical_plan, &ctx_state) } - #[test] - fn test_all_operators() -> Result<()> { + #[tokio::test] + async fn test_all_operators() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let path = format!("{}/csv/aggregate_test_100.csv", testdata); @@ -1444,8 +1444,8 @@ mod tests { Ok(()) } - #[test] - fn test_with_csv_plan() -> Result<()> { + #[tokio::test] + async fn test_with_csv_plan() -> Result<()> { let testdata = 
crate::test_util::arrow_test_data(); let path = format!("{}/csv/aggregate_test_100.csv", testdata); @@ -1463,8 +1463,8 @@ mod tests { Ok(()) } - #[test] - fn errors() -> Result<()> { + #[tokio::test] + async fn errors() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let path = format!("{}/csv/aggregate_test_100.csv", testdata); let options = CsvReadOptions::new().schema_infer_max_records(100); @@ -1565,8 +1565,8 @@ mod tests { } } - #[test] - fn in_list_types() -> Result<()> { + #[tokio::test] + async fn in_list_types() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let path = format!("{}/csv/aggregate_test_100.csv", testdata); let options = CsvReadOptions::new().schema_infer_max_records(100); @@ -1612,8 +1612,8 @@ mod tests { Ok(()) } - #[test] - fn hash_agg_input_schema() -> Result<()> { + #[tokio::test] + async fn hash_agg_input_schema() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let path = format!("{}/csv/aggregate_test_100.csv", testdata); @@ -1635,8 +1635,8 @@ mod tests { Ok(()) } - #[test] - fn hash_agg_group_by_partitioned() -> Result<()> { + #[tokio::test] + async fn hash_agg_group_by_partitioned() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let path = format!("{}/csv/aggregate_test_100.csv", testdata); diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs index 789f0810c983..99e19a4789fb 100644 --- a/datafusion/tests/parquet_pruning.rs +++ b/datafusion/tests/parquet_pruning.rs @@ -41,7 +41,7 @@ use hashbrown::HashMap; use parquet::{arrow::ArrowWriter, file::properties::WriterProperties}; use tempfile::NamedTempFile; -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_timestamps_nanos() { let output = ContextWithParquet::new(Scenario::Timestamps) .await @@ -54,7 +54,7 @@ async fn prune_timestamps_nanos() { assert_eq!(output.result_rows, 10, "{}", output.description()); } -#[tokio::test] 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_timestamps_micros() { let output = ContextWithParquet::new(Scenario::Timestamps) .await @@ -69,7 +69,7 @@ async fn prune_timestamps_micros() { assert_eq!(output.result_rows, 10, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_timestamps_millis() { let output = ContextWithParquet::new(Scenario::Timestamps) .await @@ -84,7 +84,7 @@ async fn prune_timestamps_millis() { assert_eq!(output.result_rows, 10, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_timestamps_seconds() { let output = ContextWithParquet::new(Scenario::Timestamps) .await @@ -99,7 +99,7 @@ async fn prune_timestamps_seconds() { assert_eq!(output.result_rows, 10, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_date32() { let output = ContextWithParquet::new(Scenario::Dates) .await @@ -112,7 +112,7 @@ async fn prune_date32() { assert_eq!(output.result_rows, 1, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_date64() { // work around for not being able to cast Date32 to Date64 automatically let date = "2020-01-02" @@ -137,7 +137,7 @@ async fn prune_date64() { assert_eq!(output.result_rows, 1, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_disabled() { let query = "SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')"; let expected_rows = 10; @@ -178,7 +178,7 @@ async fn prune_disabled() { ); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_int32_lt() { let (expected_errors, expected_row_group_pruned, expected_results) = (Some(0), Some(1), 11); @@ -218,7 +218,7 @@ async fn prune_int32_lt() { ); } 
-#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_int32_eq() { // resulrt of sql "SELECT * FROM t where i = 1" let output = ContextWithParquet::new(Scenario::Int32) @@ -233,7 +233,7 @@ async fn prune_int32_eq() { assert_eq!(output.result_rows, 1, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_int32_scalar_fun_and_eq() { // resulrt of sql "SELECT * FROM t where abs(i) = 1 and i = 1" // only use "i = 1" to prune @@ -249,7 +249,7 @@ async fn prune_int32_scalar_fun_and_eq() { assert_eq!(output.result_rows, 1, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_int32_scalar_fun() { // resulrt of sql "SELECT * FROM t where abs(i) = 1" is not supported let output = ContextWithParquet::new(Scenario::Int32) @@ -265,7 +265,7 @@ async fn prune_int32_scalar_fun() { assert_eq!(output.result_rows, 3, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_int32_complex_expr() { // resulrt of sql "SELECT * FROM t where i+1 = 1" is not supported let output = ContextWithParquet::new(Scenario::Int32) @@ -281,7 +281,7 @@ async fn prune_int32_complex_expr() { assert_eq!(output.result_rows, 2, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_int32_complex_expr_subtract() { // resulrt of sql "SELECT * FROM t where 1-i > 1" is not supported let output = ContextWithParquet::new(Scenario::Int32) @@ -297,7 +297,7 @@ async fn prune_int32_complex_expr_subtract() { assert_eq!(output.result_rows, 9, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_f64_lt() { let (expected_errors, expected_row_group_pruned, expected_results) = (Some(0), Some(1), 11); @@ -337,7 +337,7 @@ async fn prune_f64_lt() { ); 
} -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_f64_scalar_fun_and_gt() { // resulrt of sql "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1" // only use "f >= 0" to prune @@ -353,7 +353,7 @@ async fn prune_f64_scalar_fun_and_gt() { assert_eq!(output.result_rows, 1, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_f64_scalar_fun() { // resulrt of sql "SELECT * FROM t where abs(f-1) <= 0.000001" is not supported let output = ContextWithParquet::new(Scenario::Float64) @@ -369,7 +369,7 @@ async fn prune_f64_scalar_fun() { assert_eq!(output.result_rows, 1, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_f64_complex_expr() { // resulrt of sql "SELECT * FROM t where f+1 > 1.1"" is not supported let output = ContextWithParquet::new(Scenario::Float64) @@ -385,7 +385,7 @@ async fn prune_f64_complex_expr() { assert_eq!(output.result_rows, 9, "{}", output.description()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prune_f64_complex_expr_subtract() { // resulrt of sql "SELECT * FROM t where 1-f > 1" is not supported let output = ContextWithParquet::new(Scenario::Float64) diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 0f385680deed..b70ce155b6f3 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -110,7 +110,7 @@ async fn nyc() -> Result<()> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn parquet_query() { let mut ctx = ExecutionContext::new(); register_alltypes_parquet(&mut ctx); @@ -136,7 +136,7 @@ async fn parquet_query() { assert_batches_eq!(expected, &actual); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn parquet_single_nan_schema() { let mut ctx = ExecutionContext::new(); let testdata = 
datafusion::test_util::parquet_test_data();