Skip to content

Commit 2d3e2a1

Browse files
committed
feat: collect more errors
1 parent 03afbf6 commit 2d3e2a1

File tree

3 files changed

+94
-10
lines changed

3 files changed

+94
-10
lines changed

datafusion/common/src/error.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ pub enum DataFusionError {
133133
/// or serializing/deserializing protobytes to Substrait plans
134134
Substrait(String),
135135
Diagnostic(Diagnostic, Box<DataFusionError>),
136+
Collection(Vec<DataFusionError>),
136137
}
137138

138139
#[macro_export]
@@ -319,6 +320,7 @@ impl Error for DataFusionError {
319320
DataFusionError::Context(_, e) => Some(e.as_ref()),
320321
DataFusionError::Substrait(_) => None,
321322
DataFusionError::Diagnostic(_, e) => Some(e.as_ref()),
323+
DataFusionError::Collection(_) => None,
322324
}
323325
}
324326
}
@@ -442,6 +444,7 @@ impl DataFusionError {
442444
DataFusionError::Context(_, _) => "",
443445
DataFusionError::Substrait(_) => "Substrait error: ",
444446
DataFusionError::Diagnostic(_, _) => "",
447+
DataFusionError::Collection(_) => "",
445448
}
446449
}
447450

@@ -483,6 +486,13 @@ impl DataFusionError {
483486
}
484487
DataFusionError::Substrait(ref desc) => Cow::Owned(desc.to_string()),
485488
DataFusionError::Diagnostic(_, ref err) => Cow::Owned(err.to_string()),
489+
DataFusionError::Collection(ref v) => Cow::Owned(format!(
490+
"[{}]",
491+
v.iter()
492+
.map(|e| e.to_string())
493+
.collect::<Vec<_>>()
494+
.join(", ")
495+
)),
486496
}
487497
}
488498

@@ -517,6 +527,60 @@ impl DataFusionError {
517527

518528
DiagnosticsIterator { head: self }
519529
}
530+
531+
pub fn get_individual_errors(&self) -> impl Iterator<Item = &Self> + '_ {
532+
fn contains_collection(err: &DataFusionError) -> bool {
533+
let mut head = err;
534+
loop {
535+
if let DataFusionError::Collection(_) = head {
536+
return true;
537+
}
538+
539+
if let Some(source) = head
540+
.source()
541+
.map(|source| source.downcast_ref::<DataFusionError>())
542+
.flatten()
543+
{
544+
head = source;
545+
} else {
546+
return false;
547+
}
548+
}
549+
}
550+
551+
struct IndividualErrorsIterator<'a> {
552+
queue: Vec<&'a DataFusionError>,
553+
}
554+
555+
impl<'a> Iterator for IndividualErrorsIterator<'a> {
556+
type Item = &'a DataFusionError;
557+
558+
fn next(&mut self) -> Option<Self::Item> {
559+
while let Some(err) = self.queue.pop() {
560+
if !contains_collection(err) {
561+
return Some(err);
562+
}
563+
564+
if let DataFusionError::Collection(errs) = err {
565+
self.queue.extend(errs.iter());
566+
continue;
567+
}
568+
569+
if let Some(source) = err
570+
.source()
571+
.map(|source| source.downcast_ref::<DataFusionError>())
572+
.flatten()
573+
{
574+
self.queue.push(source);
575+
}
576+
}
577+
578+
None
579+
}
580+
}
581+
582+
IndividualErrorsIterator { queue: vec![self] }
583+
}
520584
}
521585

522586
/// Unwrap an `Option` if possible. Otherwise return an `DataFusionError::Internal`.

datafusion/expr/src/utils.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ pub fn exprlist_to_fields<'a>(
703703
// Look for exact match in plan's output schema
704704
let wildcard_schema = find_base_plan(plan).schema();
705705
let input_schema = plan.schema();
706-
let result = exprs
706+
let (fields, errs) = exprs
707707
.into_iter()
708708
.map(|e| match e {
709709
Expr::Wildcard { qualifier, options } => match qualifier {
@@ -759,11 +759,19 @@ pub fn exprlist_to_fields<'a>(
759759
},
760760
_ => Ok(vec![e.to_field(input_schema)?]),
761761
})
762-
.collect::<Result<Vec<_>>>()?
763-
.into_iter()
764-
.flatten()
765-
.collect();
766-
Ok(result)
762+
.fold((vec![], vec![]), |(mut fields, mut errs), result| {
763+
match result {
764+
Ok(this_fields) => fields.extend(this_fields),
765+
Err(err) => errs.push(err),
766+
}
767+
(fields, errs)
768+
});
769+
770+
if !errs.is_empty() {
771+
Err(DataFusionError::Collection(errs))
772+
} else {
773+
Ok(fields)
774+
}
767775
}
768776

769777
/// Find the suitable base plan to expand the wildcard expression recursively.

datafusion/sql/src/utils.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,29 +123,41 @@ pub(crate) fn check_columns_satisfy_exprs(
123123
Expr::Column(_) => Ok(()),
124124
_ => internal_err!("Expr::Column are required"),
125125
})?;
126+
let mut errs = vec![];
126127
let column_exprs = find_column_exprs(exprs);
127128
for e in &column_exprs {
128129
match e {
129130
Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
130131
for e in exprs {
131-
check_column_satisfies_expr(columns, e, call_purpose)?;
132+
if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) {
133+
errs.push(err);
134+
}
132135
}
133136
}
134137
Expr::GroupingSet(GroupingSet::Cube(exprs)) => {
135138
for e in exprs {
136-
check_column_satisfies_expr(columns, e, call_purpose)?;
139+
if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) {
140+
errs.push(err);
141+
}
137142
}
138143
}
139144
Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => {
140145
for exprs in lists_of_exprs {
141146
for e in exprs {
142-
check_column_satisfies_expr(columns, e, call_purpose)?;
147+
if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) {
148+
errs.push(err);
149+
}
143150
}
144151
}
145152
}
146-
_ => check_column_satisfies_expr(columns, e, call_purpose)?,
153+
_ => if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) {
154+
errs.push(err);
155+
},
147156
}
148157
}
158+
if !errs.is_empty() {
159+
return Err(DataFusionError::Collection(errs));
160+
}
149161
Ok(())
150162
}
151163

0 commit comments

Comments
 (0)