diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 0b4aab1dc7e1..af3a774e06bd 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -136,6 +136,12 @@ pub enum DataFusionError { /// human-readable messages, and locations in the source query that relate /// to the error in some way. Diagnostic(Box, Box), + /// A [`DataFusionError`] which shares an underlying [`DataFusionError`]. + /// + /// This is useful when the same underlying [`DataFusionError`] is passed + /// to multiple receivers. For example, when the source of a repartition + /// errors and the error is propagated to multiple consumers. + Shared(Arc), } #[macro_export] @@ -262,6 +268,17 @@ impl From for ArrowError { } } +impl From<&Arc> for DataFusionError { + fn from(e: &Arc) -> Self { + if let DataFusionError::Shared(e_inner) = e.as_ref() { + // don't re-wrap + DataFusionError::Shared(Arc::clone(e_inner)) + } else { + DataFusionError::Shared(Arc::clone(e)) + } + } +} + #[cfg(feature = "parquet")] impl From for DataFusionError { fn from(e: ParquetError) -> Self { @@ -298,7 +315,16 @@ impl From for DataFusionError { impl From for DataFusionError { fn from(err: GenericError) -> Self { - DataFusionError::External(err) + // If the error is already a DataFusionError, not wrapping it. + if err.is::() { + if let Ok(e) = err.downcast::() { + *e + } else { + unreachable!() + } + } else { + DataFusionError::External(err) + } } } @@ -334,6 +360,7 @@ impl Error for DataFusionError { DataFusionError::Context(_, e) => Some(e.as_ref()), DataFusionError::Substrait(_) => None, DataFusionError::Diagnostic(_, e) => Some(e.as_ref()), + DataFusionError::Shared(e) => Some(e.as_ref()), } } } @@ -448,6 +475,7 @@ impl DataFusionError { DataFusionError::Context(_, _) => "", DataFusionError::Substrait(_) => "Substrait error: ", DataFusionError::Diagnostic(_, _) => "", + DataFusionError::Shared(_) => "", } } @@ -489,6 +517,7 @@ impl DataFusionError { } DataFusionError::Substrait(ref desc) => Cow::Owned(desc.to_string()), DataFusionError::Diagnostic(_, ref err) => Cow::Owned(err.to_string()), + DataFusionError::Shared(ref desc) => Cow::Owned(desc.to_string()), } } @@ -713,7 +742,7 @@ pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionE mod test { use std::sync::Arc; - use crate::error::DataFusionError; + use crate::error::{DataFusionError, GenericError}; use arrow::error::ArrowError; #[test] @@ -867,6 +896,43 @@ mod test { ); } + #[test] + fn external_error() { + // assert not wrapping DataFusionError + let generic_error: GenericError = + Box::new(DataFusionError::Plan("test".to_string())); + let datafusion_error: DataFusionError = generic_error.into(); + println!("{}", datafusion_error.strip_backtrace()); + assert_eq!( + datafusion_error.strip_backtrace(), + "Error during planning: test" + ); + + // assert wrapping other Error + let generic_error: GenericError = + Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error")); + let datafusion_error: DataFusionError = generic_error.into(); + println!("{}", datafusion_error.strip_backtrace()); + assert_eq!( + datafusion_error.strip_backtrace(), + "External error: io error" + ); + } + + #[test] + fn external_error_no_recursive() { + let generic_error_1: GenericError = + Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error")); + let external_error_1: DataFusionError = generic_error_1.into(); + let generic_error_2: GenericError = Box::new(external_error_1); + let external_error_2: DataFusionError = generic_error_2.into(); + + println!("{}", external_error_2); + assert!(external_error_2 + .to_string() + .starts_with("External error: io error")); + } + /// Model what happens when implementing SendableRecordBatchStream: /// DataFusion code needs to return an ArrowError fn return_arrow_error() -> arrow::error::Result<()> { diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index ab94c132a209..cec717a25cf0 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -867,7 +867,7 @@ mod tests { assert_contains!( err.to_string(), - "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec" + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec" ); Ok(()) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 76e535d93b7e..b4e03b57e87f 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -4014,7 +4014,7 @@ mod tests { // Asserting that operator-level reservation attempting to overallocate assert_contains!( err.to_string(), - "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput" + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput" ); assert_contains!( @@ -4095,7 +4095,7 @@ mod tests { // Asserting that stream-level reservation attempting to overallocate assert_contains!( err.to_string(), - "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]" + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]" ); diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 50c411ccd785..5bd2658dc719 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1514,7 +1514,7 @@ pub(crate) mod tests { assert_contains!( err.to_string(), - "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]" + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]" ); } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index dbe90077bc8c..61b7d2a06c5f 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1077,7 +1077,7 @@ impl OnceFut { OnceFutState::Ready(r) => Poll::Ready( r.as_ref() .map(|r| r.as_ref()) - .map_err(|e| DataFusionError::External(Box::new(Arc::clone(e)))), + .map_err(DataFusionError::from), ), } } @@ -1091,10 +1091,9 @@ impl OnceFut { match &self.state { OnceFutState::Pending(_) => unreachable!(), - OnceFutState::Ready(r) => Poll::Ready( - r.clone() - .map_err(|e| DataFusionError::External(Box::new(e))), - ), + OnceFutState::Ready(r) => { + Poll::Ready(r.clone().map_err(DataFusionError::Shared)) + } } } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 63658340f432..c01c0a9564e0 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -911,11 +911,12 @@ impl RepartitionExec { } // Error from running input task Ok(Err(e)) => { + // send the same Arc'd error to all output partitions let e = Arc::new(e); for (_, tx) in txs { // wrap it because need to send error to all output partitions - let err = Err(DataFusionError::External(Box::new(Arc::clone(&e)))); + let err = Err(DataFusionError::from(&e)); tx.send(Some(err)).await.ok(); } } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index d6a8c428ac46..7caa81d64e5b 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -142,7 +142,7 @@ statement error DataFusion error: Error during planning: Failed to coerce argume SELECT approx_percentile_cont_with_weight(c3, c2, c1) FROM aggregate_test_100 # csv_query_approx_percentile_cont_with_histogram_bins -statement error DataFusion error: External error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\. +statement error DataFusion error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\. SELECT c1, approx_percentile_cont(c3, 0.95, -1000) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1 statement error DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to 'approx_percentile_cont' function: coercion from \[Int16, Float64, Utf8\] to the signature OneOf(.*) failed(.|\n)* diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt index c54ba16e972a..7e2af5b9cbc9 100644 --- a/datafusion/sqllogictest/test_files/errors.slt +++ b/datafusion/sqllogictest/test_files/errors.slt @@ -161,3 +161,12 @@ create table records (timestamp timestamp, value float) as values ( '2021-01-01 00:00:00', 1.0, '2021-01-01 00:00:00', 2.0 ); + +statement ok +CREATE TABLE tab0(col0 INTEGER, col1 INTEGER, col2 INTEGER); + +statement ok +INSERT INTO tab0 VALUES(83,0,38); + +query error DataFusion error: Arrow error: Divide by zero error +SELECT DISTINCT - 84 FROM tab0 AS cor0 WHERE NOT + 96 / + col1 <= NULL GROUP BY col1, col0; \ No newline at end of file