Commit 97bd84a

refactor: Move Memtable to catalog (#15459)
* first iteration
* fix: CI
* move statistics
* Merge relics
* MemSink to datasource
* clean stuff + backward compatibility
* fix: cargo doc
* fix: fmt

1 parent a81ab3a commit 97bd84a

File tree: 9 files changed (+602 / -586 lines)


datafusion/catalog/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ pub use catalog::*;
 pub use datafusion_session::Session;
 pub use dynamic_file::catalog::*;
 pub use memory::{
-    MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
+    MemTable, MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
 };
 pub use r#async::*;
 pub use schema::*;
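
With this one-line change, MemTable is re-exported from the datafusion_catalog crate root next to the existing in-memory providers. A minimal sketch of the new import path, assuming a downstream crate that depends on datafusion-catalog directly:

// MemTable now lives in the catalog crate and is re-exported at its root,
// alongside the in-memory catalog and schema providers shown above.
use datafusion_catalog::{MemTable, MemoryCatalogProvider, MemorySchemaProvider};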

datafusion/catalog/src/memory/mod.rs

Lines changed: 6 additions & 0 deletions
@@ -17,6 +17,12 @@
 
 pub(crate) mod catalog;
 pub(crate) mod schema;
+pub(crate) mod table;
 
 pub use catalog::*;
 pub use schema::*;
+pub use table::*;
+
+// backward compatibility
+pub use datafusion_datasource::memory::MemorySourceConfig;
+pub use datafusion_datasource::source::DataSourceExec;
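
The last two re-exports keep MemorySourceConfig and DataSourceExec reachable through this memory module for code that imported them from here before the move; their definitions stay in datafusion-datasource. As a hedged sketch (not part of this commit) of how those two building blocks fit together, using only the constructors that also appear in the new MemTable source file below (MemorySourceConfig::try_new and DataSourceExec::from_data_source), with an illustrative one-column schema:

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::error::Result;
use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_datasource::source::DataSourceExec;
use datafusion_physical_plan::ExecutionPlan;

fn memory_scan_plan() -> Result<Arc<dyn ExecutionPlan>> {
    // Illustrative data: one partition holding a single batch with column "a".
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // No projection; this mirrors what MemTable::scan builds internally.
    let source = MemorySourceConfig::try_new(&[vec![batch]], schema, None)?;
    Ok(DataSourceExec::from_data_source(source))
}
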
datafusion/catalog/src/memory/table.rs

Lines changed: 296 additions & 0 deletions
@@ -0,0 +1,296 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.

use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

use crate::TableProvider;
use datafusion_common::error::Result;
use datafusion_expr::Expr;
use datafusion_expr::TableType;
use datafusion_physical_expr::create_physical_sort_exprs;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::{
    common, ExecutionPlan, ExecutionPlanProperties, Partitioning,
};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::memory::MemSink;
use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::DataSourceExec;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::SortExpr;
use datafusion_session::Session;

use async_trait::async_trait;
use futures::StreamExt;
use log::debug;
use parking_lot::Mutex;
use tokio::sync::RwLock;

// backward compatibility
pub use datafusion_datasource::memory::PartitionData;

/// In-memory data source for presenting a `Vec<RecordBatch>` as a
/// data source that can be queried by DataFusion. This allows data to
/// be pre-loaded into memory and then repeatedly queried without
/// incurring additional file I/O overhead.
#[derive(Debug)]
pub struct MemTable {
    schema: SchemaRef,
    // batches used to be pub(crate), but it's needed to be public for the tests
    pub batches: Vec<PartitionData>,
    constraints: Constraints,
    column_defaults: HashMap<String, Expr>,
    /// Optional pre-known sort order(s). Must be `SortExpr`s.
    /// inserting data into this table removes the order
    pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
}

impl MemTable {
    /// Create a new in-memory table from the provided schema and record batches
    pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
        for batches in partitions.iter().flatten() {
            let batches_schema = batches.schema();
            if !schema.contains(&batches_schema) {
                debug!(
                    "mem table schema does not contain batches schema. \
                     Target_schema: {schema:?}. Batches Schema: {batches_schema:?}"
                );
                return plan_err!("Mismatch between schema and batches");
            }
        }

        Ok(Self {
            schema,
            batches: partitions
                .into_iter()
                .map(|e| Arc::new(RwLock::new(e)))
                .collect::<Vec<_>>(),
            constraints: Constraints::empty(),
            column_defaults: HashMap::new(),
            sort_order: Arc::new(Mutex::new(vec![])),
        })
    }

    /// Assign constraints
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }

    /// Assign column defaults
    pub fn with_column_defaults(
        mut self,
        column_defaults: HashMap<String, Expr>,
    ) -> Self {
        self.column_defaults = column_defaults;
        self
    }

    /// Specify an optional pre-known sort order(s). Must be `SortExpr`s.
    ///
    /// If the data is not sorted by this order, DataFusion may produce
    /// incorrect results.
    ///
    /// DataFusion may take advantage of this ordering to omit sorts
    /// or use more efficient algorithms.
    ///
    /// Note that multiple sort orders are supported, if some are known to be
    /// equivalent,
    pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
        std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
        self
    }

    /// Create a mem table by reading from another data source
    pub async fn load(
        t: Arc<dyn TableProvider>,
        output_partitions: Option<usize>,
        state: &dyn Session,
    ) -> Result<Self> {
        let schema = t.schema();
        let constraints = t.constraints();
        let exec = t.scan(state, None, &[], None).await?;
        let partition_count = exec.output_partitioning().partition_count();

        let mut join_set = JoinSet::new();

        for part_idx in 0..partition_count {
            let task = state.task_ctx();
            let exec = Arc::clone(&exec);
            join_set.spawn(async move {
                let stream = exec.execute(part_idx, task)?;
                common::collect(stream).await
            });
        }

        let mut data: Vec<Vec<RecordBatch>> =
            Vec::with_capacity(exec.output_partitioning().partition_count());

        while let Some(result) = join_set.join_next().await {
            match result {
                Ok(res) => data.push(res?),
                Err(e) => {
                    if e.is_panic() {
                        std::panic::resume_unwind(e.into_panic());
                    } else {
                        unreachable!();
                    }
                }
            }
        }

        let mut exec = DataSourceExec::new(Arc::new(MemorySourceConfig::try_new(
            &data,
            Arc::clone(&schema),
            None,
        )?));
        if let Some(cons) = constraints {
            exec = exec.with_constraints(cons.clone());
        }

        if let Some(num_partitions) = output_partitions {
            let exec = RepartitionExec::try_new(
                Arc::new(exec),
                Partitioning::RoundRobinBatch(num_partitions),
            )?;

            // execute and collect results
            let mut output_partitions = vec![];
            for i in 0..exec.properties().output_partitioning().partition_count() {
                // execute this *output* partition and collect all batches
                let task_ctx = state.task_ctx();
                let mut stream = exec.execute(i, task_ctx)?;
                let mut batches = vec![];
                while let Some(result) = stream.next().await {
                    batches.push(result?);
                }
                output_partitions.push(batches);
            }

            return MemTable::try_new(Arc::clone(&schema), output_partitions);
        }
        MemTable::try_new(Arc::clone(&schema), data)
    }
}

#[async_trait]
impl TableProvider for MemTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.constraints)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut partitions = vec![];
        for arc_inner_vec in self.batches.iter() {
            let inner_vec = arc_inner_vec.read().await;
            partitions.push(inner_vec.clone())
        }

        let mut source =
            MemorySourceConfig::try_new(&partitions, self.schema(), projection.cloned())?;

        let show_sizes = state.config_options().explain.show_sizes;
        source = source.with_show_sizes(show_sizes);

        // add sort information if present
        let sort_order = self.sort_order.lock();
        if !sort_order.is_empty() {
            let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;

            let file_sort_order = sort_order
                .iter()
                .map(|sort_exprs| {
                    create_physical_sort_exprs(
                        sort_exprs,
                        &df_schema,
                        state.execution_props(),
                    )
                })
                .collect::<Result<Vec<_>>>()?;
            source = source.try_with_sort_information(file_sort_order)?;
        }

        Ok(DataSourceExec::from_data_source(source))
    }

    /// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].
    ///
    /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
    ///
    /// # Arguments
    ///
    /// * `state` - The [`SessionState`] containing the context for executing the plan.
    /// * `input` - The [`ExecutionPlan`] to execute and insert.
    ///
    /// # Returns
    ///
    /// * A plan that returns the number of rows written.
    ///
    /// [`SessionState`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html
    async fn insert_into(
        &self,
        _state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // If we are inserting into the table, any sort order may be messed up so reset it here
        *self.sort_order.lock() = vec![];

        // Create a physical plan from the logical plan.
        // Check that the schema of the plan matches the schema of this table.
        self.schema()
            .logically_equivalent_names_and_types(&input.schema())?;

        if insert_op != InsertOp::Append {
            return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
        }
        let sink = MemSink::try_new(self.batches.clone(), Arc::clone(&self.schema))?;
        Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
    }

    fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }
}
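
Taken together, the moved MemTable keeps its previous behaviour: try_new validates that every batch matches the table schema, scan wraps the partitions in a MemorySourceConfig/DataSourceExec, and insert_into appends through MemSink while clearing any declared sort order. A hedged usage sketch built only from the constructors shown above; the column name, data, and the col("id").sort(...) helper from datafusion-expr are illustrative, and registering the table with a SessionContext (core datafusion crate, not part of this diff) is omitted:

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_catalog::MemTable;
use datafusion_common::error::Result;
use datafusion_expr::col;

fn sorted_mem_table() -> Result<MemTable> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // One partition containing a single, already-sorted batch.
    let table = MemTable::try_new(schema, vec![vec![batch]])?
        // Declare the pre-existing order on `id` (ascending, nulls last).
        // If the data were not actually sorted this way, query results
        // could be incorrect, per the with_sort_order doc comment above.
        .with_sort_order(vec![vec![col("id").sort(true, false)]]);

    Ok(table)
}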
