diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs
index f160bddd2b9c..0394b05277da 100644
--- a/datafusion/catalog/src/lib.rs
+++ b/datafusion/catalog/src/lib.rs
@@ -50,7 +50,7 @@ pub use catalog::*;
 pub use datafusion_session::Session;
 pub use dynamic_file::catalog::*;
 pub use memory::{
-    MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
+    MemTable, MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
 };
 pub use r#async::*;
 pub use schema::*;
diff --git a/datafusion/catalog/src/memory/mod.rs b/datafusion/catalog/src/memory/mod.rs
index 4c5cf1a9ae9d..541d25b3345b 100644
--- a/datafusion/catalog/src/memory/mod.rs
+++ b/datafusion/catalog/src/memory/mod.rs
@@ -17,6 +17,12 @@
 
 pub(crate) mod catalog;
 pub(crate) mod schema;
+pub(crate) mod table;
 
 pub use catalog::*;
 pub use schema::*;
+pub use table::*;
+
+// backward compatibility
+pub use datafusion_datasource::memory::MemorySourceConfig;
+pub use datafusion_datasource::source::DataSourceExec;
diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs
new file mode 100644
index 000000000000..81243e2c4889
--- /dev/null
+++ b/datafusion/catalog/src/memory/table.rs
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use crate::TableProvider;
+use datafusion_common::error::Result;
+use datafusion_expr::Expr;
+use datafusion_expr::TableType;
+use datafusion_physical_expr::create_physical_sort_exprs;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::{
+    common, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
+use datafusion_common_runtime::JoinSet;
+use datafusion_datasource::memory::MemSink;
+use datafusion_datasource::memory::MemorySourceConfig;
+use datafusion_datasource::sink::DataSinkExec;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_expr::dml::InsertOp;
+use datafusion_expr::SortExpr;
+use datafusion_session::Session;
+
+use async_trait::async_trait;
+use futures::StreamExt;
+use log::debug;
+use parking_lot::Mutex;
+use tokio::sync::RwLock;
+
+// backward compatibility
+pub use datafusion_datasource::memory::PartitionData;
+
+/// In-memory data source for presenting a `Vec<RecordBatch>` as a
+/// data source that can be queried by DataFusion. This allows data to
+/// be pre-loaded into memory and then repeatedly queried without
+/// incurring additional file I/O overhead.
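+///
+/// # Example
+///
+/// A minimal usage sketch of building a `MemTable` from a single in-memory
+/// batch (the column name and values below are illustrative only):
+///
+/// ```ignore
+/// use std::sync::Arc;
+/// use arrow::array::Int32Array;
+/// use arrow::datatypes::{DataType, Field, Schema};
+/// use arrow::record_batch::RecordBatch;
+/// use datafusion_catalog::MemTable;
+///
+/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+/// let batch = RecordBatch::try_new(
+///     Arc::clone(&schema),
+///     vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+/// )?;
+/// // One partition holding one batch; multiple partitions enable parallel scans.
+/// let table = MemTable::try_new(schema, vec![vec![batch]])?;
+/// ```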
+#[derive(Debug)] +pub struct MemTable { + schema: SchemaRef, + // batches used to be pub(crate), but it's needed to be public for the tests + pub batches: Vec, + constraints: Constraints, + column_defaults: HashMap, + /// Optional pre-known sort order(s). Must be `SortExpr`s. + /// inserting data into this table removes the order + pub sort_order: Arc>>>, +} + +impl MemTable { + /// Create a new in-memory table from the provided schema and record batches + pub fn try_new(schema: SchemaRef, partitions: Vec>) -> Result { + for batches in partitions.iter().flatten() { + let batches_schema = batches.schema(); + if !schema.contains(&batches_schema) { + debug!( + "mem table schema does not contain batches schema. \ + Target_schema: {schema:?}. Batches Schema: {batches_schema:?}" + ); + return plan_err!("Mismatch between schema and batches"); + } + } + + Ok(Self { + schema, + batches: partitions + .into_iter() + .map(|e| Arc::new(RwLock::new(e))) + .collect::>(), + constraints: Constraints::empty(), + column_defaults: HashMap::new(), + sort_order: Arc::new(Mutex::new(vec![])), + }) + } + + /// Assign constraints + pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.constraints = constraints; + self + } + + /// Assign column defaults + pub fn with_column_defaults( + mut self, + column_defaults: HashMap, + ) -> Self { + self.column_defaults = column_defaults; + self + } + + /// Specify an optional pre-known sort order(s). Must be `SortExpr`s. + /// + /// If the data is not sorted by this order, DataFusion may produce + /// incorrect results. + /// + /// DataFusion may take advantage of this ordering to omit sorts + /// or use more efficient algorithms. + /// + /// Note that multiple sort orders are supported, if some are known to be + /// equivalent, + pub fn with_sort_order(self, mut sort_order: Vec>) -> Self { + std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order); + self + } + + /// Create a mem table by reading from another data source + pub async fn load( + t: Arc, + output_partitions: Option, + state: &dyn Session, + ) -> Result { + let schema = t.schema(); + let constraints = t.constraints(); + let exec = t.scan(state, None, &[], None).await?; + let partition_count = exec.output_partitioning().partition_count(); + + let mut join_set = JoinSet::new(); + + for part_idx in 0..partition_count { + let task = state.task_ctx(); + let exec = Arc::clone(&exec); + join_set.spawn(async move { + let stream = exec.execute(part_idx, task)?; + common::collect(stream).await + }); + } + + let mut data: Vec> = + Vec::with_capacity(exec.output_partitioning().partition_count()); + + while let Some(result) = join_set.join_next().await { + match result { + Ok(res) => data.push(res?), + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } + } + } + } + + let mut exec = DataSourceExec::new(Arc::new(MemorySourceConfig::try_new( + &data, + Arc::clone(&schema), + None, + )?)); + if let Some(cons) = constraints { + exec = exec.with_constraints(cons.clone()); + } + + if let Some(num_partitions) = output_partitions { + let exec = RepartitionExec::try_new( + Arc::new(exec), + Partitioning::RoundRobinBatch(num_partitions), + )?; + + // execute and collect results + let mut output_partitions = vec![]; + for i in 0..exec.properties().output_partitioning().partition_count() { + // execute this *output* partition and collect all batches + let task_ctx = state.task_ctx(); + let mut stream = exec.execute(i, task_ctx)?; + let mut 
batches = vec![]; + while let Some(result) = stream.next().await { + batches.push(result?); + } + output_partitions.push(batches); + } + + return MemTable::try_new(Arc::clone(&schema), output_partitions); + } + MemTable::try_new(Arc::clone(&schema), data) + } +} + +#[async_trait] +impl TableProvider for MemTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn constraints(&self) -> Option<&Constraints> { + Some(&self.constraints) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let mut partitions = vec![]; + for arc_inner_vec in self.batches.iter() { + let inner_vec = arc_inner_vec.read().await; + partitions.push(inner_vec.clone()) + } + + let mut source = + MemorySourceConfig::try_new(&partitions, self.schema(), projection.cloned())?; + + let show_sizes = state.config_options().explain.show_sizes; + source = source.with_show_sizes(show_sizes); + + // add sort information if present + let sort_order = self.sort_order.lock(); + if !sort_order.is_empty() { + let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?; + + let file_sort_order = sort_order + .iter() + .map(|sort_exprs| { + create_physical_sort_exprs( + sort_exprs, + &df_schema, + state.execution_props(), + ) + }) + .collect::>>()?; + source = source.try_with_sort_information(file_sort_order)?; + } + + Ok(DataSourceExec::from_data_source(source)) + } + + /// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`]. + /// + /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`]. + /// + /// # Arguments + /// + /// * `state` - The [`SessionState`] containing the context for executing the plan. + /// * `input` - The [`ExecutionPlan`] to execute and insert. + /// + /// # Returns + /// + /// * A plan that returns the number of rows written. + /// + /// [`SessionState`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html + async fn insert_into( + &self, + _state: &dyn Session, + input: Arc, + insert_op: InsertOp, + ) -> Result> { + // If we are inserting into the table, any sort order may be messed up so reset it here + *self.sort_order.lock() = vec![]; + + // Create a physical plan from the logical plan. + // Check that the schema of the plan matches the schema of this table. + self.schema() + .logically_equivalent_names_and_types(&input.schema())?; + + if insert_op != InsertOp::Append { + return not_impl_err!("{insert_op} not implemented for MemoryTable yet"); + } + let sink = MemSink::try_new(self.batches.clone(), Arc::clone(&self.schema))?; + Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None))) + } + + fn get_column_default(&self, column: &str) -> Option<&Expr> { + self.column_defaults.get(column) + } +} diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory_test.rs similarity index 58% rename from datafusion/core/src/datasource/memory.rs rename to datafusion/core/src/datasource/memory_test.rs index 0288cd3e8bc7..381000ab8ee1 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory_test.rs @@ -15,378 +15,25 @@ // specific language governing permissions and limitations // under the License. -//! [`MemTable`] for querying `Vec` by DataFusion. 
- -use std::any::Any; -use std::collections::HashMap; -use std::fmt::{self, Debug}; -use std::sync::Arc; - -use crate::datasource::{TableProvider, TableType}; -use crate::error::Result; -use crate::logical_expr::Expr; -use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::{ - common, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, - Partitioning, SendableRecordBatchStream, -}; -use crate::physical_planner::create_physical_sort_exprs; - -use arrow::datatypes::SchemaRef; -use arrow::record_batch::RecordBatch; -use datafusion_catalog::Session; -use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt}; -use datafusion_common_runtime::JoinSet; -pub use datafusion_datasource::memory::MemorySourceConfig; -use datafusion_datasource::sink::{DataSink, DataSinkExec}; -pub use datafusion_datasource::source::DataSourceExec; -use datafusion_execution::TaskContext; -use datafusion_expr::dml::InsertOp; -use datafusion_expr::SortExpr; - -use async_trait::async_trait; -use futures::StreamExt; -use log::debug; -use parking_lot::Mutex; -use tokio::sync::RwLock; - -/// Type alias for partition data -pub type PartitionData = Arc>>; - -/// In-memory data source for presenting a `Vec` as a -/// data source that can be queried by DataFusion. This allows data to -/// be pre-loaded into memory and then repeatedly queried without -/// incurring additional file I/O overhead. -#[derive(Debug)] -pub struct MemTable { - schema: SchemaRef, - pub(crate) batches: Vec, - constraints: Constraints, - column_defaults: HashMap, - /// Optional pre-known sort order(s). Must be `SortExpr`s. - /// inserting data into this table removes the order - pub sort_order: Arc>>>, -} - -impl MemTable { - /// Create a new in-memory table from the provided schema and record batches - pub fn try_new(schema: SchemaRef, partitions: Vec>) -> Result { - for batches in partitions.iter().flatten() { - let batches_schema = batches.schema(); - if !schema.contains(&batches_schema) { - debug!( - "mem table schema does not contain batches schema. \ - Target_schema: {schema:?}. Batches Schema: {batches_schema:?}" - ); - return plan_err!("Mismatch between schema and batches"); - } - } - - Ok(Self { - schema, - batches: partitions - .into_iter() - .map(|e| Arc::new(RwLock::new(e))) - .collect::>(), - constraints: Constraints::empty(), - column_defaults: HashMap::new(), - sort_order: Arc::new(Mutex::new(vec![])), - }) - } - - /// Assign constraints - pub fn with_constraints(mut self, constraints: Constraints) -> Self { - self.constraints = constraints; - self - } - - /// Assign column defaults - pub fn with_column_defaults( - mut self, - column_defaults: HashMap, - ) -> Self { - self.column_defaults = column_defaults; - self - } - - /// Specify an optional pre-known sort order(s). Must be `SortExpr`s. - /// - /// If the data is not sorted by this order, DataFusion may produce - /// incorrect results. - /// - /// DataFusion may take advantage of this ordering to omit sorts - /// or use more efficient algorithms. 
- /// - /// Note that multiple sort orders are supported, if some are known to be - /// equivalent, - pub fn with_sort_order(self, mut sort_order: Vec>) -> Self { - std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order); - self - } - - /// Create a mem table by reading from another data source - pub async fn load( - t: Arc, - output_partitions: Option, - state: &dyn Session, - ) -> Result { - let schema = t.schema(); - let constraints = t.constraints(); - let exec = t.scan(state, None, &[], None).await?; - let partition_count = exec.output_partitioning().partition_count(); - - let mut join_set = JoinSet::new(); - - for part_idx in 0..partition_count { - let task = state.task_ctx(); - let exec = Arc::clone(&exec); - join_set.spawn(async move { - let stream = exec.execute(part_idx, task)?; - common::collect(stream).await - }); - } - - let mut data: Vec> = - Vec::with_capacity(exec.output_partitioning().partition_count()); - - while let Some(result) = join_set.join_next().await { - match result { - Ok(res) => data.push(res?), - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } else { - unreachable!(); - } - } - } - } - - let mut exec = DataSourceExec::new(Arc::new(MemorySourceConfig::try_new( - &data, - Arc::clone(&schema), - None, - )?)); - if let Some(cons) = constraints { - exec = exec.with_constraints(cons.clone()); - } - - if let Some(num_partitions) = output_partitions { - let exec = RepartitionExec::try_new( - Arc::new(exec), - Partitioning::RoundRobinBatch(num_partitions), - )?; - - // execute and collect results - let mut output_partitions = vec![]; - for i in 0..exec.properties().output_partitioning().partition_count() { - // execute this *output* partition and collect all batches - let task_ctx = state.task_ctx(); - let mut stream = exec.execute(i, task_ctx)?; - let mut batches = vec![]; - while let Some(result) = stream.next().await { - batches.push(result?); - } - output_partitions.push(batches); - } - - return MemTable::try_new(Arc::clone(&schema), output_partitions); - } - MemTable::try_new(Arc::clone(&schema), data) - } -} - -#[async_trait] -impl TableProvider for MemTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn constraints(&self) -> Option<&Constraints> { - Some(&self.constraints) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - state: &dyn Session, - projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> Result> { - let mut partitions = vec![]; - for arc_inner_vec in self.batches.iter() { - let inner_vec = arc_inner_vec.read().await; - partitions.push(inner_vec.clone()) - } - - let mut source = - MemorySourceConfig::try_new(&partitions, self.schema(), projection.cloned())?; - - let show_sizes = state.config_options().explain.show_sizes; - source = source.with_show_sizes(show_sizes); - - // add sort information if present - let sort_order = self.sort_order.lock(); - if !sort_order.is_empty() { - let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?; - - let file_sort_order = sort_order - .iter() - .map(|sort_exprs| { - create_physical_sort_exprs( - sort_exprs, - &df_schema, - state.execution_props(), - ) - }) - .collect::>>()?; - source = source.try_with_sort_information(file_sort_order)?; - } - - Ok(DataSourceExec::from_data_source(source)) - } - - /// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`]. 
- /// - /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`]. - /// - /// # Arguments - /// - /// * `state` - The [`SessionState`] containing the context for executing the plan. - /// * `input` - The [`ExecutionPlan`] to execute and insert. - /// - /// # Returns - /// - /// * A plan that returns the number of rows written. - /// - /// [`SessionState`]: crate::execution::context::SessionState - async fn insert_into( - &self, - _state: &dyn Session, - input: Arc, - insert_op: InsertOp, - ) -> Result> { - // If we are inserting into the table, any sort order may be messed up so reset it here - *self.sort_order.lock() = vec![]; - - // Create a physical plan from the logical plan. - // Check that the schema of the plan matches the schema of this table. - self.schema() - .logically_equivalent_names_and_types(&input.schema())?; - - if insert_op != InsertOp::Append { - return not_impl_err!("{insert_op} not implemented for MemoryTable yet"); - } - let sink = MemSink::try_new(self.batches.clone(), Arc::clone(&self.schema))?; - Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None))) - } - - fn get_column_default(&self, column: &str) -> Option<&Expr> { - self.column_defaults.get(column) - } -} - -/// Implements for writing to a [`MemTable`] -struct MemSink { - /// Target locations for writing data - batches: Vec, - schema: SchemaRef, -} - -impl Debug for MemSink { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MemSink") - .field("num_partitions", &self.batches.len()) - .finish() - } -} - -impl DisplayAs for MemSink { - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let partition_count = self.batches.len(); - write!(f, "MemoryTable (partitions={partition_count})") - } - DisplayFormatType::TreeRender => { - // TODO: collect info - write!(f, "") - } - } - } -} - -impl MemSink { - /// Creates a new [`MemSink`]. - /// - /// The caller is responsible for ensuring that there is at least one partition to insert into. - fn try_new(batches: Vec, schema: SchemaRef) -> Result { - if batches.is_empty() { - return plan_err!("Cannot insert into MemTable with zero partitions"); - } - Ok(Self { batches, schema }) - } -} - -#[async_trait] -impl DataSink for MemSink { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> &SchemaRef { - &self.schema - } - - async fn write_all( - &self, - mut data: SendableRecordBatchStream, - _context: &Arc, - ) -> Result { - let num_partitions = self.batches.len(); - - // buffer up the data round robin style into num_partitions - - let mut new_batches = vec![vec![]; num_partitions]; - let mut i = 0; - let mut row_count = 0; - while let Some(batch) = data.next().await.transpose()? 
{ - row_count += batch.num_rows(); - new_batches[i].push(batch); - i = (i + 1) % num_partitions; - } - - // write the outputs into the batches - for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) { - // Append all the new batches in one go to minimize locking overhead - target.write().await.append(&mut batches); - } - - Ok(row_count as u64) - } -} - #[cfg(test)] mod tests { - use super::*; + use crate::datasource::MemTable; use crate::datasource::{provider_as_source, DefaultTableSource}; use crate::physical_plan::collect; use crate::prelude::SessionContext; - use arrow::array::{AsArray, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, UInt64Type}; use arrow::error::ArrowError; - use datafusion_common::DataFusionError; + use arrow::record_batch::RecordBatch; + use arrow_schema::SchemaRef; + use datafusion_catalog::TableProvider; + use datafusion_common::{DataFusionError, Result}; + use datafusion_expr::dml::InsertOp; use datafusion_expr::LogicalPlanBuilder; + use futures::StreamExt; + use std::collections::HashMap; + use std::sync::Arc; #[tokio::test] async fn test_with_projection() -> Result<()> { diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 35a451cbc803..a195c9a882dd 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -24,10 +24,9 @@ pub mod empty; pub mod file_format; pub mod listing; pub mod listing_table_factory; -pub mod memory; +mod memory_test; pub mod physical_plan; pub mod provider; -mod statistics; mod view_test; // backwards compatibility @@ -40,14 +39,15 @@ pub use crate::catalog::TableProvider; pub use crate::logical_expr::TableType; pub use datafusion_catalog::cte_worktable; pub use datafusion_catalog::default_table_source; +pub use datafusion_catalog::memory; pub use datafusion_catalog::stream; pub use datafusion_catalog::view; +pub use datafusion_datasource::get_statistics_with_limit; pub use datafusion_datasource::schema_adapter; pub use datafusion_datasource::sink; pub use datafusion_datasource::source; pub use datafusion_execution::object_store; pub use datafusion_physical_expr::create_ordering; -pub use statistics::get_statistics_with_limit; #[cfg(all(test, feature = "parquet"))] mod tests { diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs deleted file mode 100644 index cf283ecee0bf..000000000000 --- a/datafusion/core/src/datasource/statistics.rs +++ /dev/null @@ -1,219 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use std::mem; -use std::sync::Arc; - -use futures::{Stream, StreamExt}; - -use crate::arrow::datatypes::SchemaRef; -use crate::error::Result; -use crate::physical_plan::{ColumnStatistics, Statistics}; -use datafusion_common::stats::Precision; -use datafusion_common::ScalarValue; -use datafusion_datasource::file_groups::FileGroup; - -use super::listing::PartitionedFile; - -/// Get all files as well as the file level summary statistics (no statistic for partition columns). -/// If the optional `limit` is provided, includes only sufficient files. Needed to read up to -/// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on -/// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive -/// call to `multiunzip` for constructing file level summary statistics. -pub async fn get_statistics_with_limit( - all_files: impl Stream)>>, - file_schema: SchemaRef, - limit: Option, - collect_stats: bool, -) -> Result<(FileGroup, Statistics)> { - let mut result_files = FileGroup::default(); - // These statistics can be calculated as long as at least one file provides - // useful information. If none of the files provides any information, then - // they will end up having `Precision::Absent` values. Throughout calculations, - // missing values will be imputed as: - // - zero for summations, and - // - neutral element for extreme points. - let size = file_schema.fields().len(); - let mut col_stats_set = vec![ColumnStatistics::default(); size]; - let mut num_rows = Precision::::Absent; - let mut total_byte_size = Precision::::Absent; - - // Fusing the stream allows us to call next safely even once it is finished. - let mut all_files = Box::pin(all_files.fuse()); - - if let Some(first_file) = all_files.next().await { - let (mut file, file_stats) = first_file?; - file.statistics = Some(file_stats.as_ref().clone()); - result_files.push(file); - - // First file, we set them directly from the file statistics. - num_rows = file_stats.num_rows; - total_byte_size = file_stats.total_byte_size; - for (index, file_column) in - file_stats.column_statistics.clone().into_iter().enumerate() - { - col_stats_set[index].null_count = file_column.null_count; - col_stats_set[index].max_value = file_column.max_value; - col_stats_set[index].min_value = file_column.min_value; - col_stats_set[index].sum_value = file_column.sum_value; - } - - // If the number of rows exceeds the limit, we can stop processing - // files. This only applies when we know the number of rows. It also - // currently ignores tables that have no statistics regarding the - // number of rows. - let conservative_num_rows = match num_rows { - Precision::Exact(nr) => nr, - _ => usize::MIN, - }; - if conservative_num_rows <= limit.unwrap_or(usize::MAX) { - while let Some(current) = all_files.next().await { - let (mut file, file_stats) = current?; - file.statistics = Some(file_stats.as_ref().clone()); - result_files.push(file); - if !collect_stats { - continue; - } - - // We accumulate the number of rows, total byte size and null - // counts across all the files in question. If any file does not - // provide any information or provides an inexact value, we demote - // the statistic precision to inexact. 
- num_rows = add_row_stats(file_stats.num_rows, num_rows); - - total_byte_size = - add_row_stats(file_stats.total_byte_size, total_byte_size); - - for (file_col_stats, col_stats) in file_stats - .column_statistics - .iter() - .zip(col_stats_set.iter_mut()) - { - let ColumnStatistics { - null_count: file_nc, - max_value: file_max, - min_value: file_min, - sum_value: file_sum, - distinct_count: _, - } = file_col_stats; - - col_stats.null_count = add_row_stats(*file_nc, col_stats.null_count); - set_max_if_greater(file_max, &mut col_stats.max_value); - set_min_if_lesser(file_min, &mut col_stats.min_value); - col_stats.sum_value = file_sum.add(&col_stats.sum_value); - } - - // If the number of rows exceeds the limit, we can stop processing - // files. This only applies when we know the number of rows. It also - // currently ignores tables that have no statistics regarding the - // number of rows. - if num_rows.get_value().unwrap_or(&usize::MIN) - > &limit.unwrap_or(usize::MAX) - { - break; - } - } - } - }; - - let mut statistics = Statistics { - num_rows, - total_byte_size, - column_statistics: col_stats_set, - }; - if all_files.next().await.is_some() { - // If we still have files in the stream, it means that the limit kicked - // in, and the statistic could have been different had we processed the - // files in a different order. - statistics = statistics.to_inexact() - } - - Ok((result_files, statistics)) -} - -fn add_row_stats( - file_num_rows: Precision, - num_rows: Precision, -) -> Precision { - match (file_num_rows, &num_rows) { - (Precision::Absent, _) => num_rows.to_inexact(), - (lhs, Precision::Absent) => lhs.to_inexact(), - (lhs, rhs) => lhs.add(rhs), - } -} - -/// If the given value is numerically greater than the original maximum value, -/// return the new maximum value with appropriate exactness information. -fn set_max_if_greater( - max_nominee: &Precision, - max_value: &mut Precision, -) { - match (&max_value, max_nominee) { - (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => { - *max_value = max_nominee.clone(); - } - (Precision::Exact(val1), Precision::Inexact(val2)) - | (Precision::Inexact(val1), Precision::Inexact(val2)) - | (Precision::Inexact(val1), Precision::Exact(val2)) - if val1 < val2 => - { - *max_value = max_nominee.clone().to_inexact(); - } - (Precision::Exact(_), Precision::Absent) => { - let exact_max = mem::take(max_value); - *max_value = exact_max.to_inexact(); - } - (Precision::Absent, Precision::Exact(_)) => { - *max_value = max_nominee.clone().to_inexact(); - } - (Precision::Absent, Precision::Inexact(_)) => { - *max_value = max_nominee.clone(); - } - _ => {} - } -} - -/// If the given value is numerically lesser than the original minimum value, -/// return the new minimum value with appropriate exactness information. 
-fn set_min_if_lesser( - min_nominee: &Precision, - min_value: &mut Precision, -) { - match (&min_value, min_nominee) { - (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => { - *min_value = min_nominee.clone(); - } - (Precision::Exact(val1), Precision::Inexact(val2)) - | (Precision::Inexact(val1), Precision::Inexact(val2)) - | (Precision::Inexact(val1), Precision::Exact(val2)) - if val1 > val2 => - { - *min_value = min_nominee.clone().to_inexact(); - } - (Precision::Exact(_), Precision::Absent) => { - let exact_min = mem::take(min_value); - *min_value = exact_min.to_inexact(); - } - (Precision::Absent, Precision::Exact(_)) => { - *min_value = min_nominee.clone().to_inexact(); - } - (Precision::Absent, Precision::Inexact(_)) => { - *min_value = min_nominee.clone(); - } - _ => {} - } -} diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index f2e36672cd5c..6d0e16ef4b91 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -19,9 +19,12 @@ use std::any::Any; use std::fmt; +use std::fmt::Debug; use std::sync::Arc; +use crate::sink::DataSink; use crate::source::{DataSource, DataSourceExec}; +use async_trait::async_trait; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::memory::MemoryStream; use datafusion_physical_plan::projection::{ @@ -42,6 +45,8 @@ use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; +use futures::StreamExt; +use tokio::sync::RwLock; /// Execution plan for reading in-memory batches of data #[derive(Clone)] @@ -62,7 +67,7 @@ pub struct MemoryExec { } #[allow(unused, deprecated)] -impl fmt::Debug for MemoryExec { +impl Debug for MemoryExec { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { self.inner.fmt_as(DisplayFormatType::Default, f) } @@ -720,6 +725,91 @@ impl MemorySourceConfig { } } +/// Type alias for partition data +pub type PartitionData = Arc>>; + +/// Implements for writing to a [`MemTable`] +/// +/// [`MemTable`]: +pub struct MemSink { + /// Target locations for writing data + batches: Vec, + schema: SchemaRef, +} + +impl Debug for MemSink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MemSink") + .field("num_partitions", &self.batches.len()) + .finish() + } +} + +impl DisplayAs for MemSink { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let partition_count = self.batches.len(); + write!(f, "MemoryTable (partitions={partition_count})") + } + DisplayFormatType::TreeRender => { + // TODO: collect info + write!(f, "") + } + } + } +} + +impl MemSink { + /// Creates a new [`MemSink`]. + /// + /// The caller is responsible for ensuring that there is at least one partition to insert into. 
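+    ///
+    /// # Example
+    ///
+    /// A minimal sketch; the `schema` used here is assumed to be some
+    /// `SchemaRef` already in scope:
+    ///
+    /// ```ignore
+    /// use std::sync::Arc;
+    /// use tokio::sync::RwLock;
+    ///
+    /// // A single, initially empty partition that will receive written batches.
+    /// let partitions: Vec<PartitionData> = vec![Arc::new(RwLock::new(vec![]))];
+    /// let sink = MemSink::try_new(partitions, schema)?;
+    /// ```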
+ pub fn try_new(batches: Vec, schema: SchemaRef) -> Result { + if batches.is_empty() { + return plan_err!("Cannot insert into MemTable with zero partitions"); + } + Ok(Self { batches, schema }) + } +} + +#[async_trait] +impl DataSink for MemSink { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> &SchemaRef { + &self.schema + } + + async fn write_all( + &self, + mut data: SendableRecordBatchStream, + _context: &Arc, + ) -> Result { + let num_partitions = self.batches.len(); + + // buffer up the data round robin style into num_partitions + + let mut new_batches = vec![vec![]; num_partitions]; + let mut i = 0; + let mut row_count = 0; + while let Some(batch) = data.next().await.transpose()? { + row_count += batch.num_rows(); + new_batches[i].push(batch); + i = (i + 1) % num_partitions; + } + + // write the outputs into the batches + for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) { + // Append all the new batches in one go to minimize locking overhead + target.write().await.append(&mut batches); + } + + Ok(row_count as u64) + } +} + #[cfg(test)] mod memory_source_tests { use std::sync::Arc; diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index fb119d1b3d2d..1e7ea1255df7 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -60,6 +60,7 @@ use std::pin::Pin; use std::sync::Arc; pub use self::url::ListingTableUrl; +pub use statistics::get_statistics_with_limit; /// Stream of files get listed from object store pub type PartitionedFileStream = diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index cd002a96683a..801315568a0d 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -20,8 +20,11 @@ //! Currently, this module houses code to sort file groups if they are non-overlapping with //! respect to the required sort order. See [`MinMaxStatistics`] +use futures::{Stream, StreamExt}; +use std::mem; use std::sync::Arc; +use crate::file_groups::FileGroup; use crate::PartitionedFile; use arrow::array::RecordBatch; @@ -30,9 +33,12 @@ use arrow::{ compute::SortColumn, row::{Row, Rows}, }; +use datafusion_common::stats::Precision; +use datafusion_common::ScalarValue; use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result}; use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::{ColumnStatistics, Statistics}; /// A normalized representation of file min/max statistics that allows for efficient sorting & comparison. /// The min/max values are ordered by [`Self::sort_order`]. @@ -281,3 +287,192 @@ fn sort_columns_from_physical_sort_exprs( .map(|expr| expr.expr.as_any().downcast_ref::()) .collect::>>() } + +/// Get all files as well as the file level summary statistics (no statistic for partition columns). +/// If the optional `limit` is provided, includes only sufficient files. Needed to read up to +/// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on +/// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive +/// call to `multiunzip` for constructing file level summary statistics. 
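+///
+/// # Example
+///
+/// A rough usage sketch; `files` (a `Vec<(PartitionedFile, Arc<Statistics>)>`)
+/// and `schema` are assumed to have been gathered elsewhere:
+///
+/// ```ignore
+/// use futures::stream;
+///
+/// let file_stream = stream::iter(files.into_iter().map(Ok));
+/// // Summarize file-level statistics, stopping early once ~1000 rows are covered.
+/// let (file_group, summary) =
+///     get_statistics_with_limit(file_stream, schema, Some(1000), true).await?;
+/// ```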
+pub async fn get_statistics_with_limit( + all_files: impl Stream)>>, + file_schema: SchemaRef, + limit: Option, + collect_stats: bool, +) -> Result<(FileGroup, Statistics)> { + let mut result_files = FileGroup::default(); + // These statistics can be calculated as long as at least one file provides + // useful information. If none of the files provides any information, then + // they will end up having `Precision::Absent` values. Throughout calculations, + // missing values will be imputed as: + // - zero for summations, and + // - neutral element for extreme points. + let size = file_schema.fields().len(); + let mut col_stats_set = vec![ColumnStatistics::default(); size]; + let mut num_rows = Precision::::Absent; + let mut total_byte_size = Precision::::Absent; + + // Fusing the stream allows us to call next safely even once it is finished. + let mut all_files = Box::pin(all_files.fuse()); + + if let Some(first_file) = all_files.next().await { + let (mut file, file_stats) = first_file?; + file.statistics = Some(file_stats.as_ref().clone()); + result_files.push(file); + + // First file, we set them directly from the file statistics. + num_rows = file_stats.num_rows; + total_byte_size = file_stats.total_byte_size; + for (index, file_column) in + file_stats.column_statistics.clone().into_iter().enumerate() + { + col_stats_set[index].null_count = file_column.null_count; + col_stats_set[index].max_value = file_column.max_value; + col_stats_set[index].min_value = file_column.min_value; + col_stats_set[index].sum_value = file_column.sum_value; + } + + // If the number of rows exceeds the limit, we can stop processing + // files. This only applies when we know the number of rows. It also + // currently ignores tables that have no statistics regarding the + // number of rows. + let conservative_num_rows = match num_rows { + Precision::Exact(nr) => nr, + _ => usize::MIN, + }; + if conservative_num_rows <= limit.unwrap_or(usize::MAX) { + while let Some(current) = all_files.next().await { + let (mut file, file_stats) = current?; + file.statistics = Some(file_stats.as_ref().clone()); + result_files.push(file); + if !collect_stats { + continue; + } + + // We accumulate the number of rows, total byte size and null + // counts across all the files in question. If any file does not + // provide any information or provides an inexact value, we demote + // the statistic precision to inexact. + num_rows = add_row_stats(file_stats.num_rows, num_rows); + + total_byte_size = + add_row_stats(file_stats.total_byte_size, total_byte_size); + + for (file_col_stats, col_stats) in file_stats + .column_statistics + .iter() + .zip(col_stats_set.iter_mut()) + { + let ColumnStatistics { + null_count: file_nc, + max_value: file_max, + min_value: file_min, + sum_value: file_sum, + distinct_count: _, + } = file_col_stats; + + col_stats.null_count = add_row_stats(*file_nc, col_stats.null_count); + set_max_if_greater(file_max, &mut col_stats.max_value); + set_min_if_lesser(file_min, &mut col_stats.min_value); + col_stats.sum_value = file_sum.add(&col_stats.sum_value); + } + + // If the number of rows exceeds the limit, we can stop processing + // files. This only applies when we know the number of rows. It also + // currently ignores tables that have no statistics regarding the + // number of rows. 
+ if num_rows.get_value().unwrap_or(&usize::MIN) + > &limit.unwrap_or(usize::MAX) + { + break; + } + } + } + }; + + let mut statistics = Statistics { + num_rows, + total_byte_size, + column_statistics: col_stats_set, + }; + if all_files.next().await.is_some() { + // If we still have files in the stream, it means that the limit kicked + // in, and the statistic could have been different had we processed the + // files in a different order. + statistics = statistics.to_inexact() + } + + Ok((result_files, statistics)) +} + +fn add_row_stats( + file_num_rows: Precision, + num_rows: Precision, +) -> Precision { + match (file_num_rows, &num_rows) { + (Precision::Absent, _) => num_rows.to_inexact(), + (lhs, Precision::Absent) => lhs.to_inexact(), + (lhs, rhs) => lhs.add(rhs), + } +} + +/// If the given value is numerically greater than the original maximum value, +/// return the new maximum value with appropriate exactness information. +fn set_max_if_greater( + max_nominee: &Precision, + max_value: &mut Precision, +) { + match (&max_value, max_nominee) { + (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => { + *max_value = max_nominee.clone(); + } + (Precision::Exact(val1), Precision::Inexact(val2)) + | (Precision::Inexact(val1), Precision::Inexact(val2)) + | (Precision::Inexact(val1), Precision::Exact(val2)) + if val1 < val2 => + { + *max_value = max_nominee.clone().to_inexact(); + } + (Precision::Exact(_), Precision::Absent) => { + let exact_max = mem::take(max_value); + *max_value = exact_max.to_inexact(); + } + (Precision::Absent, Precision::Exact(_)) => { + *max_value = max_nominee.clone().to_inexact(); + } + (Precision::Absent, Precision::Inexact(_)) => { + *max_value = max_nominee.clone(); + } + _ => {} + } +} + +/// If the given value is numerically lesser than the original minimum value, +/// return the new minimum value with appropriate exactness information. +fn set_min_if_lesser( + min_nominee: &Precision, + min_value: &mut Precision, +) { + match (&min_value, min_nominee) { + (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => { + *min_value = min_nominee.clone(); + } + (Precision::Exact(val1), Precision::Inexact(val2)) + | (Precision::Inexact(val1), Precision::Inexact(val2)) + | (Precision::Inexact(val1), Precision::Exact(val2)) + if val1 > val2 => + { + *min_value = min_nominee.clone().to_inexact(); + } + (Precision::Exact(_), Precision::Absent) => { + let exact_min = mem::take(min_value); + *min_value = exact_min.to_inexact(); + } + (Precision::Absent, Precision::Exact(_)) => { + *min_value = min_nominee.clone().to_inexact(); + } + (Precision::Absent, Precision::Inexact(_)) => { + *min_value = min_nominee.clone(); + } + _ => {} + } +}
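+
+#[cfg(test)]
+mod stat_merge_tests {
+    // A minimal illustrative test (module and test names here are arbitrary,
+    // not part of the surrounding change) showing how `set_max_if_greater`
+    // folds per-file maxima: a larger exact nominee replaces the current
+    // value and keeps exactness, while a larger inexact nominee replaces the
+    // value but demotes the precision to inexact.
+    use super::*;
+
+    #[test]
+    fn merge_max_demotes_precision_for_inexact_nominee() {
+        let mut max = Precision::Exact(ScalarValue::Int32(Some(5)));
+
+        // Larger exact nominee: value updates, exactness is kept.
+        set_max_if_greater(&Precision::Exact(ScalarValue::Int32(Some(9))), &mut max);
+        assert_eq!(max, Precision::Exact(ScalarValue::Int32(Some(9))));
+
+        // Larger inexact nominee: value updates, precision becomes inexact.
+        set_max_if_greater(&Precision::Inexact(ScalarValue::Int32(Some(12))), &mut max);
+        assert_eq!(max, Precision::Inexact(ScalarValue::Int32(Some(12))));
+    }
+}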