Skip to content

fix: scan partitioned tables with datafusion #1303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use datafusion::execution::FunctionRegistry;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::file_format::{wrap_partition_type_in_dict, FileScanConfig};
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{
ColumnStatistics, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
Expand All @@ -58,7 +58,6 @@ use object_store::{path::Path, ObjectMeta};
use url::Url;

use crate::builder::ensure_table_uri;
use crate::schema;
use crate::{action, open_table, open_table_with_storage_options, SchemaDataType};
use crate::{DeltaResult, Invariant};
use crate::{DeltaTable, DeltaTableError};
Expand Down Expand Up @@ -349,10 +348,7 @@ impl TableProvider for DeltaTable {
}

fn schema(&self) -> Arc<ArrowSchema> {
Arc::new(
<ArrowSchema as TryFrom<&schema::Schema>>::try_from(DeltaTable::schema(self).unwrap())
.unwrap(),
)
self.state.arrow_schema().unwrap()
}

fn table_type(&self) -> TableType {
Expand Down Expand Up @@ -424,6 +420,11 @@ impl TableProvider for DeltaTable {
.collect::<Vec<arrow::datatypes::FieldRef>>(),
));

let table_partition_cols = table_partition_cols
.iter()
.map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone())))
.collect::<Result<Vec<_>, ArrowError>>()?;

let parquet_scan = ParquetFormat::new()
.create_physical_plan(
session,
Expand All @@ -434,17 +435,7 @@ impl TableProvider for DeltaTable {
statistics: self.datafusion_table_statistics(),
projection: projection.cloned(),
limit,
table_partition_cols: table_partition_cols
.iter()
.map(|c| {
Ok((
c.to_owned(),
wrap_partition_type_in_dict(
schema.field_with_name(c)?.data_type().clone(),
),
))
})
.collect::<Result<Vec<_>, ArrowError>>()?,
table_partition_cols,
output_ordering: None,
infinite_source: false,
},
Expand Down
35 changes: 28 additions & 7 deletions rust/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::convert::TryFrom;
use std::sync::Arc;

use arrow::array::ArrayRef;
Expand All @@ -7,6 +6,7 @@ use arrow::datatypes::{
};
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::file_format::wrap_partition_type_in_dict;
use datafusion_common::config::ConfigOptions;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::{Column, DFSchema, Result as DFResult, TableReference};
Expand All @@ -23,16 +23,37 @@ use crate::action::Add;
use crate::delta_datafusion::{logical_expr_to_physical_expr, to_correct_scalar_value};
use crate::table_state::DeltaTableState;
use crate::DeltaResult;
use crate::{schema, DeltaTableError};
use crate::DeltaTableError;

impl DeltaTableState {
/// Get the table schema as an [`ArrowSchemaRef`]
pub fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
Ok(Arc::new(
<ArrowSchema as TryFrom<&schema::Schema>>::try_from(
self.schema().ok_or(DeltaTableError::NoMetadata)?,
)?,
))
let meta = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?;
let fields = meta
.schema
.get_fields()
.iter()
.filter(|f| !meta.partition_columns.contains(&f.get_name().to_string()))
.map(|f| f.try_into())
.chain(
meta.schema
.get_fields()
.iter()
.filter(|f| meta.partition_columns.contains(&f.get_name().to_string()))
.map(|f| {
let field = ArrowField::try_from(f)?;
let corrected = match field.data_type() {
// Dictionary encoding boolean types does not yield benefits
// https://github.com/apache/arrow-datafusion/pull/5545#issuecomment-1526917997
DataType::Boolean => field.data_type().clone(),
_ => wrap_partition_type_in_dict(field.data_type().clone()),
};
Ok(field.with_data_type(corrected))
}),
)
.collect::<Result<Vec<ArrowField>, _>>()?;

Ok(Arc::new(ArrowSchema::new(fields)))
}

/// Iterate over all files in the log matching a predicate
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"delta-rs":"0.9.0","timestamp":1681604880998}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":1}}
{"metaData":{"id":"1c863a88-7231-4ffc-a24b-3bbfad87d80a","name":"http_requests","description":"","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ClientIP\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ClientRequestHost\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ClientRequestMethod\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ClientRequestURI\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"EdgeEndTimestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"EdgeResponseBytes\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"EdgeResponseStatus\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"EdgeStartTimestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"createdTime":1681604880998,"configuration":{}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"add":{"path":"date=2023-04-14/part-00000-731ab1b3-85a8-4bc3-92e5-96347fe3fd84-c000.snappy.parquet","size":5976,"partitionValues":{"date":"2023-04-14"},"modificationTime":1681604881837,"dataChange":true,"stats":"{\"numRecords\":1437,\"minValues\":{\"EdgeEndTimestamp\":\"2023-04-14T00:00:00.000Z\",\"ClientRequestHost\":\"example.com\",\"EdgeResponseStatus\":200,\"ClientRequestMethod\":\"GET\",\"EdgeStartTimestamp\":\"2023-04-14T00:00:00.000Z\",\"ClientIP\":\"127.0.0.1\",\"ClientRequestURI\":\"/\",\"EdgeResponseBytes\":303},\"maxValues\":{\"EdgeResponseBytes\":307,\"ClientRequestURI\":\"/\",\"EdgeResponseStatus\":200,\"EdgeStartTimestamp\":\"2023-04-14T00:00:45.000Z\",\"ClientIP\":\"127.0.0.1\",\"ClientRequestHost\":\"example.com\",\"ClientRequestMethod\":\"GET\",\"EdgeEndTimestamp\":\"2023-04-14T00:00:45.000Z\"},\"nullCount\":{\"ClientRequestHost\":0,\"ClientRequestURI\":0,\"ClientRequestMethod\":0,\"EdgeResponseStatus\":0,\"ClientIP\":0,\"EdgeEndTimestamp\":0,\"EdgeResponseBytes\":0,\"EdgeStartTimestamp\":0}}","tags":null}}
{"add":{"path":"date=2023-04-13/part-00000-e853fe2e-6f42-450c-8af1-4145b73a96c7-c000.snappy.parquet","size":3780,"partitionValues":{"date":"2023-04-13"},"modificationTime":1681604881849,"dataChange":true,"stats":"{\"numRecords\":144,\"minValues\":{\"EdgeStartTimestamp\":\"2023-04-13T23:58:58.000Z\",\"ClientRequestMethod\":\"GET\",\"ClientIP\":\"127.0.0.1\",\"EdgeEndTimestamp\":\"2023-04-13T23:58:58.000Z\",\"ClientRequestURI\":\"/\",\"ClientRequestHost\":\"example.com\",\"EdgeResponseBytes\":303,\"EdgeResponseStatus\":200},\"maxValues\":{\"ClientRequestHost\":\"example.com\",\"ClientRequestURI\":\"/\",\"ClientIP\":\"127.0.0.1\",\"EdgeResponseStatus\":200,\"ClientRequestMethod\":\"GET\",\"EdgeStartTimestamp\":\"2023-04-13T23:59:59.000Z\",\"EdgeEndTimestamp\":\"2023-04-14T00:00:00.000Z\",\"EdgeResponseBytes\":307},\"nullCount\":{\"ClientRequestMethod\":0,\"ClientIP\":0,\"EdgeResponseStatus\":0,\"ClientRequestURI\":0,\"EdgeEndTimestamp\":0,\"EdgeStartTimestamp\":0,\"EdgeResponseBytes\":0,\"ClientRequestHost\":0}}","tags":null}}
{"commitInfo":{"timestamp":1681604881849,"clientVersion":"delta-rs.0.9.0"}}
Binary file not shown.
Binary file not shown.
140 changes: 125 additions & 15 deletions rust/tests/datafusion_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,30 @@ impl TestCase {
non_existent_value: expression_builder(3),
}
}

fn new_wrapped<F>(column: &'static str, expression_builder: F) -> Self
where
F: Fn(i64) -> Expr,
{
TestCase {
column,
file1_value: wrap_expression(expression_builder(1)),
file2_value: wrap_expression(expression_builder(5)),
file3_value: wrap_expression(expression_builder(8)),
non_existent_value: wrap_expression(expression_builder(3)),
}
}
}

fn wrap_expression(e: Expr) -> Expr {
let value = match e {
Expr::Literal(lit) => lit,
_ => unreachable!(),
};
Expr::Literal(ScalarValue::Dictionary(
Box::new(ArrowDataType::UInt16),
Box::new(value),
))
}

#[tokio::test]
Expand Down Expand Up @@ -527,7 +551,7 @@ async fn test_files_scanned() -> Result<()> {
if column == "decimal" || column == "date" || column == "binary" {
continue;
}
println!("Test Column: {}", column);
println!("Test Column: {} value: {}", column, file1_value);

// Equality
let e = col(column).eq(file1_value.clone());
Expand Down Expand Up @@ -568,6 +592,28 @@ async fn test_files_scanned() -> Result<()> {
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);

let tests = [
TestCase::new_wrapped("utf8", |value| lit(value.to_string())),
TestCase::new_wrapped("int64", lit),
TestCase::new_wrapped("int32", |value| lit(value as i32)),
TestCase::new_wrapped("int16", |value| lit(value as i16)),
TestCase::new_wrapped("int8", |value| lit(value as i8)),
TestCase::new_wrapped("float64", |value| lit(value as f64)),
TestCase::new_wrapped("float32", |value| lit(value as f32)),
TestCase::new_wrapped("timestamp", |value| {
lit(TimestampMicrosecond(Some(value * 1_000_000), None))
}),
// TODO: I think decimal statistics are being written to the log incorrectly. The underlying i128 is written
// not the proper string representation as specified by the precision and scale
TestCase::new_wrapped("decimal", |value| {
lit(Decimal128(Some((value * 100).into()), 10, 2))
}),
// TODO: The writer does not write complete statistics for date columns
TestCase::new_wrapped("date", |value| lit(Date32(Some(value as i32)))),
// TODO: The writer does not write complete statistics for binary columns
TestCase::new_wrapped("binary", |value| lit(value.to_string().as_bytes())),
];

// Ensure that tables with stats and partition columns can be pruned
for test in tests {
let TestCase {
Expand All @@ -590,7 +636,6 @@ async fn test_files_scanned() -> Result<()> {
{
continue;
}
println!("test {}", column);

let partitions = vec![column.to_owned()];
let batch = create_all_types_batch(3, 0, 0);
Expand Down Expand Up @@ -624,14 +669,15 @@ async fn test_files_scanned() -> Result<()> {
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);

// TODO how to get an expression with the right datatypes eludes me ..
// Validate null pruning
let batch = create_all_types_batch(5, 2, 0);
let partitions = vec![column.to_owned()];
let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, partitions).await;
// let batch = create_all_types_batch(5, 2, 0);
// let partitions = vec![column.to_owned()];
// let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, partitions).await;

let e = col(column).is_null();
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
// let e = col(column).is_null();
// let metrics = get_scan_metrics(&table, &state, &[e]).await?;
// assert_eq!(metrics.num_scanned_files(), 1);

/* logically we should be able to prune the null partition but Datafusion's current implementation prevents this */
/*
Expand All @@ -657,7 +703,7 @@ async fn test_files_scanned() -> Result<()> {
assert_eq!(metrics.num_scanned_files(), 1);

// Ensure that tables without stats and partition columns can be pruned for just partitions
let table = deltalake::open_table("./tests/data/delta-0.8.0-null-partition").await?;
// let table = deltalake::open_table("./tests/data/delta-0.8.0-null-partition").await?;

/*
// Logically this should prune. See above
Expand All @@ -672,14 +718,14 @@ async fn test_files_scanned() -> Result<()> {
*/

// Check pruning for null partitions
let e = col("k").is_null();
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
// let e = col("k").is_null();
// let metrics = get_scan_metrics(&table, &state, &[e]).await?;
// assert_eq!(metrics.num_scanned_files(), 1);

// Check pruning for null partitions. Since there are no record count statistics pruning cannot be done
let e = col("k").is_not_null();
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);
// let e = col("k").is_not_null();
// let metrics = get_scan_metrics(&table, &state, &[e]).await?;
// assert_eq!(metrics.num_scanned_files(), 2);

Ok(())
}
Expand Down Expand Up @@ -763,3 +809,67 @@ async fn test_datafusion_scan_timestamps() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_issue_1292_datafusion_sql_projection() -> Result<()> {
let ctx = SessionContext::new();
let table = deltalake::open_table("./tests/data/http_requests")
.await
.unwrap();
ctx.register_table("http_requests", Arc::new(table))?;

let batches = ctx
.sql("SELECT \"ClientRequestURI\" FROM http_requests LIMIT 5")
.await?
.collect()
.await?;

let expected = vec![
"+------------------+",
"| ClientRequestURI |",
"+------------------+",
"| / |",
"| / |",
"| / |",
"| / |",
"| / |",
"+------------------+",
];

assert_batches_sorted_eq!(&expected, &batches);

Ok(())
}

#[tokio::test]
async fn test_issue_1291_datafusion_sql_partitioned_data() -> Result<()> {
let ctx = SessionContext::new();
let table = deltalake::open_table("./tests/data/http_requests")
.await
.unwrap();
ctx.register_table("http_requests", Arc::new(table))?;

let batches = ctx
.sql(
"SELECT \"ClientRequestURI\", date FROM http_requests WHERE date > '2023-04-13' LIMIT 5",
)
.await?
.collect()
.await?;

let expected = vec![
"+------------------+------------+",
"| ClientRequestURI | date |",
"+------------------+------------+",
"| / | 2023-04-14 |",
"| / | 2023-04-14 |",
"| / | 2023-04-14 |",
"| / | 2023-04-14 |",
"| / | 2023-04-14 |",
"+------------------+------------+",
];

assert_batches_sorted_eq!(&expected, &batches);

Ok(())
}