
Add nested struct casting support and integrate into SchemaAdapter #16371

Merged (67 commits), Jun 27, 2025

Commits
2706808
feat: add NestedStructSchemaAdapter for handling schema evolution of …
kosiew Jun 10, 2025
dba76b0
Fix: unwrap result of schema adapter factory in ParquetSource configu…
kosiew Jun 10, 2025
3b7223a
feat: add example for handling schema evolution with nested structs i…
kosiew Jun 10, 2025
c1059e7
nested_struct2 <- optimizer_rule
kosiew Jun 10, 2025
9ffeec3
Revert "nested_struct2 <- optimizer_rule"
kosiew Jun 10, 2025
b5b3ed3
add nested_struct2.rs for testing
kosiew Jun 10, 2025
3c711a5
feat: implement CustomParquetDataSource using NestedStructSchemaAdapt…
kosiew Jun 10, 2025
0e38592
feat: add schema adapter factory support to ListingTableConfig
kosiew Jun 10, 2025
e8f5b95
feat: ListingTable apply schema adapter to file source
kosiew Jun 10, 2025
6fcf1a7
feat: nested_struct2 - fix test
kosiew Jun 10, 2025
f170169
feat: remove nested_struct2 example file
kosiew Jun 10, 2025
f6997cd
refactor: reorganize imports in nested_schema_adapter.rs for clarity
kosiew Jun 10, 2025
3fb1b70
feat: use custom schema adapter factory
kosiew Jun 11, 2025
11b7463
refactor schema mapping with column adapter
kosiew Jun 11, 2025
368774f
Remove unused create_adapter
kosiew Jun 11, 2025
6eb22af
evaluate-create_adapter-for-public-api into nested-struct-14757
kosiew Jun 11, 2025
a9661e7
refactor: reorganize imports and simplify closure in NestedStructSche…
kosiew Jun 11, 2025
d822cfc
Merge branch 'main' into nested-struct-14757
kosiew Jun 11, 2025
8b87fc6
refactor: enhance ListingTableConfig with Default implementation and …
kosiew Jun 11, 2025
71d06fe
refactor: amend with_schema_adapter_factory method in ListingTable to…
kosiew Jun 11, 2025
22ef8ad
refactor: add create_file_source_with_schema_adapter method to Listin…
kosiew Jun 11, 2025
f99205e
refactor: implement FailingMapSchemaAdapterFactory for enhanced schem…
kosiew Jun 11, 2025
1c887d7
split nested_schema_adapter.rs into smaller modules—e.g. factory.rs,…
kosiew Jun 11, 2025
e4913d1
refactor: reorder imports in tests module for improved clarity
kosiew Jun 11, 2025
e70ce79
refactor: enhance adapt_column and adapt_struct_column for improved c…
kosiew Jun 11, 2025
dafb68e
refactor: impl Debug for SchemaMapping adapt_column
kosiew Jun 11, 2025
2c1dd3c
Merge branch 'main' into nested-struct-14757
kosiew Jun 11, 2025
fd8929c
refactor: update imports in tests module for clarity and organization
kosiew Jun 12, 2025
034716f
refactor: rename source to file_source for clarity in TableProvider i…
kosiew Jun 12, 2025
2413723
refactor: extract schema adapter creation into a separate method for …
kosiew Jun 12, 2025
85d41da
refactor: simplify test setup by introducing a helper function for cr…
kosiew Jun 12, 2025
6a25f27
test: replace magic number with constant for null count in tests
kosiew Jun 12, 2025
d42bd34
fix: specify type for DUMMY_NULL_COUNT constant in tests for clarity
kosiew Jun 12, 2025
1140ce7
fix: update DUMMY_NULL_COUNT constant type from u8 to usize for accuracy
kosiew Jun 12, 2025
c48a43f
test: refactor schema adapter error tests to use rstest for improved …
kosiew Jun 12, 2025
fc85f36
fix: update comment for clarity on DUMMY_NULL_COUNT usage in tests
kosiew Jun 12, 2025
9b078ce
refactor: organize imports and clean up test module structure
kosiew Jun 12, 2025
fd8cb10
feat: add nested_struct module and adapt_column function for handling…
kosiew Jun 12, 2025
e3bcaf2
fix: reorder imports to maintain consistency in nested_struct module
kosiew Jun 12, 2025
a4ff0dd
fix cargo fmt errors
kosiew Jun 12, 2025
2f6f91a
Merge branch 'main' into nested-struct-14757
kosiew Jun 12, 2025
5120d68
fix: add Default trait implementation for SchemaSource enum
kosiew Jun 12, 2025
2cb97b9
docs: add comment indicating tests for adapt_column function
kosiew Jun 12, 2025
5dd339e
refactor(tests): reorganize and enhance schema evolution tests for ne…
kosiew Jun 12, 2025
8809c3d
reorganize tests
kosiew Jun 12, 2025
d08e058
test: add unit tests for adapt_column function handling nested structs
kosiew Jun 12, 2025
68ad4fc
refactor(tests): introduce helper function for downcasting columns in…
kosiew Jun 12, 2025
9d1daa2
refactor(tests): reorganize imports, simplify get_column_as lifetime
kosiew Jun 12, 2025
e974348
refactor(tests): replace get_column_as function with macro for downca…
kosiew Jun 12, 2025
755abb5
refactor(tests): remove unused helper functions for nested schema cre…
kosiew Jun 12, 2025
013bf89
fix clippy error
kosiew Jun 12, 2025
15df575
Merge branch 'main' into nested-struct-14757
kosiew Jun 13, 2025
948a9be
Merge branch 'main' into nested-struct-14757
kosiew Jun 17, 2025
909eee7
Merge branch 'main' into nested-struct-14757
kosiew Jun 19, 2025
2862a50
Merge branch 'main' into nested-struct-14757
kosiew Jun 20, 2025
07aa2aa
refactor: rename adapt_column to cast_column
kosiew Jun 20, 2025
e392878
refactor: cast_struct_column - error if source is not struct
kosiew Jun 20, 2025
267768e
refactor: enhance struct compatibility validation for casting operations
kosiew Jun 20, 2025
53ab183
remove unused import
kosiew Jun 20, 2025
3a766fd
refactor: remove schema adapter factory and related code from Listing…
kosiew Jun 20, 2025
8874489
Merge branch 'main' into nested-struct-14757
kosiew Jun 20, 2025
6fa8c89
fix: tests
kosiew Jun 20, 2025
96f97b7
refactor: rename AdaptColumnFn to CastColumnFn for clarity
kosiew Jun 20, 2025
2bc8256
feat: move and make validate_struct_compatibility public for nested …
kosiew Jun 20, 2025
5be2fa7
fix: update code block syntax in documentation
kosiew Jun 20, 2025
33f8bdc
Merge branch 'main' into nested-struct-14757
kosiew Jun 20, 2025
4456419
Integrate nested schema support into DefaultSchemaAdapter
kosiew Jun 27, 2025
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
@@ -46,6 +46,7 @@ pub mod file_options;
pub mod format;
pub mod hash_utils;
pub mod instant;
pub mod nested_struct;
mod null_equality;
pub mod parsers;
pub mod pruning;
329 changes: 329 additions & 0 deletions datafusion/common/src/nested_struct.rs
@@ -0,0 +1,329 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::{DataFusionError, Result, _plan_err};
use arrow::{
    array::{new_null_array, Array, ArrayRef, StructArray},
    compute::cast,
    datatypes::{DataType::Struct, Field, FieldRef},
};
use std::sync::Arc;

/// Cast a struct column to match target struct fields, handling nested structs recursively.
///
/// This function implements struct-to-struct casting with the assumption that **structs should
/// always be allowed to cast to other structs**. However, the source column must already be
/// a struct type - non-struct sources will result in an error.
///
/// ## Field Matching Strategy
/// - **By Name**: Source struct fields are matched to target fields by name (case-sensitive)
/// - **Type Adaptation**: When a matching field is found, it is recursively cast to the target field's type
/// - **Missing Fields**: Target fields not present in the source are filled with null values
/// - **Extra Fields**: Source fields not present in the target are ignored
///
/// ## Nested Struct Handling
/// - Nested structs are handled recursively using the same casting rules
/// - Each level of nesting follows the same field matching and null-filling strategy
/// - This allows for complex struct transformations while maintaining data integrity
///
/// # Arguments
/// * `source_col` - The source array to cast (must be a struct array)
/// * `target_fields` - The target struct field definitions to cast to
///
/// # Returns
/// A `Result<ArrayRef>` containing the cast struct array
///
/// # Errors
/// Returns a `DataFusionError::Plan` if the source column is not a struct type
fn cast_struct_column(
    source_col: &ArrayRef,
    target_fields: &[Arc<Field>],
) -> Result<ArrayRef> {
    if let Some(struct_array) = source_col.as_any().downcast_ref::<StructArray>() {
        let mut children: Vec<(Arc<Field>, Arc<dyn Array>)> = Vec::new();
        let num_rows = source_col.len();

        for target_child_field in target_fields {
            let field_arc = Arc::clone(target_child_field);
            match struct_array.column_by_name(target_child_field.name()) {
                Some(source_child_col) => {
                    let adapted_child =
                        cast_column(source_child_col, target_child_field)?;
                    children.push((field_arc, adapted_child));
                }
                None => {
                    children.push((
                        field_arc,
                        new_null_array(target_child_field.data_type(), num_rows),
                    ));
                }
            }
        }

        let struct_array = StructArray::from(children);
        Ok(Arc::new(struct_array))
    } else {
        // Return error if source is not a struct type
        Err(DataFusionError::Plan(format!(
            "Cannot cast column of type {:?} to struct type. Source must be a struct to cast to struct.",
            source_col.data_type()
        )))
    }
}

/// Cast a column to match the target field type, with special handling for nested structs.
///
/// This function serves as the main entry point for column casting operations. For struct
/// types, it enforces that **only struct columns can be cast to struct types**.
///
/// ## Casting Behavior
/// - **Struct Types**: Delegates to `cast_struct_column` for struct-to-struct casting only
/// - **Non-Struct Types**: Uses Arrow's standard `cast` function for primitive type conversions
///
/// ## Struct Casting Requirements
/// The struct casting logic requires that the source column must already be a struct type.
/// This makes the function useful for:
/// - Schema evolution scenarios where struct layouts change over time
/// - Data migration between different struct schemas
/// - Type-safe data processing pipelines that maintain struct type integrity
///
/// # Arguments
/// * `source_col` - The source array to cast
/// * `target_field` - The target field definition (including type and metadata)
///
/// # Returns
/// A `Result<ArrayRef>` containing the cast array
///
/// # Errors
/// Returns an error if:
/// - Attempting to cast a non-struct column to a struct type
/// - Arrow's cast function fails for non-struct types
/// - Memory allocation fails during struct construction
/// - Invalid data type combinations are encountered
pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
Review comment (Contributor):
I think the ability to cast from one struct to another type is really nice

However, I am worried that by only supporting struct -> struct casting in this module (and then in the NestedSchemaAdapter below) most DataFusion users won't be able to take advantage of this feature

Have you considered changing all uses of

use arrow::compute::cast;

To use this function instead?

I think that would nicely close #14757 as well as improve the situation with #14396?

For example

> select {'a': 1};
+----------------------------------+
| named_struct(Utf8("a"),Int64(1)) |
+----------------------------------+
| {a: 1}                           |
+----------------------------------+
1 row(s) fetched.
Elapsed 0.037 seconds.

> select {'a': 1} = {'a': 2, 'b': NULL};
Error during planning: Cannot infer common argument type for comparison operation Struct(a Int64) = Struct(a Int64, b Null)

Reply from @kosiew (Contributor Author), Jun 27, 2025.

    match target_field.data_type() {
        Struct(target_fields) => cast_struct_column(source_col, target_fields),
        _ => Ok(cast(source_col, target_field.data_type())?),
    }
}
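The recursive strategy above (match struct fields by name, recurse into nested structs, null-fill missing target fields, ignore extras) can be sketched on a toy value model that needs no arrow dependency. The names here (`Value`, `Type`, `cast_value`) are illustrative only and are not part of DataFusion or arrow:

```rust
// Toy model of the struct-casting strategy (illustration only; the real
// implementation operates on arrow `StructArray` / `Field` values).
use std::collections::BTreeMap;

#[derive(Clone, Debug, PartialEq)]
enum Value {
    Int(i64),
    Null,
    Struct(BTreeMap<String, Value>),
}

enum Type {
    Int,
    Struct(Vec<(String, Type)>),
}

/// Cast a value to a target type: match struct fields by name, recurse
/// into nested structs, fill missing target fields with `Null`, and
/// silently drop source fields absent from the target.
fn cast_value(source: &Value, target: &Type) -> Value {
    match (source, target) {
        (Value::Struct(children), Type::Struct(fields)) => {
            let mut out = BTreeMap::new();
            for (name, ty) in fields {
                let child = match children.get(name) {
                    Some(v) => cast_value(v, ty), // recurse on a name match
                    None => Value::Null,          // null-fill a missing field
                };
                out.insert(name.clone(), child);
            }
            // Extra source fields are ignored: they are never copied out.
            Value::Struct(out)
        }
        // Primitives: the real code delegates to arrow's `cast` here.
        _ => source.clone(),
    }
}
```

Casting `{a: 1, c: 2}` to target `{a, b}` in this model yields `{a: 1, b: Null}`, mirroring the field-matching rules documented above.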

/// Validates compatibility between source and target struct fields for casting operations.
///
/// This function implements comprehensive struct compatibility checking by examining:
/// - Field name matching between source and target structs
/// - Type castability for each matching field (including recursive struct validation)
/// - Proper handling of missing fields (target fields not in source are allowed - filled with nulls)
/// - Proper handling of extra fields (source fields not in target are allowed - ignored)
///
/// # Compatibility Rules
/// - **Field Matching**: Fields are matched by name (case-sensitive)
/// - **Missing Target Fields**: Allowed - will be filled with null values during casting
/// - **Extra Source Fields**: Allowed - will be ignored during casting
/// - **Type Compatibility**: Each matching field must be castable using Arrow's type system
/// - **Nested Structs**: Recursively validates nested struct compatibility
///
/// # Arguments
/// * `source_fields` - Fields from the source struct type
/// * `target_fields` - Fields from the target struct type
///
/// # Returns
/// * `Ok(true)` if the structs are compatible for casting
/// * `Err(DataFusionError)` with detailed error message if incompatible
///
/// # Examples
/// ```text
/// // Compatible: source has extra field, target has missing field
/// // Source: {a: i32, b: string, c: f64}
/// // Target: {a: i64, d: bool}
/// // Result: Ok(true) - 'a' can cast i32->i64, 'b','c' ignored, 'd' filled with nulls
///
/// // Incompatible: matching field has incompatible types
/// // Source: {a: string}
/// // Target: {a: binary}
/// // Result: Err(...) - string cannot cast to binary
/// ```
pub fn validate_struct_compatibility(
    source_fields: &[FieldRef],
    target_fields: &[FieldRef],
) -> Result<bool> {
    // Check compatibility for each target field
    for target_field in target_fields {
        // Look for matching field in source by name
        if let Some(source_field) = source_fields
            .iter()
            .find(|f| f.name() == target_field.name())
        {
            // Check if the matching field types are compatible
            match (source_field.data_type(), target_field.data_type()) {
                // Recursively validate nested structs
                (Struct(source_nested), Struct(target_nested)) => {
                    validate_struct_compatibility(source_nested, target_nested)?;
                }
                // For non-struct types, use the existing castability check
                _ => {
                    if !arrow::compute::can_cast_types(
                        source_field.data_type(),
                        target_field.data_type(),
                    ) {
                        return _plan_err!(
                            "Cannot cast struct field '{}' from type {:?} to type {:?}",
                            target_field.name(),
                            source_field.data_type(),
                            target_field.data_type()
                        );
                    }
                }
            }
        }
        // Missing fields in source are OK - they'll be filled with nulls
    }

    // Extra fields in source are OK - they'll be ignored
    Ok(true)
}
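The compatibility rules can likewise be sketched with plain string type names and a stand-in predicate for `arrow::compute::can_cast_types`. The `compatible` helper and the toy `can_cast` closure are hypothetical, for illustration only:

```rust
// Toy sketch of the compatibility rules: fields that match by name must
// be castable; missing and extra fields are both allowed.
fn compatible(
    source: &[(&str, &str)],
    target: &[(&str, &str)],
    can_cast: &dyn Fn(&str, &str) -> bool, // stand-in for can_cast_types
) -> Result<(), String> {
    for &(t_name, t_ty) in target {
        if let Some(&(_, s_ty)) = source.iter().find(|&&(s_name, _)| s_name == t_name) {
            if !can_cast(s_ty, t_ty) {
                return Err(format!(
                    "Cannot cast struct field '{t_name}' from {s_ty} to {t_ty}"
                ));
            }
        }
        // Missing target fields are fine: they'll be null-filled at cast time.
    }
    // Extra source fields are fine: they're ignored at cast time.
    Ok(())
}
```

With a toy rule that only allows identity casts and Int32 -> Int64, `{field1: Int32, field2: Utf8}` is compatible with target `{field1: Int64}`, while `{field1: Binary}` against `{field1: Int32}` is rejected, matching the doc-comment examples above.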

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Int32Array, Int64Array, StringArray},
        datatypes::{DataType, Field},
    };

    /// Macro to extract and downcast a column from a StructArray
    macro_rules! get_column_as {
        ($struct_array:expr, $column_name:expr, $array_type:ty) => {
            $struct_array
                .column_by_name($column_name)
                .unwrap()
                .as_any()
                .downcast_ref::<$array_type>()
                .unwrap()
        };
    }

    #[test]
    fn test_cast_simple_column() {
        let source = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
        let target_field = Field::new("ints", DataType::Int64, true);
        let result = cast_column(&source, &target_field).unwrap();
        let result = result.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(result.value(0), 1);
        assert_eq!(result.value(1), 2);
        assert_eq!(result.value(2), 3);
    }

    #[test]
    fn test_cast_struct_with_missing_field() {
        let a_array = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
        let source_struct = StructArray::from(vec![(
            Arc::new(Field::new("a", DataType::Int32, true)),
            Arc::clone(&a_array),
        )]);
        let source_col = Arc::new(source_struct) as ArrayRef;

        let target_field = Field::new(
            "s",
            Struct(
                vec![
                    Arc::new(Field::new("a", DataType::Int32, true)),
                    Arc::new(Field::new("b", DataType::Utf8, true)),
                ]
                .into(),
            ),
            true,
        );

        let result = cast_column(&source_col, &target_field).unwrap();
        let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(struct_array.fields().len(), 2);
        let a_result = get_column_as!(&struct_array, "a", Int32Array);
        assert_eq!(a_result.value(0), 1);
        assert_eq!(a_result.value(1), 2);

        let b_result = get_column_as!(&struct_array, "b", StringArray);
        assert_eq!(b_result.len(), 2);
        assert!(b_result.is_null(0));
        assert!(b_result.is_null(1));
    }

    #[test]
    fn test_cast_struct_source_not_struct() {
        let source = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
        let target_field = Field::new(
            "s",
            Struct(vec![Arc::new(Field::new("a", DataType::Int32, true))].into()),
            true,
        );

        let result = cast_column(&source, &target_field);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast column of type"));
        assert!(error_msg.contains("to struct type"));
        assert!(error_msg.contains("Source must be a struct"));
    }

    #[test]
    fn test_validate_struct_compatibility_incompatible_types() {
        // Source struct: {field1: Binary, field2: String}
        let source_fields = vec![
            Arc::new(Field::new("field1", DataType::Binary, true)),
            Arc::new(Field::new("field2", DataType::Utf8, true)),
        ];

        // Target struct: {field1: Int32}
        let target_fields = vec![Arc::new(Field::new("field1", DataType::Int32, true))];

        let result = validate_struct_compatibility(&source_fields, &target_fields);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast struct field 'field1'"));
        assert!(error_msg.contains("Binary"));
        assert!(error_msg.contains("Int32"));
    }

    #[test]
    fn test_validate_struct_compatibility_compatible_types() {
        // Source struct: {field1: Int32, field2: String}
        let source_fields = vec![
            Arc::new(Field::new("field1", DataType::Int32, true)),
            Arc::new(Field::new("field2", DataType::Utf8, true)),
        ];

        // Target struct: {field1: Int64} (Int32 can cast to Int64)
        let target_fields = vec![Arc::new(Field::new("field1", DataType::Int64, true))];

        let result = validate_struct_compatibility(&source_fields, &target_fields);
        assert!(result.is_ok());
        assert!(result.unwrap());
    }

    #[test]
    fn test_validate_struct_compatibility_missing_field_in_source() {
        // Source struct: {field2: String} (missing field1)
        let source_fields = vec![Arc::new(Field::new("field2", DataType::Utf8, true))];

        // Target struct: {field1: Int32}
        let target_fields = vec![Arc::new(Field::new("field1", DataType::Int32, true))];

        // Should be OK - missing fields will be filled with nulls
        let result = validate_struct_compatibility(&source_fields, &target_fields);
        assert!(result.is_ok());
        assert!(result.unwrap());
    }
}
18 changes: 10 additions & 8 deletions datafusion/core/src/datasource/mod.rs
@@ -51,25 +51,26 @@ pub use datafusion_physical_expr::create_ordering;
#[cfg(all(test, feature = "parquet"))]
mod tests {

    use datafusion_datasource::schema_adapter::{
        DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
    };

    use crate::prelude::SessionContext;
    use ::object_store::{path::Path, ObjectMeta};
    use arrow::{
        array::{Int32Array, StringArray},
        datatypes::{DataType, Field, Schema, SchemaRef},
        record_batch::RecordBatch,
    };
    use datafusion_common::{record_batch, test_util::batches_to_sort_string};
    use datafusion_datasource::{
        file::FileSource, file_scan_config::FileScanConfigBuilder,
        source::DataSourceExec, PartitionedFile,
        file::FileSource,
        file_scan_config::FileScanConfigBuilder,
        schema_adapter::{
            DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
            SchemaMapper,
        },
        source::DataSourceExec,
        PartitionedFile,
    };
    use datafusion_datasource_parquet::source::ParquetSource;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::collect;
    use object_store::{path::Path, ObjectMeta};
    use std::{fs, sync::Arc};
    use tempfile::TempDir;
@@ -79,6 +80,7 @@ mod tests {
        // record batches returned from parquet. This can be useful for schema evolution
        // where older files may not have all columns.

        use datafusion_execution::object_store::ObjectStoreUrl;
        let tmp_dir = TempDir::new().unwrap();
        let table_dir = tmp_dir.path().join("parquet_test");
        fs::DirBuilder::new().create(table_dir.as_path()).unwrap();