@@ -52,6 +52,7 @@ use datafusion_common::{
52
52
53
53
// backwards compatibility
54
54
use crate :: display:: PgJsonVisitor ;
55
+ use crate :: logical_plan:: tree_node:: unwrap_arc;
55
56
pub use datafusion_common:: display:: { PlanType , StringifiedPlan , ToStringifiedPlan } ;
56
57
pub use datafusion_common:: { JoinConstraint , JoinType } ;
57
58
@@ -467,6 +468,200 @@ impl LogicalPlan {
467
468
self . with_new_exprs ( self . expressions ( ) , inputs. to_vec ( ) )
468
469
}
469
470
471
+ /// Recomputes schema and type information for this LogicalPlan if needed.
472
+ ///
473
+ /// Some `LogicalPlan`s may need to recompute their schema if the number or
474
+ /// type of expressions have been changed (for example due to type
475
+ /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
476
+ /// its expressions.
477
+ ///
478
+ /// Some `LogicalPlan`s schema is unaffected by any changes to their
479
+ /// expressions. For example [`LogicalPlan::Filter`] schema is always the
480
+ /// same as its input schema.
481
+ ///
482
+ /// # Return value
483
+ /// Returns an error if there is some issue recomputing the schema.
484
+ ///
485
+ /// # Notes
486
+ ///
487
+ /// * Does not recursively recompute schema for input (child) plans.
488
+ pub fn recompute_schema ( self ) -> Result < Self > {
489
+ match self {
490
+ // Since expr may be different than the previous expr, schema of the projection
491
+ // may change. We need to use try_new method instead of try_new_with_schema method.
492
+ LogicalPlan :: Projection ( Projection {
493
+ expr,
494
+ input,
495
+ schema : _,
496
+ } ) => Projection :: try_new ( expr, input) . map ( LogicalPlan :: Projection ) ,
497
+ LogicalPlan :: Dml ( _) => Ok ( self ) ,
498
+ LogicalPlan :: Copy ( _) => Ok ( self ) ,
499
+ LogicalPlan :: Values ( Values { schema, values } ) => {
500
+ // todo it isn't clear why the schema is not recomputed here
501
+ Ok ( LogicalPlan :: Values ( Values { schema, values } ) )
502
+ }
503
+ LogicalPlan :: Filter ( Filter { predicate, input } ) => {
504
+ // todo: should this logic be moved to Filter::try_new?
505
+
506
+ // filter predicates should not contain aliased expressions so we remove any aliases
507
+ // before this logic was added we would have aliases within filters such as for
508
+ // benchmark q6:
509
+ //
510
+ // lineitem.l_shipdate >= Date32(\"8766\")
511
+ // AND lineitem.l_shipdate < Date32(\"9131\")
512
+ // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >=
513
+ // Decimal128(Some(49999999999999),30,15)
514
+ // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <=
515
+ // Decimal128(Some(69999999999999),30,15)
516
+ // AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
517
+
518
+ let predicate = predicate
519
+ . transform_down ( |expr| {
520
+ match expr {
521
+ Expr :: Exists { .. }
522
+ | Expr :: ScalarSubquery ( _)
523
+ | Expr :: InSubquery ( _) => {
524
+ // subqueries could contain aliases so we don't recurse into those
525
+ Ok ( Transformed :: new ( expr, false , TreeNodeRecursion :: Jump ) )
526
+ }
527
+ Expr :: Alias ( _) => Ok ( Transformed :: new (
528
+ expr. unalias ( ) ,
529
+ true ,
530
+ TreeNodeRecursion :: Jump ,
531
+ ) ) ,
532
+ _ => Ok ( Transformed :: no ( expr) ) ,
533
+ }
534
+ } )
535
+ . data ( ) ?;
536
+
537
+ Filter :: try_new ( predicate, input) . map ( LogicalPlan :: Filter )
538
+ }
539
+ LogicalPlan :: Repartition ( _) => Ok ( self ) ,
540
+ LogicalPlan :: Window ( Window {
541
+ input,
542
+ window_expr,
543
+ schema : _,
544
+ } ) => Window :: try_new ( window_expr, input) . map ( LogicalPlan :: Window ) ,
545
+ LogicalPlan :: Aggregate ( Aggregate {
546
+ input,
547
+ group_expr,
548
+ aggr_expr,
549
+ schema : _,
550
+ } ) => Aggregate :: try_new ( input, group_expr, aggr_expr)
551
+ . map ( LogicalPlan :: Aggregate ) ,
552
+ LogicalPlan :: Sort ( _) => Ok ( self ) ,
553
+ LogicalPlan :: Join ( Join {
554
+ left,
555
+ right,
556
+ filter,
557
+ join_type,
558
+ join_constraint,
559
+ on,
560
+ schema : _,
561
+ null_equals_null,
562
+ } ) => {
563
+ let schema =
564
+ build_join_schema ( left. schema ( ) , right. schema ( ) , & join_type) ?;
565
+
566
+ let new_on: Vec < _ > = on
567
+ . into_iter ( )
568
+ . map ( |equi_expr| {
569
+ // SimplifyExpression rule may add alias to the equi_expr.
570
+ ( equi_expr. 0 . unalias ( ) , equi_expr. 1 . unalias ( ) )
571
+ } )
572
+ . collect ( ) ;
573
+
574
+ Ok ( LogicalPlan :: Join ( Join {
575
+ left,
576
+ right,
577
+ join_type,
578
+ join_constraint,
579
+ on : new_on,
580
+ filter,
581
+ schema : DFSchemaRef :: new ( schema) ,
582
+ null_equals_null,
583
+ } ) )
584
+ }
585
+ LogicalPlan :: CrossJoin ( CrossJoin {
586
+ left,
587
+ right,
588
+ schema : _,
589
+ } ) => {
590
+ let join_schema =
591
+ build_join_schema ( left. schema ( ) , right. schema ( ) , & JoinType :: Inner ) ?;
592
+
593
+ Ok ( LogicalPlan :: CrossJoin ( CrossJoin {
594
+ left,
595
+ right,
596
+ schema : join_schema. into ( ) ,
597
+ } ) )
598
+ }
599
+ LogicalPlan :: Subquery ( _) => Ok ( self ) ,
600
+ LogicalPlan :: SubqueryAlias ( SubqueryAlias {
601
+ input,
602
+ alias,
603
+ schema : _,
604
+ } ) => SubqueryAlias :: try_new ( input, alias) . map ( LogicalPlan :: SubqueryAlias ) ,
605
+ LogicalPlan :: Limit ( _) => Ok ( self ) ,
606
+ LogicalPlan :: Ddl ( _) => Ok ( self ) ,
607
+ LogicalPlan :: Extension ( Extension { node } ) => {
608
+ // todo make an API that does not require cloning
609
+ // This requires a copy of the extension nodes expressions and inputs
610
+ let expr = node. expressions ( ) ;
611
+ let inputs: Vec < _ > = node. inputs ( ) . into_iter ( ) . cloned ( ) . collect ( ) ;
612
+ Ok ( LogicalPlan :: Extension ( Extension {
613
+ node : node. from_template ( & expr, & inputs) ,
614
+ } ) )
615
+ }
616
+ LogicalPlan :: Union ( Union { inputs, schema } ) => {
617
+ let input_schema = inputs[ 0 ] . schema ( ) ;
618
+ // If inputs are not pruned do not change schema
619
+ // TODO this seems wrong (shouldn't we always use the schema of the input?)
620
+ let schema = if schema. fields ( ) . len ( ) == input_schema. fields ( ) . len ( ) {
621
+ schema. clone ( )
622
+ } else {
623
+ input_schema. clone ( )
624
+ } ;
625
+ Ok ( LogicalPlan :: Union ( Union { inputs, schema } ) )
626
+ }
627
+ LogicalPlan :: Distinct ( distinct) => {
628
+ let distinct = match distinct {
629
+ Distinct :: All ( input) => Distinct :: All ( input) ,
630
+ Distinct :: On ( DistinctOn {
631
+ on_expr,
632
+ select_expr,
633
+ sort_expr,
634
+ input,
635
+ schema : _,
636
+ } ) => Distinct :: On ( DistinctOn :: try_new (
637
+ on_expr,
638
+ select_expr,
639
+ sort_expr,
640
+ input,
641
+ ) ?) ,
642
+ } ;
643
+ Ok ( LogicalPlan :: Distinct ( distinct) )
644
+ }
645
+ LogicalPlan :: RecursiveQuery ( _) => Ok ( self ) ,
646
+ LogicalPlan :: Analyze ( _) => Ok ( self ) ,
647
+ LogicalPlan :: Explain ( _) => Ok ( self ) ,
648
+ LogicalPlan :: Prepare ( _) => Ok ( self ) ,
649
+ LogicalPlan :: TableScan ( _) => Ok ( self ) ,
650
+ LogicalPlan :: EmptyRelation ( _) => Ok ( self ) ,
651
+ LogicalPlan :: Statement ( _) => Ok ( self ) ,
652
+ LogicalPlan :: DescribeTable ( _) => Ok ( self ) ,
653
+ LogicalPlan :: Unnest ( Unnest {
654
+ input,
655
+ columns,
656
+ schema : _,
657
+ options,
658
+ } ) => {
659
+ // Update schema with unnested column type.
660
+ unnest_with_options ( unwrap_arc ( input) , columns, options)
661
+ }
662
+ }
663
+ }
664
+
470
665
/// Returns a new `LogicalPlan` based on `self` with inputs and
471
666
/// expressions replaced.
472
667
///
0 commit comments