diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index b68c47c57eb9..3bedc299fc2a 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -843,6 +843,10 @@ message PhysicalExprNode { } } +message PhysicalExprNodeCollection { + repeated PhysicalExprNode exprs = 1; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; @@ -1248,3 +1252,31 @@ message PartitionStats { int64 num_bytes = 3; repeated datafusion_common.ColumnStats column_stats = 4; } + +message PhysicalPlanProperties { + EquivalenceProperties eq_properties = 1; + Partitioning partitioning = 2; + ExecutionMode mode = 3; +} + +message EquivalenceProperties { + EquivalenceGroup group = 1; + repeated PhysicalSortExprNodeCollection output_ordering_equivalence = 2; + repeated ConstExpr constants = 3; + datafusion_common.Schema schema = 4; +} + +message ConstExpr { + PhysicalExprNode expr = 1; + bool across_partitions = 2; +} + +enum ExecutionMode { + Bounded = 0; + Unbounded = 1; + PipelineBreaking = 2; +} + +message EquivalenceGroup { + repeated PhysicalExprNodeCollection classes = 1; +} diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e54edb718808..b9d8c4e845ea 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -2613,6 +2613,115 @@ impl<'de> serde::Deserialize<'de> for ColumnUnnestListRecursions { deserializer.deserialize_struct("datafusion.ColumnUnnestListRecursions", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for ConstExpr { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.expr.is_some() { + len += 1; + } + if self.across_partitions { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ConstExpr", len)?; + if let Some(v) = self.expr.as_ref() { + struct_ser.serialize_field("expr", v)?; + } + if self.across_partitions { + struct_ser.serialize_field("acrossPartitions", &self.across_partitions)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ConstExpr { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "expr", + "across_partitions", + "acrossPartitions", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Expr, + AcrossPartitions, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "expr" => Ok(GeneratedField::Expr), + "acrossPartitions" | "across_partitions" => Ok(GeneratedField::AcrossPartitions), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ConstExpr; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ConstExpr") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut expr__ = None; + let mut across_partitions__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Expr => { + if expr__.is_some() { + return Err(serde::de::Error::duplicate_field("expr")); + } + expr__ = map_.next_value()?; + } + GeneratedField::AcrossPartitions => { + if across_partitions__.is_some() { + return Err(serde::de::Error::duplicate_field("acrossPartitions")); + } + across_partitions__ = Some(map_.next_value()?); + } + } + } + Ok(ConstExpr { + expr: expr__, + across_partitions: across_partitions__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.ConstExpr", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for CopyToNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -5025,6 +5134,314 @@ impl<'de> serde::Deserialize<'de> for EmptyRelationNode { deserializer.deserialize_struct("datafusion.EmptyRelationNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for EquivalenceGroup { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.classes.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.EquivalenceGroup", len)?; + if !self.classes.is_empty() { + struct_ser.serialize_field("classes", &self.classes)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for EquivalenceGroup { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "classes", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Classes, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "classes" => Ok(GeneratedField::Classes), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = EquivalenceGroup; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.EquivalenceGroup") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut classes__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Classes => { + if classes__.is_some() { + return Err(serde::de::Error::duplicate_field("classes")); + } + classes__ = Some(map_.next_value()?); + } + } + } + Ok(EquivalenceGroup { + classes: classes__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.EquivalenceGroup", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for EquivalenceProperties { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.group.is_some() { + len += 1; + } + if !self.output_ordering_equivalence.is_empty() { + len += 1; + } + if !self.constants.is_empty() { + len += 1; + } + if self.schema.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.EquivalenceProperties", len)?; + if let Some(v) = self.group.as_ref() { + struct_ser.serialize_field("group", v)?; + } + if !self.output_ordering_equivalence.is_empty() { + struct_ser.serialize_field("outputOrderingEquivalence", &self.output_ordering_equivalence)?; + } + if !self.constants.is_empty() { + struct_ser.serialize_field("constants", &self.constants)?; + } + if let Some(v) = self.schema.as_ref() { + struct_ser.serialize_field("schema", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for EquivalenceProperties { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "group", + "output_ordering_equivalence", + "outputOrderingEquivalence", + "constants", + "schema", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Group, + OutputOrderingEquivalence, + Constants, + Schema, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "group" => Ok(GeneratedField::Group), + "outputOrderingEquivalence" | "output_ordering_equivalence" => Ok(GeneratedField::OutputOrderingEquivalence), + "constants" => Ok(GeneratedField::Constants), + "schema" => Ok(GeneratedField::Schema), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = EquivalenceProperties; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.EquivalenceProperties") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut group__ = None; + let mut output_ordering_equivalence__ = None; + let mut constants__ = None; + let mut schema__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Group => { + if group__.is_some() { + return Err(serde::de::Error::duplicate_field("group")); + } + group__ = map_.next_value()?; + } + GeneratedField::OutputOrderingEquivalence => { + if output_ordering_equivalence__.is_some() { + return Err(serde::de::Error::duplicate_field("outputOrderingEquivalence")); + } + output_ordering_equivalence__ = Some(map_.next_value()?); + } + GeneratedField::Constants => { + if constants__.is_some() { + return Err(serde::de::Error::duplicate_field("constants")); + } + constants__ = Some(map_.next_value()?); + } + GeneratedField::Schema => { + if schema__.is_some() { + return Err(serde::de::Error::duplicate_field("schema")); + } + schema__ = map_.next_value()?; + } + } + } + Ok(EquivalenceProperties { + group: group__, + output_ordering_equivalence: output_ordering_equivalence__.unwrap_or_default(), + constants: constants__.unwrap_or_default(), + schema: schema__, + }) + } + } + deserializer.deserialize_struct("datafusion.EquivalenceProperties", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for ExecutionMode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::Bounded => "Bounded", + Self::Unbounded => "Unbounded", + Self::PipelineBreaking => "PipelineBreaking", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for ExecutionMode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "Bounded", + "Unbounded", + "PipelineBreaking", + ]; + + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ExecutionMode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "Bounded" => Ok(ExecutionMode::Bounded), + "Unbounded" => Ok(ExecutionMode::Unbounded), + "PipelineBreaking" => Ok(ExecutionMode::PipelineBreaking), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} impl serde::Serialize for ExplainExecNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -14182,6 +14599,97 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { deserializer.deserialize_struct("datafusion.PhysicalExprNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalExprNodeCollection { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.exprs.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalExprNodeCollection", len)?; + if !self.exprs.is_empty() { + struct_ser.serialize_field("exprs", &self.exprs)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalExprNodeCollection { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "exprs", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Exprs, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "exprs" => Ok(GeneratedField::Exprs), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalExprNodeCollection; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalExprNodeCollection") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut exprs__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Exprs => { + if exprs__.is_some() { + return Err(serde::de::Error::duplicate_field("exprs")); + } + exprs__ = Some(map_.next_value()?); + } + } + } + Ok(PhysicalExprNodeCollection { + exprs: exprs__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalExprNodeCollection", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalExtensionExprNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -15627,6 +16135,134 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { deserializer.deserialize_struct("datafusion.PhysicalPlanNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalPlanProperties { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.eq_properties.is_some() { + len += 1; + } + if self.partitioning.is_some() { + len += 1; + } + if self.mode != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalPlanProperties", len)?; + if let Some(v) = self.eq_properties.as_ref() { + struct_ser.serialize_field("eqProperties", v)?; + } + if let Some(v) = self.partitioning.as_ref() { + struct_ser.serialize_field("partitioning", v)?; + } + if self.mode != 0 { + let v = ExecutionMode::try_from(self.mode) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.mode)))?; + struct_ser.serialize_field("mode", &v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalPlanProperties { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "eq_properties", + "eqProperties", + "partitioning", + "mode", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + EqProperties, + Partitioning, + Mode, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "eqProperties" | "eq_properties" => Ok(GeneratedField::EqProperties), + "partitioning" => Ok(GeneratedField::Partitioning), + "mode" => Ok(GeneratedField::Mode), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalPlanProperties; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalPlanProperties") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut eq_properties__ = None; + let mut partitioning__ = None; + let mut mode__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::EqProperties => { + if eq_properties__.is_some() { + return Err(serde::de::Error::duplicate_field("eqProperties")); + } + eq_properties__ = map_.next_value()?; + } + GeneratedField::Partitioning => { + if partitioning__.is_some() { + return Err(serde::de::Error::duplicate_field("partitioning")); + } + partitioning__ = map_.next_value()?; + } + GeneratedField::Mode => { + if mode__.is_some() { + return Err(serde::de::Error::duplicate_field("mode")); + } + mode__ = Some(map_.next_value::()? as i32); + } + } + } + Ok(PhysicalPlanProperties { + eq_properties: eq_properties__, + partitioning: partitioning__, + mode: mode__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalPlanProperties", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalScalarUdfNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index dfc30e809108..23d64a2042c3 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1212,6 +1212,11 @@ pub mod physical_expr_node { } } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalExprNodeCollection { + #[prost(message, repeated, tag = "1")] + pub exprs: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, @@ -1809,6 +1814,40 @@ pub struct PartitionStats { #[prost(message, repeated, tag = "4")] pub column_stats: ::prost::alloc::vec::Vec, } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalPlanProperties { + #[prost(message, optional, tag = "1")] + pub eq_properties: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub partitioning: ::core::option::Option, + #[prost(enumeration = "ExecutionMode", tag = "3")] + pub mode: i32, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EquivalenceProperties { + #[prost(message, optional, tag = "1")] + pub group: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub output_ordering_equivalence: ::prost::alloc::vec::Vec< + PhysicalSortExprNodeCollection, + >, + #[prost(message, repeated, tag = "3")] + pub constants: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "4")] + pub schema: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ConstExpr { + #[prost(message, optional, tag = "1")] + pub expr: ::core::option::Option, + #[prost(bool, tag = "2")] + pub across_partitions: bool, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EquivalenceGroup { + #[prost(message, repeated, tag = "1")] + pub classes: ::prost::alloc::vec::Vec, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum BuiltInWindowFunction { @@ -2053,3 +2092,32 @@ impl AggregateMode { } } } +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ExecutionMode { + Bounded = 0, + Unbounded = 1, + PipelineBreaking = 2, +} +impl ExecutionMode { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Bounded => "Bounded", + Self::Unbounded => "Unbounded", + Self::PipelineBreaking => "PipelineBreaking", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "Bounded" => Some(Self::Bounded), + "Unbounded" => Some(Self::Unbounded), + "PipelineBreaking" => Some(Self::PipelineBreaking), + _ => None, + } + } +} diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 20ec5eeaeaf8..45d0069a89f1 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use arrow::compute::SortOptions; use chrono::{TimeZone, Utc}; +use datafusion::physical_expr::equivalence::{EquivalenceClass, EquivalenceGroup}; use datafusion_expr::dml::InsertOp; use object_store::path::Path; use object_store::ObjectMeta; @@ -35,13 +36,17 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::WindowFunctionDefinition; -use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; +use datafusion::physical_expr::{ + ConstExpr, EquivalenceProperties, PhysicalSortExpr, ScalarFunctionExpr, +}; use datafusion::physical_plan::expressions::{ in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, }; use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field}; -use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; +use datafusion::physical_plan::{ + ExecutionMode, Partitioning, PhysicalExpr, PlanProperties, WindowExpr, +}; use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_proto_common::common::proto_error; @@ -657,3 +662,133 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig { }) } } + +pub fn parse_plan_properties( + props: &protobuf::PhysicalPlanProperties, + registry: &dyn FunctionRegistry, + input_schema: &Schema, + codec: &dyn PhysicalExtensionCodec, +) -> Result { + let eq_props = props + .eq_properties + .as_ref() + .ok_or_else(|| proto_error("Unexpected empty equivalence properties"))?; + let eq_properties = + parse_equivalence_properties(eq_props, registry, input_schema, codec)?; + + let partitioning = parse_protobuf_partitioning( + props.partitioning.as_ref(), + registry, + input_schema, + codec, + )? + .ok_or_else(|| proto_error("Unexpected empty partitioning"))?; + + let execution_mode: protobuf::ExecutionMode = props + .mode + .try_into() + .map_err(|_| proto_error("unexpected serialized execution mode"))?; + + Ok(PlanProperties::new( + eq_properties, + partitioning, + (&execution_mode).into(), + )) +} + +pub fn parse_equivalence_properties( + props: &protobuf::EquivalenceProperties, + registry: &dyn FunctionRegistry, + input_schema: &Schema, + codec: &dyn PhysicalExtensionCodec, +) -> Result { + let group = props + .group + .as_ref() + .ok_or_else(|| proto_error("Unexpected empty equivalence group"))?; + let eq_group = parse_equivalence_group(group, registry, input_schema, codec)?; + + let orderings = props + .output_ordering_equivalence + .iter() + .map(|ordering| { + parse_physical_sort_exprs( + &ordering.physical_sort_expr_nodes, + registry, + input_schema, + codec, + ) + }) + .collect::>>()?; + + let constants = parse_const_exprs(&props.constants, registry, input_schema, codec)?; + + let schema = Arc::new(convert_required!(props.schema)?); + + let mut equiv_props = EquivalenceProperties::new_with_orderings(schema, &orderings) + .with_constants(constants); + equiv_props.add_equivalence_group(eq_group); + + Ok(equiv_props) +} + +pub fn parse_equivalence_group( + equiv_group: &protobuf::EquivalenceGroup, + registry: &dyn FunctionRegistry, + input_schema: &Schema, + codec: &dyn PhysicalExtensionCodec, +) -> Result { + let classes: Result> = equiv_group + .classes + .iter() + .map(|equiv_class| { + parse_equivalence_class(equiv_class, registry, input_schema, codec) + }) + .collect(); + + Ok(EquivalenceGroup { classes: classes? }) +} + +pub fn parse_equivalence_class( + equiv_class: &protobuf::PhysicalExprNodeCollection, + registry: &dyn FunctionRegistry, + input_schema: &Schema, + codec: &dyn PhysicalExtensionCodec, +) -> Result { + let exprs = + parse_physical_exprs(equiv_class.exprs.iter(), registry, input_schema, codec)?; + + Ok(EquivalenceClass::new(exprs)) +} + +pub fn parse_const_exprs( + const_exprs: &[protobuf::ConstExpr], + registry: &dyn FunctionRegistry, + input_schema: &Schema, + codec: &dyn PhysicalExtensionCodec, +) -> Result> { + const_exprs + .iter() + .map(|const_expr| { + let expr = const_expr + .expr + .as_ref() + .ok_or_else(|| proto_error("Unexpected empty physical expression"))?; + + Ok( + ConstExpr::new(parse_physical_expr(expr, registry, input_schema, codec)?) + .with_across_partitions(const_expr.across_partitions), + ) + }) + .collect() +} + +impl From<&protobuf::ExecutionMode> for ExecutionMode { + fn from(mode: &protobuf::ExecutionMode) -> Self { + match mode { + protobuf::ExecutionMode::Bounded => ExecutionMode::Bounded, + protobuf::ExecutionMode::Unbounded => ExecutionMode::Unbounded, + protobuf::ExecutionMode::PipelineBreaking => ExecutionMode::PipelineBreaking, + } + } +} diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 89a2403922e9..8bae9ad29b78 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -20,15 +20,20 @@ use std::sync::Arc; #[cfg(feature = "parquet")] use datafusion::datasource::file_format::parquet::ParquetSink; +use datafusion::physical_expr::equivalence::{EquivalenceClass, EquivalenceGroup}; use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr}; -use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; +use datafusion::physical_expr::{ + ConstExpr, EquivalenceProperties, PhysicalSortExpr, ScalarFunctionExpr, +}; use datafusion::physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, TryCastExpr, }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; -use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; +use datafusion::physical_plan::{ + ExecutionMode, Partitioning, PhysicalExpr, PlanProperties, WindowExpr, +}; use datafusion::{ datasource::{ file_format::{csv::CsvSink, json::JsonSink}, @@ -611,3 +616,100 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig { }) } } + +pub fn serialize_plan_properties( + props: &PlanProperties, + codec: &dyn PhysicalExtensionCodec, +) -> Result { + let eq_properties = Some(serialize_equivalence_properties( + &props.eq_properties, + codec, + )?); + + let partitioning = Some(serialize_partitioning(&props.partitioning, codec)?); + + let mode: protobuf::ExecutionMode = (&props.execution_mode).into(); + + Ok(protobuf::PhysicalPlanProperties { + eq_properties, + partitioning, + mode: mode.into(), + }) +} + +pub fn serialize_equivalence_properties( + props: &EquivalenceProperties, + codec: &dyn PhysicalExtensionCodec, +) -> Result { + let group = Some(serialize_equivalence_group(&props.eq_group, codec)?); + + let oeq_class = props + .oeq_class() + .orderings + .iter() + .map(|ordering| serialize_physical_sort_exprs(ordering.iter().cloned(), codec)) + .collect::>>()? + .into_iter() + .map(|ordering| PhysicalSortExprNodeCollection { + physical_sort_expr_nodes: ordering, + }) + .collect(); + + let constants = serialize_const_exprs(&props.constants, codec)?; + + let schema = Some(props.schema().as_ref().try_into()?); + + Ok(protobuf::EquivalenceProperties { + group, + output_ordering_equivalence: oeq_class, + constants, + schema, + }) +} + +pub fn serialize_equivalence_group( + equiv_group: &EquivalenceGroup, + codec: &dyn PhysicalExtensionCodec, +) -> Result { + let classes: Result> = equiv_group + .classes + .iter() + .map(|equiv_class| serialize_equivalence_class(equiv_class, codec)) + .collect(); + + Ok(protobuf::EquivalenceGroup { classes: classes? }) +} + +pub fn serialize_equivalence_class( + equiv_class: &EquivalenceClass, + codec: &dyn PhysicalExtensionCodec, +) -> Result { + let exprs = serialize_physical_exprs(equiv_class.iter(), codec)?; + + Ok(protobuf::PhysicalExprNodeCollection { exprs }) +} + +pub fn serialize_const_exprs( + const_exprs: &[ConstExpr], + codec: &dyn PhysicalExtensionCodec, +) -> Result> { + const_exprs + .iter() + .map(|const_expr| { + Ok(protobuf::ConstExpr { + expr: Some(serialize_physical_expr(const_expr.expr(), codec)?), + across_partitions: const_expr.across_partitions(), + }) + }) + .collect() +} + +impl From<&ExecutionMode> for protobuf::ExecutionMode { + fn from(mode: &ExecutionMode) -> Self { + match mode { + ExecutionMode::Bounded => protobuf::ExecutionMode::Bounded, + ExecutionMode::Unbounded => protobuf::ExecutionMode::Unbounded, + ExecutionMode::PipelineBreaking => protobuf::ExecutionMode::PipelineBreaking, + } + } +}