diff --git a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs index 773723c857fd8..8956e34ae7e5a 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs @@ -40,7 +40,7 @@ use datafusion::{ #[derive(Debug, Clone)] pub struct CubeScanNode { pub schema: DFSchemaRef, - pub member_fields: Vec, + pub member_fields: Vec>, pub request: V1LoadRequestQuery, pub auth_context: Arc, } @@ -48,7 +48,7 @@ pub struct CubeScanNode { impl CubeScanNode { pub fn new( schema: DFSchemaRef, - member_fields: Vec, + member_fields: Vec>, request: V1LoadRequestQuery, auth_context: Arc, ) -> Self { @@ -147,7 +147,7 @@ impl ExtensionPlanner for CubeScanExtensionPlanner { struct CubeScanExecutionPlan { // Options from logical node schema: SchemaRef, - member_fields: Vec, + member_fields: Vec>, request: V1LoadRequestQuery, auth_context: Arc, // Shared references which will be injected by extension planner @@ -169,26 +169,32 @@ impl CubeScanExecutionPlan { DataType::Utf8 => { let mut builder = StringBuilder::new(100); - for row in response.data.iter() { - let value = row.as_object().unwrap().get(field_name).ok_or( - DataFusionError::Internal( - "Unexpected response from Cube.js, rows are not objects" - .to_string(), - ), - )?; - match &value { - serde_json::Value::Null => builder.append_null()?, - serde_json::Value::String(v) => builder.append_value(v)?, - serde_json::Value::Bool(v) => { - builder.append_value(if *v { "true" } else { "false" })? - } - serde_json::Value::Number(v) => builder.append_value(v.to_string())?, - v => { - self.logger.error(format!("Unable to map value {:?} to DataType::Utf8 (returning null)", v).as_str()); - - builder.append_null()? - } - }; + if let Some(field_name) = field_name { + for row in response.data.iter() { + let value = row.as_object().unwrap().get(field_name).ok_or( + DataFusionError::Internal( + "Unexpected response from Cube.js, rows are not objects" + .to_string(), + ), + )?; + match &value { + serde_json::Value::Null => builder.append_null()?, + serde_json::Value::String(v) => builder.append_value(v)?, + serde_json::Value::Bool(v) => { + builder.append_value(if *v { "true" } else { "false" })? + } + serde_json::Value::Number(v) => { + builder.append_value(v.to_string())? + } + v => { + self.logger.error(format!("Unable to map value {:?} to DataType::Utf8 (returning null)", v).as_str()); + + builder.append_null()? + } + }; + } + } else { + builder.append_null()?; } Arc::new(builder.finish()) as ArrayRef @@ -197,32 +203,39 @@ impl CubeScanExecutionPlan { let mut builder = Int64Builder::new(100); for row in response.data.iter() { - let value = row.as_object().unwrap().get(field_name).ok_or( - DataFusionError::Internal( - "Unexpected response from Cube.js, rows are not objects" - .to_string(), - ), - )?; - match &value { - serde_json::Value::Null => builder.append_null()?, - serde_json::Value::Number(number) => match number.as_i64() { - Some(v) => builder.append_value(v)?, - None => builder.append_null()?, - }, - serde_json::Value::String(s) => match s.parse::() { - Ok(v) => builder.append_value(v)?, - Err(error) => { - warn!("Unable to parse value as i64: {}", error.to_string()); + if let Some(field_name) = field_name { + let value = row.as_object().unwrap().get(field_name).ok_or( + DataFusionError::Internal( + "Unexpected response from Cube.js, rows are not objects" + .to_string(), + ), + )?; + match &value { + serde_json::Value::Null => builder.append_null()?, + serde_json::Value::Number(number) => match number.as_i64() { + Some(v) => builder.append_value(v)?, + None => builder.append_null()?, + }, + serde_json::Value::String(s) => match s.parse::() { + Ok(v) => builder.append_value(v)?, + Err(error) => { + warn!( + "Unable to parse value as i64: {}", + error.to_string() + ); + + builder.append_null()? + } + }, + v => { + self.logger.error(format!("Unable to map value {:?} to DataType::Int64 (returning null)", v).as_str()); builder.append_null()? } - }, - v => { - self.logger.error(format!("Unable to map value {:?} to DataType::Int64 (returning null)", v).as_str()); - - builder.append_null()? - } - }; + }; + } else { + builder.append_null()?; + } } Arc::new(builder.finish()) as ArrayRef @@ -231,32 +244,39 @@ impl CubeScanExecutionPlan { let mut builder = Float64Builder::new(100); for row in response.data.iter() { - let value = row.as_object().unwrap().get(field_name).ok_or( - DataFusionError::Internal( - "Unexpected response from Cube.js, rows are not objects" - .to_string(), - ), - )?; - match &value { - serde_json::Value::Null => builder.append_null()?, - serde_json::Value::Number(number) => match number.as_f64() { - Some(v) => builder.append_value(v)?, - None => builder.append_null()?, - }, - serde_json::Value::String(s) => match s.parse::() { - Ok(v) => builder.append_value(v)?, - Err(error) => { - warn!("Unable to parse value as f64: {}", error.to_string()); + if let Some(field_name) = field_name { + let value = row.as_object().unwrap().get(field_name).ok_or( + DataFusionError::Internal( + "Unexpected response from Cube.js, rows are not objects" + .to_string(), + ), + )?; + match &value { + serde_json::Value::Null => builder.append_null()?, + serde_json::Value::Number(number) => match number.as_f64() { + Some(v) => builder.append_value(v)?, + None => builder.append_null()?, + }, + serde_json::Value::String(s) => match s.parse::() { + Ok(v) => builder.append_value(v)?, + Err(error) => { + warn!( + "Unable to parse value as f64: {}", + error.to_string() + ); + + builder.append_null()? + } + }, + v => { + self.logger.error(format!("Unable to map value {:?} to DataType::Float64 (returning null)", v).as_str()); builder.append_null()? } - }, - v => { - self.logger.error(format!("Unable to map value {:?} to DataType::Float64 (returning null)", v).as_str()); - - builder.append_null()? - } - }; + }; + } else { + builder.append_null()?; + } } Arc::new(builder.finish()) as ArrayRef @@ -265,31 +285,35 @@ impl CubeScanExecutionPlan { let mut builder = BooleanBuilder::new(100); for row in response.data.iter() { - let value = row.as_object().unwrap().get(field_name).ok_or( - DataFusionError::Internal( - "Unexpected response from Cube.js, rows are not objects" - .to_string(), - ), - )?; - match &value { - serde_json::Value::Null => builder.append_null()?, - serde_json::Value::Bool(v) => builder.append_value(*v)?, - // Cube allows to mark a type as boolean, but it doesn't guarantee that the user will return a boolean type - serde_json::Value::String(v) => match v.as_str() { - "true" | "1" => builder.append_value(true)?, - "false" | "0" => builder.append_value(false)?, - _ => { + if let Some(field_name) = field_name { + let value = row.as_object().unwrap().get(field_name).ok_or( + DataFusionError::Internal( + "Unexpected response from Cube.js, rows are not objects" + .to_string(), + ), + )?; + match &value { + serde_json::Value::Null => builder.append_null()?, + serde_json::Value::Bool(v) => builder.append_value(*v)?, + // Cube allows to mark a type as boolean, but it doesn't guarantee that the user will return a boolean type + serde_json::Value::String(v) => match v.as_str() { + "true" | "1" => builder.append_value(true)?, + "false" | "0" => builder.append_value(false)?, + _ => { + self.logger.error(format!("Unable to map value {:?} to DataType::Boolean (returning null)", v).as_str()); + + builder.append_null()? + } + }, + v => { self.logger.error(format!("Unable to map value {:?} to DataType::Boolean (returning null)", v).as_str()); builder.append_null()? } - }, - v => { - self.logger.error(format!("Unable to map value {:?} to DataType::Boolean (returning null)", v).as_str()); - - builder.append_null()? - } - }; + }; + } else { + builder.append_null()?; + } } Arc::new(builder.finish()) as ArrayRef @@ -298,31 +322,35 @@ impl CubeScanExecutionPlan { let mut builder = TimestampNanosecondBuilder::new(response.data.len()); for row in response.data.iter() { - let value = row.as_object().unwrap().get(field_name).ok_or( - DataFusionError::Internal( - "Unexpected response from Cube.js, rows are not objects" - .to_string(), - ), - )?; - match &value { - serde_json::Value::Null => builder.append_null()?, - serde_json::Value::String(s) => { - let timestamp = Utc - .datetime_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S.%f") - .map_err(|e| { - DataFusionError::Execution(format!( - "Can't parse timestamp: '{}': {}", - s, e - )) - })?; - builder.append_value(timestamp.timestamp_nanos())?; - } - v => { - self.logger.error(format!("Unable to map value {:?} to DataType::Timestamp(TimeUnit::Nanosecond, None) (returning null)", v).as_str()); - - builder.append_null()? - } - }; + if let Some(field_name) = field_name { + let value = row.as_object().unwrap().get(field_name).ok_or( + DataFusionError::Internal( + "Unexpected response from Cube.js, rows are not objects" + .to_string(), + ), + )?; + match &value { + serde_json::Value::Null => builder.append_null()?, + serde_json::Value::String(s) => { + let timestamp = Utc + .datetime_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S.%f") + .map_err(|e| { + DataFusionError::Execution(format!( + "Can't parse timestamp: '{}': {}", + s, e + )) + })?; + builder.append_value(timestamp.timestamp_nanos())?; + } + v => { + self.logger.error(format!("Unable to map value {:?} to DataType::Timestamp(TimeUnit::Nanosecond, None) (returning null)", v).as_str()); + + builder.append_null()? + } + }; + } else { + builder.append_null()?; + } } Arc::new(builder.finish()) as ArrayRef @@ -571,7 +599,7 @@ mod tests { member_fields: schema .fields() .iter() - .map(|f| f.name().to_string()) + .map(|f| Some(f.name().to_string())) .collect(), request: V1LoadRequestQuery { measures: None, diff --git a/rust/cubesql/cubesql/src/compile/mod.rs b/rust/cubesql/cubesql/src/compile/mod.rs index 12ff40c0763ae..20757e554f094 100644 --- a/rust/cubesql/cubesql/src/compile/mod.rs +++ b/rust/cubesql/cubesql/src/compile/mod.rs @@ -1626,7 +1626,7 @@ impl QueryPlanner { schema .fields() .iter() - .map(|f| f.name().to_string()) + .map(|f| Some(f.name().to_string())) .collect(), query.request, // @todo Remove after split! @@ -3588,7 +3588,13 @@ mod tests { order: None, limit: None, offset: None, - filters: None, + filters: Some(vec![V1LoadRequestQueryFilterItem { + member: Some("KibanaSampleDataEcommerce.count".to_string()), + operator: Some("gt".to_string()), + values: Some(vec!["0".to_string()]), + or: None, + and: None, + }]), } ); @@ -3608,13 +3614,22 @@ mod tests { order: None, limit: None, offset: None, - filters: Some(vec![V1LoadRequestQueryFilterItem { - member: Some("KibanaSampleDataEcommerce.has_subscription".to_string()), - operator: Some("equals".to_string()), - values: Some(vec!["false".to_string()]), - or: None, - and: None, - }]), + filters: Some(vec![ + V1LoadRequestQueryFilterItem { + member: Some("KibanaSampleDataEcommerce.has_subscription".to_string()), + operator: Some("equals".to_string()), + values: Some(vec!["false".to_string()]), + or: None, + and: None, + }, + V1LoadRequestQueryFilterItem { + member: Some("KibanaSampleDataEcommerce.count".to_string()), + operator: Some("gt".to_string()), + values: Some(vec!["0".to_string()]), + or: None, + and: None, + } + ]), } ); } @@ -3875,7 +3890,13 @@ ORDER BY \"COUNT(count)\" DESC" order: None, limit: None, offset: None, - filters: None, + filters: Some(vec![V1LoadRequestQueryFilterItem { + member: Some("KibanaSampleDataEcommerce.count".to_string()), + operator: Some("gt".to_string()), + values: Some(vec!["0".to_string()]), + or: None, + and: None, + }]), } ); @@ -3886,17 +3907,11 @@ ORDER BY \"COUNT(count)\" DESC" .iter() .map(|f| f.name().to_string()) .collect::>(), - vec![ - "SUM(KibanaSampleDataEcommerce.count)".to_string(), - "COUNT(UInt8(1))".to_string() - ] + vec!["sum:count:ok".to_string(),] ); assert_eq!( &cube_scan.member_fields, - &vec![ - "KibanaSampleDataEcommerce.count".to_string(), - "KibanaSampleDataEcommerce.count".to_string() - ] + &vec![Some("KibanaSampleDataEcommerce.count".to_string())] ); } @@ -4104,7 +4119,7 @@ ORDER BY \"COUNT(count)\" DESC" let query_plan = convert_select_to_query_plan( "select \"rows\".\"customer_gender\" as \"customer_gender\", -\n count(1) as \"a0\"\ +\n sum(\"rows\".\"count\") as \"a0\"\ \nfrom\ \n(\ \n select \"_\".\"count\",\ @@ -4131,7 +4146,7 @@ ORDER BY \"COUNT(count)\" DESC" segments: Some(vec![]), time_dimensions: None, order: None, - limit: Some(1000001), + limit: Some(50000), offset: None, filters: Some(vec![V1LoadRequestQueryFilterItem { member: Some("KibanaSampleDataEcommerce.customer_gender".to_string()), @@ -4144,6 +4159,108 @@ ORDER BY \"COUNT(count)\" DESC" ); } + #[test] + fn powerbi_inner_wrapped_dates() { + init_logger(); + + let query_plan = convert_select_to_query_plan( + "select \"_\".\"created_at_day\",\ +\n \"_\".\"a0\"\ +\nfrom \ +\n(\ +\n select \"rows\".\"created_at_day\" as \"created_at_day\",\ +\n sum(\"rows\".\"cnt\") as \"a0\"\ +\n from \ +\n (\ +\n select count(*) cnt,date_trunc('day', order_date) as created_at_day, date_trunc('month', order_date) as created_at_month from public.KibanaSampleDataEcommerce group by 2, 3\ +\n ) \"rows\"\ +\n group by \"created_at_day\"\ +\n) \"_\"\ +\nwhere not \"_\".\"a0\" is null\ +\nlimit 1000001" + .to_string(), + DatabaseProtocol::PostgreSQL, + ); + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec!["KibanaSampleDataEcommerce.count".to_string()]), + dimensions: Some(vec![]), + segments: Some(vec![]), + time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension { + dimension: "KibanaSampleDataEcommerce.order_date".to_string(), + granularity: Some("day".to_string()), + date_range: None, + }]), + order: None, + limit: Some(50000), + offset: None, + filters: Some(vec![V1LoadRequestQueryFilterItem { + member: Some("KibanaSampleDataEcommerce.count".to_string()), + operator: Some("set".to_string()), + values: None, + or: None, + and: None, + }]), + } + ); + } + + #[test] + fn powerbi_inner_wrapped_asterisk() { + init_logger(); + + let query_plan = convert_select_to_query_plan( + "select \"rows\".\"customer_gender\" as \"customer_gender\",\ +\n \"rows\".\"created_at_month\" as \"created_at_month\"\ +\nfrom \ +\n(\ +\n select \"_\".\"count\",\ +\n \"_\".\"minPrice\",\ +\n \"_\".\"maxPrice\",\ +\n \"_\".\"avgPrice\",\ +\n \"_\".\"order_date\",\ +\n \"_\".\"customer_gender\",\ +\n \"_\".\"created_at_day\",\ +\n \"_\".\"created_at_month\"\ +\n from \ +\n (\ +\n select *, date_trunc('day', order_date) created_at_day, date_trunc('month', order_date) created_at_month from public.KibanaSampleDataEcommerce\ +\n ) \"_\"\ +\n where \"_\".\"created_at_month\" < timestamp '2022-06-13 00:00:00' and \"_\".\"created_at_month\" >= timestamp '2021-12-16 00:00:00'\ +\n) \"rows\"\ +\ngroup by \"customer_gender\",\ +\n \"created_at_month\"\ +\nlimit 1000001" + .to_string(), + DatabaseProtocol::PostgreSQL, + ); + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![]), + dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]), + segments: Some(vec![]), + time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension { + dimension: "KibanaSampleDataEcommerce.order_date".to_string(), + granularity: Some("month".to_string()), + date_range: Some(json!(vec![ + "2021-12-16 00:00:00".to_string(), + "2022-06-13 00:00:00".to_string() + ])), + }]), + order: None, + limit: Some(50000), + offset: None, + filters: None, + } + ); + } + #[test] fn test_select_aggregations() { let variants = vec![ @@ -4295,10 +4412,10 @@ ORDER BY \"COUNT(count)\" DESC" // "SELECT is_male FROM KibanaSampleDataEcommerce".to_string(), // CompilationError::User("Unable to use segment 'is_male' as column in SELECT statement".to_string()), // ), - ( - "SELECT COUNT(*) FROM KibanaSampleDataEcommerce GROUP BY is_male".to_string(), - CompilationError::User("Unable to use segment 'is_male' in GROUP BY".to_string()), - ), + // ( + // "SELECT COUNT(*) FROM KibanaSampleDataEcommerce GROUP BY is_male".to_string(), + // CompilationError::User("Unable to use segment 'is_male' in GROUP BY".to_string()), + // ), // ( // "SELECT COUNT(*) FROM KibanaSampleDataEcommerce ORDER BY is_male DESC".to_string(), // CompilationError::User("Unable to use segment 'is_male' in ORDER BY".to_string()), @@ -4608,7 +4725,6 @@ ORDER BY \"COUNT(count)\" DESC" "" } ); - println!("Query: {}", query); let logical_plan = convert_select_to_query_plan(query, DatabaseProtocol::MySQL).as_logical_plan(); diff --git a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs index 19739cc302e2b..7ed81be0a1022 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs @@ -4,7 +4,7 @@ use crate::{ rewrite::{ converter::{is_expr_node, node_to_expr}, AliasExprAlias, ColumnExprColumn, DimensionName, LiteralExprValue, LogicalPlanLanguage, - MeasureName, TableScanSourceTableName, TimeDimensionName, + MeasureName, SegmentName, TableScanSourceTableName, TimeDimensionName, }, }, var_iter, CubeError, @@ -124,6 +124,16 @@ impl LogicalPlanAnalysis { None } } + LogicalPlanLanguage::Segment(params) => { + if let Some(_) = column_name(params[1]) { + let expr = original_expr(params[1])?; + let segment_name = var_iter!(egraph[params[0]], SegmentName).next().unwrap(); + map.push((segment_name.to_string(), expr)); + Some(map) + } else { + None + } + } LogicalPlanLanguage::TimeDimension(params) => { if let Some(_) = column_name(params[3]) { let expr = original_expr(params[3])?; diff --git a/rust/cubesql/cubesql/src/compile/rewrite/converter.rs b/rust/cubesql/cubesql/src/compile/rewrite/converter.rs index 52c2bda2df477..1cf55c9e432ee 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/converter.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/converter.rs @@ -37,6 +37,7 @@ use datafusion::{ }; use egg::{EGraph, Id, RecExpr}; use itertools::Itertools; +use serde_json::json; use std::{collections::HashMap, ops::Index, sync::Arc}; macro_rules! add_data_node { @@ -1080,7 +1081,7 @@ impl LanguageToLogicalPlanConverter { // TODO actually nullable. Just to fit tests false, ), - measure.to_string(), + Some(measure.to_string()), )); } LogicalPlanLanguage::TimeDimension(params) => { @@ -1119,7 +1120,7 @@ impl LanguageToLogicalPlanConverter { // TODO actually nullable. Just to fit tests false, ), - format!("{}.{}", dimension, granularity), + Some(format!("{}.{}", dimension, granularity)), )); } } @@ -1145,7 +1146,21 @@ impl LanguageToLogicalPlanConverter { // TODO actually nullable. Just to fit tests false, ), - dimension, + Some(dimension), + )); + } + LogicalPlanLanguage::Segment(params) => { + let expr = self.to_expr(params[1])?; + fields.push(( + DFField::new( + Some(&table_name), + // TODO empty schema + &expr_name(&expr)?, + DataType::Boolean, + // TODO actually nullable. Just to fit tests + false, + ), + None, )); } LogicalPlanLanguage::MemberError(params) => { @@ -1161,6 +1176,7 @@ impl LanguageToLogicalPlanConverter { match_list_node!(node_by_id, cube_scan_params[2], CubeScanFilters); fn to_filter( + query_time_dimensions: &mut Vec, filters: Vec, node_by_id: &impl Index, ) -> Result<(Vec, Vec), CubeError> @@ -1177,7 +1193,8 @@ impl LanguageToLogicalPlanConverter { ); let op = match_data_node!(node_by_id, params[1], FilterOpOp); - let (filters, segments) = to_filter(filters, node_by_id)?; + let (filters, segments) = + to_filter(query_time_dimensions, filters, node_by_id)?; match op.as_str() { "and" => { result.push(V1LoadRequestQueryFilterItem { @@ -1229,17 +1246,40 @@ impl LanguageToLogicalPlanConverter { params[2], FilterMemberValues ); - result.push(V1LoadRequestQueryFilterItem { - member: Some(member), - operator: Some(op), - values: if !values.is_empty() { - Some(values) - } else { - None - }, - or: None, - and: None, - }); + if op == "inDateRange" { + let existing_time_dimension = query_time_dimensions + .iter_mut() + .find_map(|mut td| { + if td.dimension == member + && td.date_range.is_none() + { + td.date_range = Some(json!(values)); + Some(td) + } else { + None + } + }); + if existing_time_dimension.is_none() { + let dimension = V1LoadRequestQueryTimeDimension { + dimension: member.to_string(), + granularity: None, + date_range: Some(json!(values)), + }; + query_time_dimensions.push(dimension); + } + } else { + result.push(V1LoadRequestQueryFilterItem { + member: Some(member), + operator: Some(op), + values: if !values.is_empty() { + Some(values) + } else { + None + }, + or: None, + and: None, + }); + } } LogicalPlanLanguage::SegmentMember(params) => { let member = match_data_node!( @@ -1255,7 +1295,8 @@ impl LanguageToLogicalPlanConverter { Ok((result, segments_result)) } - let (filters, segments) = to_filter(filters, node_by_id)?; + let (filters, segments) = + to_filter(&mut query_time_dimensions, filters, node_by_id)?; query.filters = if filters.len() > 0 { Some(filters) @@ -1311,7 +1352,7 @@ impl LanguageToLogicalPlanConverter { }; query.limit = match_data_node!(node_by_id, cube_scan_params[4], CubeScanLimit) - .map(|n| n as i32); + .map(|n| if n > 50000 { 50000 } else { n as i32 }); let aliases = match_data_node!(node_by_id, cube_scan_params[6], CubeScanAliases); @@ -1340,7 +1381,7 @@ impl LanguageToLogicalPlanConverter { fields = new_fields; } - let member_fields = fields.iter().map(|(_, m)| m.to_string()).collect(); + let member_fields = fields.iter().map(|(_, m)| m.clone()).collect(); Arc::new(CubeScanNode::new( Arc::new(DFSchema::new_with_metadata( fields.into_iter().map(|(f, _)| f).collect(), diff --git a/rust/cubesql/cubesql/src/compile/rewrite/cost.rs b/rust/cubesql/cubesql/src/compile/rewrite/cost.rs index ba5f37d16c429..923b010646733 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/cost.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/cost.rs @@ -84,7 +84,7 @@ impl CostFunction for BestCubePlan { LogicalPlanLanguage::InnerAggregateSplitReplacer(_) => 1, LogicalPlanLanguage::OuterProjectionSplitReplacer(_) => 1, LogicalPlanLanguage::OuterAggregateSplitReplacer(_) => 1, - LogicalPlanLanguage::ColumnAliasReplacer(_) => 1, + LogicalPlanLanguage::MemberPushdownReplacer(_) => 1, _ => 0, }; diff --git a/rust/cubesql/cubesql/src/compile/rewrite/mod.rs b/rust/cubesql/cubesql/src/compile/rewrite/mod.rs index aef0d38ae67a5..ca468e07ed406 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/mod.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/mod.rs @@ -23,7 +23,7 @@ use egg::{ rewrite, Applier, EGraph, Id, Pattern, PatternAst, Rewrite, SearchMatches, Searcher, Subst, Symbol, Var, }; -use std::{collections::HashMap, fmt::Display, slice::Iter, str::FromStr}; +use std::{collections::HashMap, fmt::Display, ops::Index, slice::Iter, str::FromStr}; // trace_macros!(true); @@ -224,6 +224,10 @@ crate::plan_to_language! { name: String, expr: Arc, }, + Segment { + name: String, + expr: Arc, + }, Order { member: String, asc: bool, @@ -257,6 +261,18 @@ crate::plan_to_language! { members: Vec, cube: Arc, }, + MemberPushdownReplacer { + members: Vec, + old_members: Arc, + table_name: String, + target_table_name: String, + }, + ListConcatPushdownReplacer { + members: Arc, + }, + ListConcatPushupReplacer { + members: Arc, + }, TimeDimensionDateRangeReplacer { members: Vec, member: String, @@ -265,24 +281,17 @@ crate::plan_to_language! { FilterReplacer { filters: Vec, cube: Option, + members: Vec, + table_name: String, }, FilterCastUnwrapReplacer { filters: Vec, }, - SaveDateRangeReplacer { - members: Vec, - }, OrderReplacer { sort_expr: Vec, column_name_to_member: Vec<(String, String)>, cube: Option, }, - ColumnAliasReplacer { - members: Vec, - aliases: Vec<(String, String)>, - table_name: Option, - target_table_name: Option, - }, InnerAggregateSplitReplacer { members: Vec, cube: String, @@ -310,6 +319,16 @@ macro_rules! var_iter { }}; } +#[macro_export] +macro_rules! var_list_iter { + ($eclass:expr, $field_variant:ident) => {{ + $eclass.nodes.iter().filter_map(|node| match node { + LogicalPlanLanguage::$field_variant(v) => Some(v), + _ => None, + }) + }}; +} + #[macro_export] macro_rules! var { ($var_str:expr) => { @@ -335,11 +354,35 @@ fn column_name_to_member_name( member_name_to_expr: Vec<(String, Expr)>, table_name: String, ) -> HashMap { + column_name_to_member_vec(member_name_to_expr, table_name) + .into_iter() + .collect::>() +} + +fn column_name_to_member_vec( + member_name_to_expr: Vec<(String, Expr)>, + table_name: String, +) -> Vec<(String, String)> { let mut relation = WithColumnRelation(table_name); member_name_to_expr .into_iter() .map(|(member, expr)| (expr_column_name_with_relation(expr, &mut relation), member)) - .collect::>() + .collect::>() +} + +fn member_name_by_alias( + egraph: &EGraph, + id: Id, + alias: &str, + table_name: String, +) -> Option { + if let Some(member_name_to_expr) = egraph.index(id).data.member_name_to_expr.as_ref() { + let column_name_to_member = + column_name_to_member_name(member_name_to_expr.clone(), table_name); + column_name_to_member.get(alias).cloned() + } else { + None + } } fn referenced_columns(referenced_expr: Vec, table_name: String) -> Vec { @@ -634,20 +677,28 @@ fn filter(expr: impl Display, input: impl Display) -> String { format!("(Filter {} {})", expr, input) } -fn column_alias_replacer( +fn member_replacer(members: impl Display, aliases: impl Display) -> String { + format!("(MemberReplacer {} {})", members, aliases) +} + +fn member_pushdown_replacer( members: impl Display, - aliases: impl Display, + old_members: impl Display, table_name: impl Display, target_table_name: impl Display, ) -> String { format!( - "(ColumnAliasReplacer {} {} {} {})", - members, aliases, table_name, target_table_name + "(MemberPushdownReplacer {} {} {} {})", + members, old_members, table_name, target_table_name ) } -fn member_replacer(members: impl Display, aliases: impl Display) -> String { - format!("(MemberReplacer {} {})", members, aliases) +fn list_concat_pushdown_replacer(members: impl Display) -> String { + format!("(ListConcatPushdownReplacer {})", members) +} + +fn list_concat_pushup_replacer(members: impl Display) -> String { + format!("(ListConcatPushupReplacer {})", members) } fn time_dimension_date_range_replacer( @@ -665,12 +716,16 @@ fn order_replacer(members: impl Display, aliases: impl Display, cube: impl Displ format!("(OrderReplacer {} {} {})", members, aliases, cube) } -fn filter_replacer(members: impl Display, cube: impl Display) -> String { - format!("(FilterReplacer {} {})", members, cube) -} - -fn save_date_range_replacer(members: impl Display) -> String { - format!("(SaveDateRangeReplacer {})", members) +fn filter_replacer( + members: impl Display, + cube: impl Display, + cube_members: impl Display, + table_name: impl Display, +) -> String { + format!( + "(FilterReplacer {} {} {} {})", + members, cube, cube_members, table_name + ) } fn filter_cast_unwrap_replacer(members: impl Display) -> String { @@ -741,6 +796,10 @@ fn dimension_expr(name: impl Display, expr: impl Display) -> String { format!("(Dimension {} {})", name, expr) } +fn segment_expr(name: impl Display, expr: impl Display) -> String { + format!("(Segment {} {})", name, expr) +} + fn time_dimension_expr( name: impl Display, granularity: impl Display, diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs b/rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs index 955ad29b01b92..caf5d7c3da186 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs @@ -16,9 +16,9 @@ use crate::{ CubeError, }; use datafusion::{logical_plan::LogicalPlan, physical_plan::planner::DefaultPhysicalPlanner}; -use egg::{EGraph, Extractor, Id, IterationData, Language, Rewrite, Runner}; +use egg::{EGraph, Extractor, Id, IterationData, Language, Rewrite, Runner, StopReason}; use itertools::Itertools; -use std::{env, ffi::OsStr, fs, io::Write, sync::Arc}; +use std::{env, ffi::OsStr, fs, io::Write, sync::Arc, time::Duration}; pub struct Rewriter { graph: EGraph, @@ -172,7 +172,7 @@ impl IterDebugInfo { I: IntoIterator, { use std::process::{Command, Stdio}; - let mut child = Command::new("dot") + let mut child = Command::new(env::var("CUBESQL_DOT_PATH").unwrap_or("dot".to_string())) .args(args) .stdin(Stdio::piped()) .stdout(Stdio::null()) @@ -252,8 +252,22 @@ impl Rewriter { self.cube_context.clone(), Arc::new(DefaultPhysicalPlanner::default()), )) - .with_iter_limit(100) - .with_node_limit(10000) + // TODO move config to injector + .with_iter_limit( + env::var("CUBESQL_REWRITE_MAX_ITERATIONS") + .map(|v| v.parse::().unwrap()) + .unwrap_or(300), + ) + .with_node_limit( + env::var("CUBESQL_REWRITE_MAX_NODES") + .map(|v| v.parse::().unwrap()) + .unwrap_or(10000), + ) + .with_time_limit(Duration::from_secs( + env::var("CUBESQL_REWRITE_TIMEOUT") + .map(|v| v.parse::().unwrap()) + .unwrap_or(15), + )) .with_egraph(self.graph.clone()) } @@ -266,6 +280,25 @@ impl Rewriter { let rules = self.rewrite_rules(); let runner = runner.run(rules.iter()); log::debug!("Iterations: {:?}", runner.iterations); + let stop_reason = &runner.iterations[runner.iterations.len() - 1].stop_reason; + let stop_reason = match stop_reason { + None => Some("timeout reached".to_string()), + Some(StopReason::Saturated) => None, + Some(StopReason::NodeLimit(limit)) => Some(format!("{} AST node limit reached", limit)), + Some(StopReason::IterationLimit(limit)) => { + Some(format!("{} iteration limit reached", limit)) + } + Some(StopReason::Other(other)) => Some(other.to_string()), + Some(StopReason::TimeLimit(seconds)) => { + Some(format!("{} seconds timeout reached", seconds)) + } + }; + if let Some(stop_reason) = stop_reason { + return Err(CubeError::user(format!( + "Can't find rewrite due to {}", + stop_reason + ))); + } if IterInfo::egraph_debug_enabled() { let _ = fs::remove_dir_all("egraph-debug"); let _ = fs::create_dir_all("egraph-debug"); diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/filters.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/filters.rs index 389f82102a97c..ab8859350f6e5 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/filters.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/filters.rs @@ -3,29 +3,31 @@ use crate::{ engine::provider::CubeContext, rewrite::{ analysis::LogicalPlanAnalysis, between_expr, binary_expr, case_expr, case_expr_var_arg, - cast_expr, column_expr, cube_scan, cube_scan_filters, cube_scan_filters_empty_tail, - cube_scan_members, dimension_expr, filter, filter_cast_unwrap_replacer, filter_member, + cast_expr, column_expr, cube_scan, cube_scan_filters, cube_scan_members, + dimension_expr, expr_column_name, filter, filter_cast_unwrap_replacer, filter_member, filter_op, filter_op_filters, filter_replacer, fun_expr, fun_expr_var_arg, inlist_expr, is_not_null_expr, is_null_expr, limit, literal_expr, literal_string, measure_expr, - not_expr, projection, rewrite, rewriter::RewriteRules, scalar_fun_expr_args, - scalar_fun_expr_args_empty_tail, segment_member, time_dimension_date_range_replacer, - time_dimension_expr, transforming_rewrite, BetweenExprNegated, BinaryExprOp, - ColumnExprColumn, CubeScanLimit, FilterMemberMember, FilterMemberOp, - FilterMemberValues, FilterReplacerCube, InListExprNegated, LimitN, LiteralExprValue, + member_name_by_alias, not_expr, projection, rewrite, rewriter::RewriteRules, + scalar_fun_expr_args, scalar_fun_expr_args_empty_tail, segment_member, + time_dimension_date_range_replacer, time_dimension_expr, transforming_rewrite, + BetweenExprNegated, BinaryExprOp, ColumnExprColumn, CubeScanLimit, CubeScanTableName, + FilterMemberMember, FilterMemberOp, FilterMemberValues, FilterReplacerCube, + FilterReplacerTableName, InListExprNegated, LimitN, LiteralExprValue, LogicalPlanLanguage, SegmentMemberMember, TableScanSourceTableName, TimeDimensionDateRange, TimeDimensionDateRangeReplacerDateRange, TimeDimensionDateRangeReplacerMember, TimeDimensionGranularity, TimeDimensionName, }, }, - transport::{ext::V1CubeMetaExt, MemberType}, + transport::{ext::V1CubeMetaExt, MemberType, MetaContext}, var, var_iter, }; use chrono::{SecondsFormat, TimeZone, Utc}; +use cubeclient::models::V1CubeMeta; use datafusion::{ - logical_plan::{Column, Operator}, + logical_plan::{Column, Expr, Operator}, scalar::ScalarValue, }; -use egg::{EGraph, Rewrite, Subst}; +use egg::{EGraph, Rewrite, Subst, Var}; use std::{fmt::Display, ops::Index, sync::Arc}; pub struct FilterRules { @@ -56,7 +58,12 @@ impl RewriteRules for FilterRules { "?members", cube_scan_filters( "?filters", - filter_replacer(filter_cast_unwrap_replacer("?expr"), "?cube"), + filter_replacer( + filter_cast_unwrap_replacer("?expr"), + "?cube", + "?members", + "?filter_table_name", + ), ), "?order", "?limit", @@ -65,7 +72,13 @@ impl RewriteRules for FilterRules { "?table_name", "?split", ), - self.push_down_filter("?source_table_name", "?expr", "?cube"), + self.push_down_filter( + "?source_table_name", + "?table_name", + "?expr", + "?cube", + "?filter_table_name", + ), ), transforming_rewrite( "push-down-limit-filter", @@ -180,6 +193,8 @@ impl RewriteRules for FilterRules { filter_replacer( binary_expr(column_expr("?column"), "?op", literal_expr("?literal")), "?cube", + "?members", + "?table_name", ), filter_member("?filter_member", "?filter_op", "?filter_values"), self.transform_filter( @@ -187,6 +202,8 @@ impl RewriteRules for FilterRules { "?op", "?literal", "?cube", + "?members", + "?table_name", "?filter_member", "?filter_op", "?filter_values", @@ -197,24 +214,43 @@ impl RewriteRules for FilterRules { filter_replacer( binary_expr(column_expr("?column"), "?op", literal_expr("?literal")), "?cube", + "?members", + "?table_name", ), segment_member("?segment"), - self.transform_segment("?column", "?op", "?literal", "?cube", "?segment"), + self.transform_segment( + "?column", + "?op", + "?literal", + "?cube", + "?members", + "?table_name", + "?segment", + ), ), rewrite( "filter-in-place-filter-to-true-filter", - filter_replacer(column_expr("?column"), "?cube"), + filter_replacer(column_expr("?column"), "?cube", "?members", "?table_name"), filter_replacer( binary_expr(column_expr("?column"), "=", literal_string("true")), "?cube", + "?members", + "?table_name", ), ), rewrite( "filter-in-place-filter-to-false-filter", - filter_replacer(not_expr(column_expr("?column")), "?cube"), + filter_replacer( + not_expr(column_expr("?column")), + "?cube", + "?members", + "?table_name", + ), filter_replacer( binary_expr(column_expr("?column"), "=", literal_string("false")), "?cube", + "?members", + "?table_name", ), ), transforming_rewrite( @@ -222,6 +258,8 @@ impl RewriteRules for FilterRules { filter_replacer( inlist_expr(column_expr("?column"), "?list", "?negated"), "?cube", + "?members", + "?table_name", ), filter_member("?filter_member", "?filter_op", "?filter_values"), self.transform_in_filter( @@ -229,6 +267,8 @@ impl RewriteRules for FilterRules { "?list", "?negated", "?cube", + "?members", + "?table_name", "?filter_member", "?filter_op", "?filter_values", @@ -236,11 +276,18 @@ impl RewriteRules for FilterRules { ), transforming_rewrite( "filter-replacer-is-null", - filter_replacer(is_null_expr(column_expr("?column")), "?cube"), + filter_replacer( + is_null_expr(column_expr("?column")), + "?cube", + "?members", + "?table_name", + ), filter_member("?filter_member", "?filter_op", "?filter_values"), self.transform_is_null( "?column", "?cube", + "?members", + "?table_name", "?filter_member", "?filter_op", "?filter_values", @@ -249,11 +296,18 @@ impl RewriteRules for FilterRules { ), transforming_rewrite( "filter-replacer-is-not-null", - filter_replacer(is_not_null_expr(column_expr("?column")), "?cube"), + filter_replacer( + is_not_null_expr(column_expr("?column")), + "?cube", + "?members", + "?table_name", + ), filter_member("?filter_member", "?filter_op", "?filter_values"), self.transform_is_null( "?column", "?cube", + "?members", + "?table_name", "?filter_member", "?filter_op", "?filter_values", @@ -269,10 +323,14 @@ impl RewriteRules for FilterRules { literal_expr("?literal"), )), "?cube", + "?members", + "?table_name", ), filter_replacer( binary_expr(column_expr("?column"), "!=", literal_expr("?literal")), "?cube", + "?members", + "?table_name", ), ), rewrite( @@ -284,32 +342,58 @@ impl RewriteRules for FilterRules { literal_expr("?literal"), )), "?cube", + "?members", + "?table_name", ), filter_replacer( binary_expr(column_expr("?column"), "=", literal_expr("?literal")), "?cube", + "?members", + "?table_name", ), ), rewrite( "filter-replacer-is-null-negation", - filter_replacer(not_expr(is_null_expr("?expr")), "?cube"), - filter_replacer(is_not_null_expr("?expr"), "?cube"), + filter_replacer( + not_expr(is_null_expr("?expr")), + "?cube", + "?members", + "?table_name", + ), + filter_replacer( + is_not_null_expr("?expr"), + "?cube", + "?members", + "?table_name", + ), ), rewrite( "filter-replacer-is-not-null-negation", - filter_replacer(not_expr(is_not_null_expr("?expr")), "?cube"), - filter_replacer(is_null_expr("?expr"), "?cube"), + filter_replacer( + not_expr(is_not_null_expr("?expr")), + "?cube", + "?members", + "?table_name", + ), + filter_replacer(is_null_expr("?expr"), "?cube", "?members", "?table_name"), ), rewrite( "filter-replacer-double-negation", - filter_replacer(not_expr(not_expr("?expr")), "?cube"), - filter_replacer("?expr", "?cube"), + filter_replacer( + not_expr(not_expr("?expr")), + "?cube", + "?members", + "?table_name", + ), + filter_replacer("?expr", "?cube", "?members", "?table_name"), ), transforming_rewrite( "filter-replacer-between", filter_replacer( between_expr(column_expr("?column"), "?negated", "?low", "?high"), "?cube", + "?members", + "?table_name", ), filter_member("?filter_member", "?filter_op", "?filter_values"), self.transform_between( @@ -318,6 +402,8 @@ impl RewriteRules for FilterRules { "?low", "?high", "?cube", + "?members", + "?table_name", "?filter_member", "?filter_op", "?filter_values", @@ -325,22 +411,32 @@ impl RewriteRules for FilterRules { ), rewrite( "filter-replacer-and", - filter_replacer(binary_expr("?left", "AND", "?right"), "?cube"), + filter_replacer( + binary_expr("?left", "AND", "?right"), + "?cube", + "?members", + "?table_name", + ), filter_op( filter_op_filters( - filter_replacer("?left", "?cube"), - filter_replacer("?right", "?cube"), + filter_replacer("?left", "?cube", "?members", "?table_name"), + filter_replacer("?right", "?cube", "?members", "?table_name"), ), "and", ), ), rewrite( "filter-replacer-or", - filter_replacer(binary_expr("?left", "OR", "?right"), "?cube"), + filter_replacer( + binary_expr("?left", "OR", "?right"), + "?cube", + "?members", + "?table_name", + ), filter_op( filter_op_filters( - filter_replacer("?left", "?cube"), - filter_replacer("?right", "?cube"), + filter_replacer("?left", "?cube", "?members", "?table_name"), + filter_replacer("?right", "?cube", "?members", "?table_name"), ), "or", ), @@ -358,10 +454,14 @@ impl RewriteRules for FilterRules { literal_expr("?zero"), ), "?cube", + "?members", + "?table_name", ), filter_replacer( binary_expr(column_expr("?column"), "LIKE", literal_expr("?value")), "?cube", + "?members", + "?table_name", ), ), rewrite( @@ -379,6 +479,8 @@ impl RewriteRules for FilterRules { literal_expr("?zero"), ), "?cube", + "?members", + "?table_name", ), filter_replacer( binary_expr( @@ -390,6 +492,8 @@ impl RewriteRules for FilterRules { literal_expr("?zero"), ), "?cube", + "?members", + "?table_name", ), ), rewrite( @@ -413,6 +517,8 @@ impl RewriteRules for FilterRules { literal_expr("?zero"), ), "?cube", + "?members", + "?table_name", ), filter_replacer( binary_expr( @@ -424,6 +530,8 @@ impl RewriteRules for FilterRules { literal_expr("?zero"), ), "?cube", + "?members", + "?table_name", ), ), // Every expression should be handled by filter cast unwrap replacer otherwise other rules just won't work @@ -553,19 +661,21 @@ impl RewriteRules for FilterRules { filter_member("?member", "FilterMemberOp:inDateRange", "?date_range"), self.merge_date_range("?date_range_start", "?date_range_end", "?date_range"), ), - transforming_rewrite( - "in-date-range-to-time-dimension", - filter_member("?member", "FilterMemberOp:inDateRange", "?date_range"), - time_dimension_date_range_replacer( - cube_scan_filters_empty_tail(), - "?time_dimension_member", - "?time_dimension_date_range", + rewrite( + "filter-replacer-in-date-range-inverse", + filter_op( + filter_op_filters( + filter_member("?member", "FilterMemberOp:beforeDate", "?date_range_end"), + filter_member("?member", "FilterMemberOp:afterDate", "?date_range_start"), + ), + "and", ), - self.filter_to_time_dimension_range( - "?member", - "?date_range", - "?time_dimension_member", - "?time_dimension_date_range", + filter_op( + filter_op_filters( + filter_member("?member", "FilterMemberOp:afterDate", "?date_range_start"), + filter_member("?member", "FilterMemberOp:beforeDate", "?date_range_end"), + ), + "and", ), ), rewrite( @@ -615,7 +725,7 @@ impl RewriteRules for FilterRules { "?offset", "?aliases", "?table_name", - "?split", + "CubeScanSplit:false", ), cube_scan( "?source_table_name", @@ -630,7 +740,7 @@ impl RewriteRules for FilterRules { "?offset", "?aliases", "?table_name", - "?split", + "CubeScanSplit:false", ), ), transforming_rewrite( @@ -753,25 +863,46 @@ impl FilterRules { fn push_down_filter( &self, + source_table_name_var: &'static str, table_name_var: &'static str, exp_var: &'static str, cube_var: &'static str, + filter_table_name_var: &'static str, ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { + let source_table_name_var = var!(source_table_name_var); let table_name_var = var!(table_name_var); let exp_var = var!(exp_var); let cube_var = var!(cube_var); + let filter_table_name_var = var!(filter_table_name_var); move |egraph, subst| { - for table_name in var_iter!(egraph[subst[table_name_var]], TableScanSourceTableName) { - if let Some(_referenced_expr) = &egraph.index(subst[exp_var]).data.referenced_expr { - let table_name = table_name.to_string(); - // TODO check referenced_expr - subst.insert( - cube_var, - egraph.add(LogicalPlanLanguage::FilterReplacerCube(FilterReplacerCube( - Some(table_name.to_string()), - ))), - ); - return true; + for cube in var_iter!( + egraph[subst[source_table_name_var]], + TableScanSourceTableName + ) + .cloned() + { + for table_name in + var_iter!(egraph[subst[table_name_var]], CubeScanTableName).cloned() + { + if let Some(_referenced_expr) = + &egraph.index(subst[exp_var]).data.referenced_expr + { + // TODO check referenced_expr + subst.insert( + cube_var, + egraph.add(LogicalPlanLanguage::FilterReplacerCube( + FilterReplacerCube(Some(cube.to_string())), + )), + ); + + subst.insert( + filter_table_name_var, + egraph.add(LogicalPlanLanguage::FilterReplacerTableName( + FilterReplacerTableName(table_name.to_string()), + )), + ); + return true; + } } } false @@ -811,6 +942,8 @@ impl FilterRules { op_var: &'static str, literal_var: &'static str, cube_var: &'static str, + members_var: &'static str, + table_name_var: &'static str, filter_member_var: &'static str, filter_op_var: &'static str, filter_values_var: &'static str, @@ -819,102 +952,104 @@ impl FilterRules { let op_var = op_var.parse().unwrap(); let literal_var = literal_var.parse().unwrap(); let cube_var = cube_var.parse().unwrap(); + let members_var = var!(members_var); + let table_name_var = var!(table_name_var); let filter_member_var = filter_member_var.parse().unwrap(); let filter_op_var = filter_op_var.parse().unwrap(); let filter_values_var = filter_values_var.parse().unwrap(); let meta_context = self.cube_context.meta.clone(); move |egraph, subst| { - for cube in var_iter!(egraph[subst[cube_var]], FilterReplacerCube) { - for expr_op in var_iter!(egraph[subst[op_var]], BinaryExprOp) { - for literal in var_iter!(egraph[subst[literal_var]], LiteralExprValue) { - if let Some(cube) = cube - .as_ref() - .and_then(|cube| meta_context.find_cube_with_name(cube.to_string())) - { - for column in var_iter!(egraph[subst[column_var]], ColumnExprColumn) { - let member_name = format!("{}.{}", cube.name, column.name); - if let Some(member_type) = cube.member_type(&member_name) { - // Segments are handled by separate rule - if cube.lookup_measure(&column.name).is_some() - || cube.lookup_dimension(&column.name).is_some() - { - let op = match expr_op { - Operator::Eq => "equals", - Operator::NotEq => "notEquals", - Operator::Lt => "lt", - Operator::LtEq => "lte", - Operator::Gt => "gt", - Operator::GtEq => "gte", - Operator::Like => "contains", - Operator::NotLike => "notContains", - _ => { - continue; - } - }; + for expr_op in var_iter!(egraph[subst[op_var]], BinaryExprOp) { + for literal in var_iter!(egraph[subst[literal_var]], LiteralExprValue) { + if let Some((member_name, cube)) = Self::filter_member_name( + egraph, + subst, + &meta_context, + cube_var, + column_var, + members_var, + table_name_var, + ) { + if let Some(member_type) = cube.member_type(&member_name) { + // Segments are handled by separate rule + if cube.lookup_measure_by_member_name(&member_name).is_some() + || cube.lookup_dimension_by_member_name(&member_name).is_some() + { + let op = match expr_op { + Operator::Eq => "equals", + Operator::NotEq => "notEquals", + Operator::Lt => "lt", + Operator::LtEq => "lte", + Operator::Gt => "gt", + Operator::GtEq => "gte", + Operator::Like => "contains", + Operator::NotLike => "notContains", + _ => { + continue; + } + }; - let op = match member_type { - MemberType::String => op, - MemberType::Number => op, - MemberType::Boolean => op, - MemberType::Time => match expr_op { - Operator::Lt => "beforeDate", - Operator::LtEq => "beforeDate", - Operator::Gt => "afterDate", - Operator::GtEq => "afterDate", - _ => op, - }, - }; + let op = match member_type { + MemberType::String => op, + MemberType::Number => op, + MemberType::Boolean => op, + MemberType::Time => match expr_op { + Operator::Lt => "beforeDate", + Operator::LtEq => "beforeDate", + Operator::Gt => "afterDate", + Operator::GtEq => "afterDate", + _ => op, + }, + }; - let value = match literal { - ScalarValue::Utf8(Some(value)) => value.to_string(), - ScalarValue::Int64(Some(value)) => value.to_string(), - ScalarValue::Boolean(Some(value)) => value.to_string(), - ScalarValue::Float64(Some(value)) => value.to_string(), - ScalarValue::TimestampNanosecond(Some(value), _) => { - let minus_one = Utc - .timestamp_nanos(*value - 1000) - .to_rfc3339_opts(SecondsFormat::Millis, true); - let value = Utc - .timestamp_nanos(*value) - .to_rfc3339_opts(SecondsFormat::Millis, true); + let value = match literal { + ScalarValue::Utf8(Some(value)) => value.to_string(), + ScalarValue::Int64(Some(value)) => value.to_string(), + ScalarValue::Boolean(Some(value)) => value.to_string(), + ScalarValue::Float64(Some(value)) => value.to_string(), + ScalarValue::TimestampNanosecond(Some(value), _) => { + let minus_one = Utc + .timestamp_nanos(*value - 1000) + .to_rfc3339_opts(SecondsFormat::Millis, true); + let value = Utc + .timestamp_nanos(*value) + .to_rfc3339_opts(SecondsFormat::Millis, true); - match expr_op { - Operator::Lt => minus_one, - Operator::LtEq => minus_one, - Operator::Gt => value, - Operator::GtEq => value, - _ => { - continue; - } - } + match expr_op { + Operator::Lt => minus_one, + Operator::LtEq => minus_one, + Operator::Gt => value, + Operator::GtEq => value, + _ => { + continue; } - x => panic!("Unsupported filter scalar: {:?}", x), - }; + } + } + x => panic!("Unsupported filter scalar: {:?}", x), + }; - subst.insert( - filter_member_var, - egraph.add(LogicalPlanLanguage::FilterMemberMember( - FilterMemberMember(member_name.to_string()), - )), - ); + subst.insert( + filter_member_var, + egraph.add(LogicalPlanLanguage::FilterMemberMember( + FilterMemberMember(member_name.to_string()), + )), + ); - subst.insert( - filter_op_var, - egraph.add(LogicalPlanLanguage::FilterMemberOp( - FilterMemberOp(op.to_string()), - )), - ); + subst.insert( + filter_op_var, + egraph.add(LogicalPlanLanguage::FilterMemberOp( + FilterMemberOp(op.to_string()), + )), + ); - subst.insert( - filter_values_var, - egraph.add(LogicalPlanLanguage::FilterMemberValues( - FilterMemberValues(vec![value.to_string()]), - )), - ); + subst.insert( + filter_values_var, + egraph.add(LogicalPlanLanguage::FilterMemberValues( + FilterMemberValues(vec![value.to_string()]), + )), + ); - return true; - } - } + return true; } } } @@ -931,49 +1066,47 @@ impl FilterRules { op_var: &'static str, literal_var: &'static str, cube_var: &'static str, + members_var: &'static str, + table_name_var: &'static str, segment_member_var: &'static str, ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { let column_var = column_var.parse().unwrap(); let op_var = op_var.parse().unwrap(); let literal_var = literal_var.parse().unwrap(); let cube_var = cube_var.parse().unwrap(); + let members_var = var!(members_var); + let table_name_var = var!(table_name_var); let segment_member_var = segment_member_var.parse().unwrap(); let meta_context = self.cube_context.meta.clone(); move |egraph, subst| { - for cube in var_iter!(egraph[subst[cube_var]], FilterReplacerCube) { - for expr_op in var_iter!(egraph[subst[op_var]], BinaryExprOp) { - for literal in var_iter!(egraph[subst[literal_var]], LiteralExprValue) { - if let Some(cube) = cube - .as_ref() - .and_then(|cube| meta_context.find_cube_with_name(cube.to_string())) + for expr_op in var_iter!(egraph[subst[op_var]], BinaryExprOp) { + for literal in var_iter!(egraph[subst[literal_var]], LiteralExprValue) { + if expr_op == &Operator::Eq { + if literal == &ScalarValue::Boolean(Some(true)) + || literal == &ScalarValue::Utf8(Some("true".to_string())) { - if expr_op == &Operator::Eq { - if literal == &ScalarValue::Boolean(Some(true)) - || literal == &ScalarValue::Utf8(Some("true".to_string())) + if let Some((member_name, cube)) = Self::filter_member_name( + egraph, + subst, + &meta_context, + cube_var, + column_var, + members_var, + table_name_var, + ) { + if let Some(_) = cube + .segments + .iter() + .find(|s| s.name.eq_ignore_ascii_case(&member_name)) { - for column in - var_iter!(egraph[subst[column_var]], ColumnExprColumn) - { - let member_name = format!("{}.{}", cube.name, column.name); - if let Some(_) = cube - .segments - .iter() - .find(|s| s.name.eq_ignore_ascii_case(&member_name)) - { - subst.insert( - segment_member_var, - egraph.add( - LogicalPlanLanguage::SegmentMemberMember( - SegmentMemberMember( - member_name.to_string(), - ), - ), - ), - ); + subst.insert( + segment_member_var, + egraph.add(LogicalPlanLanguage::SegmentMemberMember( + SegmentMemberMember(member_name.to_string()), + )), + ); - return true; - } - } + return true; } } } @@ -991,6 +1124,8 @@ impl FilterRules { list_var: &'static str, negated_var: &'static str, cube_var: &'static str, + members_var: &'static str, + table_name_var: &'static str, filter_member_var: &'static str, filter_op_var: &'static str, filter_values_var: &'static str, @@ -999,57 +1134,57 @@ impl FilterRules { let list_var = var!(list_var); let negated_var = var!(negated_var); let cube_var = var!(cube_var); + let members_var = var!(members_var); + let table_name_var = var!(table_name_var); let filter_member_var = var!(filter_member_var); let filter_op_var = var!(filter_op_var); let filter_values_var = var!(filter_values_var); let meta_context = self.cube_context.meta.clone(); move |egraph, subst| { - for cube in var_iter!(egraph[subst[cube_var]], FilterReplacerCube) { - if let Some(cube) = cube - .as_ref() - .and_then(|cube| meta_context.find_cube_with_name(cube.to_string())) - { - if let Some(list) = &egraph[subst[list_var]].data.constant_in_list { - let values = list - .into_iter() - .map(|literal| FilterRules::scalar_to_value(literal)) - .collect::>(); + if let Some(list) = &egraph[subst[list_var]].data.constant_in_list { + let values = list + .into_iter() + .map(|literal| FilterRules::scalar_to_value(literal)) + .collect::>(); - for column in var_iter!(egraph[subst[column_var]], ColumnExprColumn) { - let member_name = format!("{}.{}", cube.name, column.name); - if cube.contains_member(&member_name) { - for negated in - var_iter!(egraph[subst[negated_var]], InListExprNegated) - { - let negated = *negated; - subst.insert( - filter_member_var, - egraph.add(LogicalPlanLanguage::FilterMemberMember( - FilterMemberMember(member_name.to_string()), - )), - ); + if let Some((member_name, cube)) = Self::filter_member_name( + egraph, + subst, + &meta_context, + cube_var, + column_var, + members_var, + table_name_var, + ) { + if cube.contains_member(&member_name) { + for negated in var_iter!(egraph[subst[negated_var]], InListExprNegated) { + let negated = *negated; + subst.insert( + filter_member_var, + egraph.add(LogicalPlanLanguage::FilterMemberMember( + FilterMemberMember(member_name.to_string()), + )), + ); - subst.insert( - filter_op_var, - egraph.add(LogicalPlanLanguage::FilterMemberOp( - FilterMemberOp(if negated { - "notEquals".to_string() - } else { - "equals".to_string() - }), - )), - ); + subst.insert( + filter_op_var, + egraph.add(LogicalPlanLanguage::FilterMemberOp(FilterMemberOp( + if negated { + "notEquals".to_string() + } else { + "equals".to_string() + }, + ))), + ); - subst.insert( - filter_values_var, - egraph.add(LogicalPlanLanguage::FilterMemberValues( - FilterMemberValues(values), - )), - ); + subst.insert( + filter_values_var, + egraph.add(LogicalPlanLanguage::FilterMemberValues( + FilterMemberValues(values), + )), + ); - return true; - } - } + return true; } } } @@ -1073,6 +1208,8 @@ impl FilterRules { &self, column_var: &'static str, cube_var: &'static str, + members_var: &'static str, + table_name_var: &'static str, filter_member_var: &'static str, filter_op_var: &'static str, filter_values_var: &'static str, @@ -1080,47 +1217,49 @@ impl FilterRules { ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { let column_var = var!(column_var); let cube_var = var!(cube_var); + let members_var = var!(members_var); + let table_name_var = var!(table_name_var); let filter_member_var = var!(filter_member_var); let filter_op_var = var!(filter_op_var); let filter_values_var = var!(filter_values_var); let meta_context = self.cube_context.meta.clone(); move |egraph, subst| { - for cube in var_iter!(egraph[subst[cube_var]], FilterReplacerCube) { - if let Some(cube) = cube - .as_ref() - .and_then(|cube| meta_context.find_cube_with_name(cube.to_string())) - { - for column in var_iter!(egraph[subst[column_var]], ColumnExprColumn) { - let member_name = format!("{}.{}", cube.name, column.name); - if cube.contains_member(&member_name) { - subst.insert( - filter_member_var, - egraph.add(LogicalPlanLanguage::FilterMemberMember( - FilterMemberMember(member_name.to_string()), - )), - ); + if let Some((member_name, cube)) = Self::filter_member_name( + egraph, + subst, + &meta_context, + cube_var, + column_var, + members_var, + table_name_var, + ) { + if cube.contains_member(&member_name) { + subst.insert( + filter_member_var, + egraph.add(LogicalPlanLanguage::FilterMemberMember(FilterMemberMember( + member_name.to_string(), + ))), + ); - subst.insert( - filter_op_var, - egraph.add(LogicalPlanLanguage::FilterMemberOp(FilterMemberOp( - if is_null_op { - "notSet".to_string() - } else { - "set".to_string() - }, - ))), - ); + subst.insert( + filter_op_var, + egraph.add(LogicalPlanLanguage::FilterMemberOp(FilterMemberOp( + if is_null_op { + "notSet".to_string() + } else { + "set".to_string() + }, + ))), + ); - subst.insert( - filter_values_var, - egraph.add(LogicalPlanLanguage::FilterMemberValues( - FilterMemberValues(Vec::new()), - )), - ); + subst.insert( + filter_values_var, + egraph.add(LogicalPlanLanguage::FilterMemberValues(FilterMemberValues( + Vec::new(), + ))), + ); - return true; - } - } + return true; } } @@ -1128,6 +1267,45 @@ impl FilterRules { } } + fn filter_member_name( + egraph: &EGraph, + subst: &Subst, + meta_context: &Arc, + cube_var: Var, + column_var: Var, + members_var: Var, + table_name_var: Var, + ) -> Option<(String, V1CubeMeta)> { + for cube in var_iter!(egraph[subst[cube_var]], FilterReplacerCube) { + if let Some(cube) = cube + .as_ref() + .and_then(|cube| meta_context.find_cube_with_name(cube.to_string())) + { + for column in var_iter!(egraph[subst[column_var]], ColumnExprColumn).cloned() { + for table_name in + var_iter!(egraph[subst[table_name_var]], FilterReplacerTableName).cloned() + { + let alias_name = expr_column_name( + Expr::Column(column.clone()), + &Some(table_name.to_string()), + ); + let member_name = member_name_by_alias( + egraph, + subst[members_var], + &alias_name, + table_name.to_string(), + ) + .unwrap_or(format!("{}.{}", cube.name, column.name)); + + return Some((member_name, cube)); + } + } + } + } + + None + } + fn transform_between( &self, column_var: &'static str, @@ -1135,6 +1313,8 @@ impl FilterRules { low_var: &'static str, high_var: &'static str, cube_var: &'static str, + members_var: &'static str, + table_name_var: &'static str, filter_member_var: &'static str, filter_op_var: &'static str, filter_values_var: &'static str, @@ -1144,57 +1324,58 @@ impl FilterRules { let low_var = var!(low_var); let high_var = var!(high_var); let cube_var = var!(cube_var); + let members_var = var!(members_var); + let table_name_var = var!(table_name_var); let filter_member_var = var!(filter_member_var); let filter_op_var = var!(filter_op_var); let filter_values_var = var!(filter_values_var); let meta_context = self.cube_context.meta.clone(); move |egraph, subst| { - for cube in var_iter!(egraph[subst[cube_var]], FilterReplacerCube) { - if let Some(cube) = cube - .as_ref() - .and_then(|cube| meta_context.find_cube_with_name(cube.to_string())) - { - for column in var_iter!(egraph[subst[column_var]], ColumnExprColumn) { - let member_name = format!("{}.{}", cube.name, column.name); - if let Some(_) = cube.lookup_dimension(&member_name) { - for negated in var_iter!(egraph[subst[negated_var]], BetweenExprNegated) - { - let negated = *negated; - if let Some(low) = &egraph[subst[low_var]].data.constant { - if let Some(high) = &egraph[subst[high_var]].data.constant { - let values = vec![ - FilterRules::scalar_to_value(&low), - FilterRules::scalar_to_value(&high), - ]; + if let Some((member_name, cube)) = Self::filter_member_name( + egraph, + subst, + &meta_context, + cube_var, + column_var, + members_var, + table_name_var, + ) { + if let Some(_) = cube.lookup_dimension(&member_name) { + for negated in var_iter!(egraph[subst[negated_var]], BetweenExprNegated) { + let negated = *negated; + if let Some(low) = &egraph[subst[low_var]].data.constant { + if let Some(high) = &egraph[subst[high_var]].data.constant { + let values = vec![ + FilterRules::scalar_to_value(&low), + FilterRules::scalar_to_value(&high), + ]; - subst.insert( - filter_member_var, - egraph.add(LogicalPlanLanguage::FilterMemberMember( - FilterMemberMember(member_name.to_string()), - )), - ); + subst.insert( + filter_member_var, + egraph.add(LogicalPlanLanguage::FilterMemberMember( + FilterMemberMember(member_name.to_string()), + )), + ); - subst.insert( - filter_op_var, - egraph.add(LogicalPlanLanguage::FilterMemberOp( - FilterMemberOp(if negated { - "notInDateRange".to_string() - } else { - "inDateRange".to_string() - }), - )), - ); + subst.insert( + filter_op_var, + egraph.add(LogicalPlanLanguage::FilterMemberOp( + FilterMemberOp(if negated { + "notInDateRange".to_string() + } else { + "inDateRange".to_string() + }), + )), + ); - subst.insert( - filter_values_var, - egraph.add(LogicalPlanLanguage::FilterMemberValues( - FilterMemberValues(values), - )), - ); + subst.insert( + filter_values_var, + egraph.add(LogicalPlanLanguage::FilterMemberValues( + FilterMemberValues(values), + )), + ); - return true; - } - } + return true; } } } @@ -1238,46 +1419,6 @@ impl FilterRules { } } - fn filter_to_time_dimension_range( - &self, - member_var: &'static str, - date_range_var: &'static str, - time_dimension_member_var: &'static str, - time_dimension_date_range_var: &'static str, - ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { - let member_var = member_var.parse().unwrap(); - let date_range_var = date_range_var.parse().unwrap(); - let time_dimension_member_var = time_dimension_member_var.parse().unwrap(); - let time_dimension_date_range_var = time_dimension_date_range_var.parse().unwrap(); - move |egraph, subst| { - for member in var_iter!(egraph[subst[member_var]], FilterMemberMember) { - let member = member.to_string(); - for date_range in var_iter!(egraph[subst[date_range_var]], FilterMemberValues) { - let date_range = date_range.clone(); - subst.insert( - time_dimension_member_var, - egraph.add(LogicalPlanLanguage::TimeDimensionDateRangeReplacerMember( - TimeDimensionDateRangeReplacerMember(member.to_string()), - )), - ); - - subst.insert( - time_dimension_date_range_var, - egraph.add( - LogicalPlanLanguage::TimeDimensionDateRangeReplacerDateRange( - TimeDimensionDateRangeReplacerDateRange(date_range.clone()), - ), - ), - ); - - return true; - } - } - - false - } - } - fn push_down_time_dimension_replacer_new_time_dimension( &self, members_var: &'static str, diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs index ab1d5df275c5f..c605483b6b059 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs @@ -3,33 +3,39 @@ use crate::{ engine::provider::CubeContext, rewrite::{ agg_fun_expr, aggr_aggr_expr, aggr_aggr_expr_empty_tail, aggr_group_expr, - aggr_group_expr_empty_tail, aggregate, alias_expr, analysis::LogicalPlanAnalysis, - binary_expr, cast_expr, column_alias_replacer, column_expr, column_name_to_member_name, - cube_scan, cube_scan_filters_empty_tail, cube_scan_members, + aggr_group_expr_empty_tail, aggregate, alias_expr, + analysis::LogicalPlanAnalysis, + binary_expr, cast_expr, column_expr, column_name_to_member_name, + column_name_to_member_vec, cube_scan, cube_scan_filters_empty_tail, cube_scan_members, cube_scan_members_empty_tail, cube_scan_order_empty_tail, dimension_expr, - expr_column_name, expr_column_name_with_relation, fun_expr, limit, literal_expr, - measure_expr, member_replacer, original_expr_name, projection, projection_expr, - projection_expr_empty_tail, referenced_columns, rewrite, rewriter::RewriteRules, - save_date_range_replacer, table_scan, time_dimension_expr, transforming_chain_rewrite, + expr_column_name, expr_column_name_with_relation, fun_expr, limit, + list_concat_pushdown_replacer, list_concat_pushup_replacer, literal_expr, measure_expr, + member_pushdown_replacer, member_replacer, original_expr_name, projection, + projection_expr, projection_expr_empty_tail, referenced_columns, rewrite, + rewriter::RewriteRules, + rules::{replacer_push_down_node, replacer_push_down_node_substitute_rules}, + segment_expr, table_scan, time_dimension_expr, transforming_chain_rewrite, transforming_rewrite, udaf_expr, AggregateFunctionExprDistinct, - AggregateFunctionExprFun, AliasExprAlias, ColumnAliasReplacerAliases, - ColumnAliasReplacerTableName, ColumnAliasReplacerTargetTableName, ColumnExprColumn, - CubeScanAliases, CubeScanLimit, CubeScanTableName, DimensionName, LimitN, - LiteralExprValue, LogicalPlanLanguage, MeasureName, MemberErrorError, - MemberErrorPriority, ProjectionAlias, TableScanSourceTableName, TableScanTableName, + AggregateFunctionExprFun, AliasExprAlias, ColumnExprColumn, CubeScanAliases, + CubeScanLimit, CubeScanTableName, DimensionName, LimitN, LiteralExprValue, + LogicalPlanLanguage, MeasureName, MemberErrorError, MemberErrorPriority, + MemberPushdownReplacerTableName, MemberPushdownReplacerTargetTableName, + ProjectionAlias, SegmentName, TableScanSourceTableName, TableScanTableName, TimeDimensionDateRange, TimeDimensionGranularity, TimeDimensionName, WithColumnRelation, }, }, - transport::{V1CubeMetaDimensionExt, V1CubeMetaMeasureExt, V1CubeMetaSegmentExt}, - var, var_iter, CubeError, + transport::{V1CubeMetaDimensionExt, V1CubeMetaMeasureExt}, + var, var_iter, var_list_iter, CubeError, }; +use cubeclient::models::V1CubeMetaMeasure; use datafusion::{ - logical_plan::{Column, DFSchema}, + logical_plan::{Column, DFSchema, Expr}, physical_plan::aggregates::AggregateFunction, scalar::ScalarValue, }; -use egg::{EGraph, Id, Rewrite, Subst}; +use egg::{EGraph, Id, Rewrite, Subst, Var}; +use itertools::Itertools; use std::{collections::HashSet, ops::Index, sync::Arc}; pub struct MemberRules { @@ -38,7 +44,7 @@ pub struct MemberRules { impl RewriteRules for MemberRules { fn rewrite_rules(&self) -> Vec> { - vec![ + let mut rules = vec![ transforming_rewrite( "cube-scan", table_scan( @@ -147,34 +153,6 @@ impl RewriteRules for MemberRules { "?member".to_string(), self.transform_projection_member("?source_table_name", "?column", None, "?member"), ), - transforming_rewrite( - "projection-segment", - member_replacer( - projection_expr(column_expr("?column"), "?tail_group_expr"), - "?source_table_name", - ), - member_replacer("?tail_group_expr", "?source_table_name"), - self.transform_segment("?source_table_name", "?column"), - ), - // TODO this rule only for group by segment error - transforming_chain_rewrite( - "member-replacer-dimension", - member_replacer( - aggr_group_expr("?original_expr", "?tail_group_expr"), - "?source_table_name", - ), - vec![("?original_expr", column_expr("?column"))], - cube_scan_members( - "?dimension", - member_replacer("?tail_group_expr", "?source_table_name"), - ), - self.transform_dimension( - "?source_table_name", - "?column", - "?dimension", - "?original_expr", - ), - ), transforming_chain_rewrite( "date-trunc", member_replacer("?original_expr", "?source_table_name"), @@ -316,11 +294,18 @@ impl RewriteRules for MemberRules { cube_scan( "?source_table_name", cube_scan_members( - cube_scan_members( - member_replacer("?group_expr", "?source_table_name"), - member_replacer("?aggr_expr", "?source_table_name"), + member_pushdown_replacer( + "?group_expr", + list_concat_pushdown_replacer("?old_members"), + "?member_pushdown_replacer_table_name", + "?member_pushdown_replacer_target_table_name", + ), + member_pushdown_replacer( + "?aggr_expr", + list_concat_pushdown_replacer("?old_members"), + "?member_pushdown_replacer_table_name", + "?member_pushdown_replacer_target_table_name", ), - save_date_range_replacer("?old_members"), ), "?filters", "?orders", @@ -335,6 +320,8 @@ impl RewriteRules for MemberRules { "?group_expr", "?aggr_expr", "?old_members", + "?member_pushdown_replacer_table_name", + "?member_pushdown_replacer_target_table_name", ), ), transforming_rewrite( @@ -386,7 +373,12 @@ impl RewriteRules for MemberRules { ), cube_scan( "?source_table_name", - column_alias_replacer("?members", "?aliases", "?cube", "?target_table_name"), + member_pushdown_replacer( + "?expr", + list_concat_pushdown_replacer("?members"), + "?member_pushdown_table_name", + "?target_table_name", + ), "?filters", "?orders", "?limit", @@ -399,11 +391,10 @@ impl RewriteRules for MemberRules { "?expr", "?members", "?aliases", - "?cube", - "?cube_aliases", "?alias", "?table_name", "?new_table_name", + "?member_pushdown_table_name", "?target_table_name", ), ), @@ -436,207 +427,6 @@ impl RewriteRules for MemberRules { ), self.push_down_limit("?limit", "?new_limit"), ), - rewrite( - "alias-replacer-split", - column_alias_replacer( - cube_scan_members( - cube_scan_members("?members_left", "?tail_left"), - cube_scan_members("?members_right", "?tail_right"), - ), - "?aliases", - "?cube", - "?target_table_name", - ), - cube_scan_members( - column_alias_replacer( - cube_scan_members("?members_left", "?tail_left"), - "?aliases", - "?cube", - "?target_table_name", - ), - column_alias_replacer( - cube_scan_members("?members_right", "?tail_right"), - "?aliases", - "?cube", - "?target_table_name", - ), - ), - ), - rewrite( - "alias-replacer-split-left-empty", - column_alias_replacer( - cube_scan_members( - cube_scan_members_empty_tail(), - cube_scan_members("?members_right", "?tail_right"), - ), - "?aliases", - "?cube", - "?target_table_name", - ), - cube_scan_members( - cube_scan_members_empty_tail(), - column_alias_replacer( - cube_scan_members("?members_right", "?tail_right"), - "?aliases", - "?cube", - "?target_table_name", - ), - ), - ), - rewrite( - "alias-replacer-split-right-empty", - column_alias_replacer( - cube_scan_members( - cube_scan_members("?members_left", "?tail_left"), - cube_scan_members_empty_tail(), - ), - "?aliases", - "?cube", - "?target_table_name", - ), - cube_scan_members( - column_alias_replacer( - cube_scan_members("?members_left", "?tail_left"), - "?aliases", - "?cube", - "?target_table_name", - ), - cube_scan_members_empty_tail(), - ), - ), - transforming_rewrite( - "alias-replacer-measure", - column_alias_replacer( - cube_scan_members(measure_expr("?measure", "?expr"), "?tail_group_expr"), - "?aliases", - "?cube", - "?target_table_name", - ), - cube_scan_members( - measure_expr("?measure", "?replaced_alias_expr"), - column_alias_replacer( - "?tail_group_expr", - "?aliases", - "?cube", - "?target_table_name", - ), - ), - self.replace_projection_alias( - "?expr", - "?aliases", - "?cube", - "?target_table_name", - "?replaced_alias_expr", - ), - ), - transforming_rewrite( - "alias-replacer-dimension", - column_alias_replacer( - cube_scan_members(dimension_expr("?dimension", "?expr"), "?tail_group_expr"), - "?aliases", - "?cube", - "?target_table_name", - ), - cube_scan_members( - dimension_expr("?dimension", "?replaced_alias_expr"), - column_alias_replacer( - "?tail_group_expr", - "?aliases", - "?cube", - "?target_table_name", - ), - ), - self.replace_projection_alias( - "?expr", - "?aliases", - "?cube", - "?target_table_name", - "?replaced_alias_expr", - ), - ), - transforming_rewrite( - "alias-replacer-time-dimension", - column_alias_replacer( - cube_scan_members( - time_dimension_expr( - "?time_dimension_name", - "?time_dimension_granularity", - "?date_range", - "?expr", - ), - "?tail_group_expr", - ), - "?aliases", - "?cube", - "?target_table_name", - ), - cube_scan_members( - time_dimension_expr( - "?time_dimension_name", - "?time_dimension_granularity", - "?date_range", - "?replaced_alias_expr", - ), - column_alias_replacer( - "?tail_group_expr", - "?aliases", - "?cube", - "?target_table_name", - ), - ), - self.replace_projection_alias( - "?expr", - "?aliases", - "?cube", - "?target_table_name", - "?replaced_alias_expr", - ), - ), - rewrite( - "alias-replacer-tail", - column_alias_replacer( - cube_scan_members_empty_tail(), - "?aliases", - "?cube", - "?target_table_name", - ), - cube_scan_members_empty_tail(), - ), - // SaveDateRangeReplacer - rewrite( - "save-date-range-replacer-push-down", - save_date_range_replacer(cube_scan_members("?left", "?right")), - cube_scan_members( - save_date_range_replacer("?left"), - save_date_range_replacer("?right"), - ), - ), - rewrite( - "save-date-range-replacer-push-down-empty-tail", - save_date_range_replacer(cube_scan_members_empty_tail()), - cube_scan_members_empty_tail(), - ), - rewrite( - "save-date-range-replacer-push-down-measure", - save_date_range_replacer(measure_expr("?name", "?expr")), - cube_scan_members_empty_tail(), - ), - rewrite( - "save-date-range-replacer-push-down-dimension", - save_date_range_replacer(dimension_expr("?name", "?expr")), - cube_scan_members_empty_tail(), - ), - transforming_rewrite( - "save-date-range-replacer-push-down-time-dimension", - save_date_range_replacer(time_dimension_expr( - "?name", - "?granularity", - "?date_range", - "?expr", - )), - "?new_time_dimension".to_string(), - self.save_date_range("?name", "?date_range", "?expr", "?new_time_dimension"), - ), // Empty tail merges rewrite( "merge-member-empty-tails", @@ -673,7 +463,10 @@ impl RewriteRules for MemberRules { binary_expr(binary_expr("?a", "*", "?b"), "*", "?c"), binary_expr("?a", "*", binary_expr("?b", "*", "?c")), ), - ] + ]; + + rules.extend(self.member_pushdown_rules()); + rules } } @@ -682,6 +475,222 @@ impl MemberRules { Self { cube_context } } + fn member_pushdown_rules(&self) -> Vec> { + let mut rules = Vec::new(); + let member_replacer_fn = |members| { + member_pushdown_replacer(members, "?old_members", "?table_name", "?target_table_name") + }; + + fn member_column_pushdown( + name: &str, + member_fn: impl Fn(&str) -> String, + ) -> Vec> { + vec![ + transforming_rewrite( + &format!("member-pushdown-replacer-column-{}", name), + member_pushdown_replacer( + column_expr("?column"), + member_fn("?old_alias"), + "?table_name", + "?target_table_name", + ), + member_fn("?output_column"), + MemberRules::transform_column_alias("?column", "?output_column"), + ), + transforming_rewrite( + &format!("member-pushdown-replacer-column-{}-alias", name), + member_pushdown_replacer( + alias_expr("?expr", "?alias"), + member_fn("?old_alias"), + "?table_name", + "?target_table_name", + ), + member_fn("?output_column"), + MemberRules::transform_alias("?alias", "?output_column"), + ), + ] + } + + let find_matching_old_member = |name: &str, column_expr: String| { + transforming_rewrite( + &format!( + "member-pushdown-replacer-column-find-matching-old-member-{}", + name + ), + member_pushdown_replacer( + column_expr.clone(), + list_concat_pushup_replacer("?old_members"), + "?table_name", + "?target_table_name", + ), + member_pushdown_replacer( + column_expr, + "?terminal_member", + "?table_name", + "?target_table_name", + ), + self.find_matching_old_member( + "?column", + "?old_members", + "?table_name", + "?terminal_member", + ), + ) + }; + + rules.extend(replacer_push_down_node_substitute_rules( + "member-pushdown-replacer-aggregate-group", + "AggregateGroupExpr", + "CubeScanMembers", + member_replacer_fn.clone(), + )); + rules.extend(replacer_push_down_node_substitute_rules( + "member-pushdown-replacer-aggregate-aggr", + "AggregateAggrExpr", + "CubeScanMembers", + member_replacer_fn.clone(), + )); + rules.extend(replacer_push_down_node_substitute_rules( + "member-pushdown-replacer-projection", + "ProjectionExpr", + "CubeScanMembers", + member_replacer_fn.clone(), + )); + rules.push(find_matching_old_member("column", column_expr("?column"))); + rules.push(find_matching_old_member( + "alias", + alias_expr(column_expr("?column"), "?alias"), + )); + rules.push(find_matching_old_member( + "agg-fun", + agg_fun_expr("?fun_name", vec![column_expr("?column")], "?distinct"), + )); + rules.push(transforming_chain_rewrite( + "member-pushdown-replacer-agg-fun", + member_pushdown_replacer( + "?aggr_expr", + measure_expr("?name", "?old_alias"), + "?table_name", + "?target_table_name", + ), + vec![( + "?aggr_expr", + agg_fun_expr("?fun_name", vec![column_expr("?column")], "?distinct"), + )], + "?measure".to_string(), + self.pushdown_measure( + "?name", + Some("?fun_name"), + Some("?distinct"), + "?aggr_expr", + "?measure", + ), + )); + rules.extend(member_column_pushdown("measure", |column| { + measure_expr("?name", column) + })); + rules.extend(member_column_pushdown("dimension", |column| { + dimension_expr("?name", column) + })); + rules.extend(member_column_pushdown("segment", |column| { + segment_expr("?name", column) + })); + rules.extend(member_column_pushdown("time-dimension", |column| { + time_dimension_expr("?name", "?granularity", "?date_range", column) + })); + + fn list_concat_terminal( + name: &str, + member_fn: String, + ) -> Rewrite { + rewrite( + &format!("list-concat-terminal-{}", name), + list_concat_pushdown_replacer(member_fn.to_string()), + list_concat_pushup_replacer(member_fn), + ) + } + + // List concat replacer -- concats CubeScanMembers into big single node to provide + // O(n*2) CubeScanMembers complexity instead of O(n^2) for old member search + // TODO check why overall graph size is increased most of the times + rules.extend(replacer_push_down_node( + "list-concat-replacer", + "CubeScanMembers", + list_concat_pushdown_replacer, + false, + )); + rules.push(list_concat_terminal( + "measure", + measure_expr("?name", "?expr"), + )); + rules.push(list_concat_terminal( + "dimension", + dimension_expr("?name", "?expr"), + )); + rules.push(list_concat_terminal( + "segment", + segment_expr("?name", "?expr"), + )); + rules.push(list_concat_terminal( + "time-dimension", + time_dimension_expr("?name", "?granularity", "?date_range", "?expr"), + )); + rules.push(list_concat_terminal( + "empty-tail", + cube_scan_members_empty_tail(), + )); + rules.push(transforming_rewrite( + "list-concat-replacer-merge", + cube_scan_members( + list_concat_pushup_replacer("?left"), + list_concat_pushup_replacer("?right"), + ), + list_concat_pushup_replacer("?concat_output"), + self.concat_cube_scan_members("?left", "?right", "?concat_output"), + )); + + rules + } + + fn concat_cube_scan_members( + &self, + left_var: &'static str, + right_var: &'static str, + concat_output_var: &'static str, + ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { + let left_var = var!(left_var); + let right_var = var!(right_var); + let concat_output_var = var!(concat_output_var); + move |egraph, subst| { + let left_list = var_list_iter!(egraph[subst[left_var]], CubeScanMembers) + .cloned() + .collect::>(); + let left_list = if left_list.is_empty() { + vec![vec![subst[left_var]]] + } else { + left_list + }; + for left in left_list { + let right_list = var_list_iter!(egraph[subst[right_var]], CubeScanMembers) + .cloned() + .collect::>(); + let right_list = if right_list.is_empty() { + vec![vec![subst[right_var]]] + } else { + right_list + }; + for right in right_list { + let output = egraph.add(LogicalPlanLanguage::CubeScanMembers( + left.into_iter().chain(right.into_iter()).collect(), + )); + subst.insert(concat_output_var, output); + return true; + } + } + false + } + } + fn transform_table_scan( &self, var: &'static str, @@ -811,23 +820,22 @@ impl MemberRules { &self, projection_expr_var: &'static str, members_var: &'static str, - aliases_var: &'static str, - cube_var: &'static str, cube_aliases_var: &'static str, alias_var: &'static str, table_name_var: &'static str, new_table_name_var: &'static str, - target_table_name_var: &'static str, + member_pushdown_replacer_table_name_var: &'static str, + member_pushdown_replacer_target_table_name_var: &'static str, ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { let projection_expr_var = var!(projection_expr_var); let members_var = var!(members_var); - let aliases_var = var!(aliases_var); - let cube_var = var!(cube_var); let cube_aliases_var = var!(cube_aliases_var); let alias_var = var!(alias_var); let table_name_var = var!(table_name_var); let new_table_name_var = var!(new_table_name_var); - let target_table_name_var = var!(target_table_name_var); + let member_pushdown_replacer_table_name_var = var!(member_pushdown_replacer_table_name_var); + let member_pushdown_replacer_target_table_name_var = + var!(member_pushdown_replacer_target_table_name_var); move |egraph, subst| { if let Some(expr_to_alias) = &egraph.index(subst[projection_expr_var]).data.expr_to_alias @@ -849,19 +857,14 @@ impl MemberRules { { let column_name_to_member_name = column_name_to_member_name(member_name_to_expr, table_name.to_string()); - if column_name_to_alias - .iter() - .all(|(c, _)| column_name_to_member_name.contains_key(c)) + + for projection_alias in + var_iter!(egraph[subst[alias_var]], ProjectionAlias).cloned() { - for projection_alias in - var_iter!(egraph[subst[alias_var]], ProjectionAlias).cloned() + if column_name_to_alias + .iter() + .all(|(c, _)| column_name_to_member_name.contains_key(c)) { - let aliases = - egraph.add(LogicalPlanLanguage::ColumnAliasReplacerAliases( - ColumnAliasReplacerAliases(column_name_to_alias.clone()), - )); - subst.insert(aliases_var, aliases); - let cube_aliases = egraph.add( LogicalPlanLanguage::CubeScanAliases(CubeScanAliases(Some( column_name_to_alias @@ -872,26 +875,35 @@ impl MemberRules { ); subst.insert(cube_aliases_var, cube_aliases); - let cube = - egraph.add(LogicalPlanLanguage::ColumnAliasReplacerTableName( - ColumnAliasReplacerTableName(Some(table_name.to_string())), - )); - subst.insert(cube_var, cube); - let final_table_name = - projection_alias.clone().unwrap_or(table_name); + projection_alias.clone().unwrap_or(table_name.to_string()); let new_table_name = egraph.add(LogicalPlanLanguage::CubeScanTableName( CubeScanTableName(final_table_name.to_string()), )); subst.insert(new_table_name_var, new_table_name); + let replacer_table_name = egraph.add( + LogicalPlanLanguage::MemberPushdownReplacerTableName( + MemberPushdownReplacerTableName(table_name.to_string()), + ), + ); + subst.insert( + member_pushdown_replacer_table_name_var, + replacer_table_name, + ); + let target_table_name = egraph.add( - LogicalPlanLanguage::ColumnAliasReplacerTargetTableName( - ColumnAliasReplacerTargetTableName(Some(final_table_name)), + LogicalPlanLanguage::MemberPushdownReplacerTargetTableName( + MemberPushdownReplacerTargetTableName( + final_table_name.to_string(), + ), ), ); - subst.insert(target_table_name_var, target_table_name); + subst.insert( + member_pushdown_replacer_target_table_name_var, + target_table_name, + ); return true; } @@ -935,13 +947,18 @@ impl MemberRules { group_expr_var: &'static str, aggregate_expr_var: &'static str, members_var: &'static str, + member_pushdown_replacer_table_name_var: &'static str, + member_pushdown_replacer_target_table_name_var: &'static str, ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { let table_name_var = var!(table_name_var); let group_expr_var = var!(group_expr_var); let aggregate_expr_var = var!(aggregate_expr_var); let members_var = var!(members_var); + let member_pushdown_replacer_table_name_var = var!(member_pushdown_replacer_table_name_var); + let member_pushdown_replacer_target_table_name_var = + var!(member_pushdown_replacer_target_table_name_var); move |egraph, subst| { - for table_name in var_iter!(egraph[subst[table_name_var]], CubeScanTableName) { + for table_name in var_iter!(egraph[subst[table_name_var]], CubeScanTableName).cloned() { if let Some(referenced_group_expr) = &egraph.index(subst[group_expr_var]).data.referenced_expr { @@ -978,6 +995,26 @@ impl MemberRules { ); // TODO default count member is not in the columns set but it should be there if columns.iter().all(|c| member_column_names.contains(c)) { + let table_name_id = egraph.add( + LogicalPlanLanguage::MemberPushdownReplacerTableName( + MemberPushdownReplacerTableName(table_name.to_string()), + ), + ); + subst + .insert(member_pushdown_replacer_table_name_var, table_name_id); + + let target_table_name_id = egraph.add( + LogicalPlanLanguage::MemberPushdownReplacerTargetTableName( + MemberPushdownReplacerTargetTableName( + table_name.to_string(), + ), + ), + ); + subst.insert( + member_pushdown_replacer_target_table_name_var, + target_table_name_id, + ); + return true; } } @@ -1012,64 +1049,6 @@ impl MemberRules { } } - fn replace_projection_alias( - &self, - expr_var: &'static str, - column_name_to_alias: &'static str, - cube_var: &'static str, - target_table_name_var: &'static str, - replaced_alias_expr: &'static str, - ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { - let expr_var = expr_var.parse().unwrap(); - let column_name_to_alias = column_name_to_alias.parse().unwrap(); - let cube_var = cube_var.parse().unwrap(); - let replaced_alias_expr = replaced_alias_expr.parse().unwrap(); - let target_table_name_var = var!(target_table_name_var); - move |egraph, subst| { - let expr = egraph[subst[expr_var]] - .data - .original_expr - .as_ref() - .expect(&format!( - "Original expr wasn't prepared for {:?}", - egraph[subst[expr_var]] - )); - for table_name in var_iter!(egraph[subst[cube_var]], ColumnAliasReplacerTableName) { - let column_name = expr_column_name(expr.clone(), &table_name); - for column_name_to_alias in var_iter!( - egraph[subst[column_name_to_alias]], - ColumnAliasReplacerAliases - ) { - for target_table_name in var_iter!( - egraph[subst[target_table_name_var]], - ColumnAliasReplacerTargetTableName - ) - .cloned() - { - if let Some((_, new_alias)) = - column_name_to_alias.iter().find(|(c, _)| c == &column_name) - { - let new_alias = new_alias.clone(); - let alias = egraph.add(LogicalPlanLanguage::ColumnExprColumn( - ColumnExprColumn(Column { - name: new_alias.to_string(), - relation: target_table_name.clone(), - }), - )); - subst.insert( - replaced_alias_expr, - egraph.add(LogicalPlanLanguage::ColumnExpr([alias])), - ); - return true; - } - } - } - } - - false - } - } - fn transform_default_member_error( &self, cube_var: &'static str, @@ -1169,107 +1148,27 @@ impl MemberRules { ); return true; } - } - } - } - } - false - } - } - fn transform_segment( - &self, - cube_var: &'static str, - column_var: &'static str, - ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { - let cube_var = cube_var.parse().unwrap(); - let column_var = column_var.parse().unwrap(); - let meta_context = self.cube_context.meta.clone(); - move |egraph, subst| { - for member_name in - var_iter!(egraph[subst[column_var]], ColumnExprColumn).map(|c| c.name.to_string()) - { - for cube_name in var_iter!(egraph[subst[cube_var]], TableScanSourceTableName) { - if let Some(cube) = meta_context - .cubes - .iter() - .find(|c| c.name.eq_ignore_ascii_case(cube_name)) - { - let member_name = format!("{}.{}", cube_name, member_name); - if let Some(_) = cube - .segments - .iter() - .find(|d| d.name.eq_ignore_ascii_case(&member_name)) - { - return true; - } - } - } - } - false - } - } - - fn transform_dimension( - &self, - cube_var: &'static str, - column_var: &'static str, - dimension_var: &'static str, - original_expr_var: &'static str, - ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { - let cube_var = var!(cube_var); - let column_var = var!(column_var); - let dimension_var = var!(dimension_var); - let original_expr_var = var!(original_expr_var); - let meta_context = self.cube_context.meta.clone(); - move |egraph, subst| { - for dimension_name in - var_iter!(egraph[subst[column_var]], ColumnExprColumn).map(|c| c.name.to_string()) - { - for cube_name in var_iter!(egraph[subst[cube_var]], TableScanSourceTableName) { - if let Some(cube) = meta_context.find_cube_with_name(cube_name.to_string()) { - if let Some(alias) = original_expr_name(egraph, subst[original_expr_var]) { - let dimension_name = format!("{}.{}", cube_name, dimension_name); - if let Some(dimension) = cube - .dimensions + if let Some(segment) = cube + .segments .iter() - .find(|d| d.name.eq_ignore_ascii_case(&dimension_name)) + .find(|d| d.name.eq_ignore_ascii_case(&member_name)) { - let dimension_name = - egraph.add(LogicalPlanLanguage::DimensionName(DimensionName( - dimension.name.to_string(), - ))); - - let alias_expr = Self::add_alias_column(egraph, alias); - + let measure_name = egraph.add(LogicalPlanLanguage::SegmentName( + SegmentName(segment.name.to_string()), + )); + let alias = egraph.add(LogicalPlanLanguage::ColumnExprColumn( + ColumnExprColumn(Column::from_name(column_name)), + )); + let alias_expr = + egraph.add(LogicalPlanLanguage::ColumnExpr([alias])); subst.insert( - dimension_var, - egraph.add(LogicalPlanLanguage::Dimension([ - dimension_name, + member_var, + egraph.add(LogicalPlanLanguage::Segment([ + measure_name, alias_expr, ])), ); - - return true; - } - - if let Some(s) = cube - .segments - .iter() - .find(|d| d.name.eq_ignore_ascii_case(&dimension_name)) - { - subst.insert( - dimension_var, - add_member_error( - egraph, - format!( - "Unable to use segment '{}' in GROUP BY", - s.get_real_name() - ), - 1, - ), - ); - return true; } } @@ -1368,47 +1267,6 @@ impl MemberRules { } } - fn save_date_range( - &self, - name_var: &'static str, - date_range_var: &'static str, - expr_var: &'static str, - new_time_dimension_var: &'static str, - ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { - let name_var = var!(name_var); - let date_range_var = var!(date_range_var); - let expr_var = var!(expr_var); - let new_time_dimension_var = var!(new_time_dimension_var); - move |egraph, subst| { - for date_range in - var_iter!(egraph[subst[date_range_var]], TimeDimensionDateRange).cloned() - { - if let Some(_) = date_range { - let new_granularity = - egraph.add(LogicalPlanLanguage::TimeDimensionGranularity( - TimeDimensionGranularity(None), - )); - subst.insert( - new_time_dimension_var, - egraph.add(LogicalPlanLanguage::TimeDimension([ - subst[name_var], - new_granularity, - subst[date_range_var], - subst[expr_var], - ])), - ); - } else { - subst.insert( - new_time_dimension_var, - egraph.add(LogicalPlanLanguage::CubeScanMembers(Vec::new())), - ); - } - return true; - } - false - } - } - fn add_alias_column( egraph: &mut EGraph, alias: String, @@ -1443,6 +1301,116 @@ impl MemberRules { ) } + fn find_matching_old_member( + &self, + column_var: &'static str, + old_members_var: &'static str, + table_name_var: &'static str, + terminal_member: &'static str, + ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { + let column_var = var!(column_var); + let old_members_var = var!(old_members_var); + let table_name_var = var!(table_name_var); + let terminal_member = var!(terminal_member); + move |egraph, subst| { + for table_name in var_iter!( + egraph[subst[table_name_var]], + MemberPushdownReplacerTableName + ) + .cloned() + { + for alias_column in var_iter!(egraph[subst[column_var]], ColumnExprColumn).cloned() + { + let alias_name = + expr_column_name(Expr::Column(alias_column), &Some(table_name.to_string())); + + if let Some(left_member_name_to_expr) = egraph + .index(subst[old_members_var]) + .data + .member_name_to_expr + .clone() + { + let column_name_to_member = column_name_to_member_vec( + left_member_name_to_expr, + table_name.to_string(), + ); + if let Some((index, _)) = column_name_to_member + .iter() + .find_position(|(member_alias, _)| member_alias == &alias_name) + { + for old_members in + var_list_iter!(egraph[subst[old_members_var]], CubeScanMembers) + .cloned() + { + subst.insert(terminal_member, old_members[index]); + } + + return true; + } + } + } + } + false + } + } + + fn pushdown_measure( + &self, + measure_name_var: &'static str, + fun_var: Option<&'static str>, + distinct_var: Option<&'static str>, + original_expr_var: &'static str, + measure_out_var: &'static str, + ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { + let measure_name_var = var!(measure_name_var); + let fun_var = fun_var.map(|fun_var| var!(fun_var)); + let distinct_var = distinct_var.map(|distinct_var| var!(distinct_var)); + let original_expr_var = var!(original_expr_var); + let measure_out_var = var!(measure_out_var); + let meta_context = self.cube_context.meta.clone(); + move |egraph, subst| { + if let Some(alias) = original_expr_name(egraph, subst[original_expr_var]) { + for measure_name in var_iter!(egraph[subst[measure_name_var]], MeasureName).cloned() + { + if let Some(measure) = meta_context.find_measure_with_name(measure_name) { + for fun in fun_var + .map(|fun_var| { + var_iter!(egraph[subst[fun_var]], AggregateFunctionExprFun) + .map(|fun| Some(fun)) + .collect() + }) + .unwrap_or(vec![None]) + { + for distinct in distinct_var + .map(|distinct_var| { + var_iter!( + egraph[subst[distinct_var]], + AggregateFunctionExprDistinct + ) + .map(|d| *d) + .collect() + }) + .unwrap_or(vec![false]) + { + let call_agg_type = Self::get_agg_type(fun, distinct); + Self::measure_output( + egraph, + subst, + &measure, + call_agg_type, + alias, + measure_out_var, + ); + return true; + } + } + } + } + } + false + } + } + fn transform_measure( &self, cube_var: &'static str, @@ -1493,24 +1461,7 @@ impl MemberRules { }) .unwrap_or(vec![None]) { - let call_agg_type = { - fun.map(|fun| match fun { - AggregateFunction::Count => { - if distinct { - "countDistinct" - } else { - "count" - } - } - AggregateFunction::Sum => "sum", - AggregateFunction::Min => "min", - AggregateFunction::Max => "max", - AggregateFunction::Avg => "avg", - AggregateFunction::ApproxDistinct => "countDistinctApprox", - // TODO: Fix me - _ => "unknown_aggregation_type_hardcoded", - }) - }; + let call_agg_type = Self::get_agg_type(fun, distinct); let measure_name = format!("{}.{}", cube_name, measure_name); if let Some(measure) = cube @@ -1521,35 +1472,14 @@ impl MemberRules { if let Some(alias) = original_expr_name(egraph, subst[aggr_expr_var]) { - if call_agg_type.is_some() - && !measure - .is_same_agg_type(call_agg_type.as_ref().unwrap()) - { - subst.insert( - measure_out_var, - add_member_error(egraph, format!( - "Measure aggregation type doesn't match. The aggregation type for '{}' is '{}()' but '{}()' was provided", - measure.get_real_name(), - measure.agg_type.as_ref().unwrap_or(&"unknown".to_string()).to_uppercase(), - call_agg_type.unwrap().to_uppercase(), - ), 1), - ); - } else { - let measure_name = - egraph.add(LogicalPlanLanguage::MeasureName( - MeasureName(measure.name.to_string()), - )); - - let alias_expr = Self::add_alias_column(egraph, alias); - - subst.insert( - measure_out_var, - egraph.add(LogicalPlanLanguage::Measure([ - measure_name, - alias_expr, - ])), - ); - } + Self::measure_output( + egraph, + subst, + measure, + call_agg_type, + alias, + measure_out_var, + ); } return true; @@ -1580,6 +1510,92 @@ impl MemberRules { } } + fn measure_output( + egraph: &mut EGraph, + subst: &mut Subst, + measure: &V1CubeMetaMeasure, + call_agg_type: Option, + alias: String, + measure_out_var: Var, + ) { + if call_agg_type.is_some() && !measure.is_same_agg_type(call_agg_type.as_ref().unwrap()) { + subst.insert( + measure_out_var, + add_member_error(egraph, format!( + "Measure aggregation type doesn't match. The aggregation type for '{}' is '{}()' but '{}()' was provided", + measure.get_real_name(), + measure.agg_type.as_ref().unwrap_or(&"unknown".to_string()).to_uppercase(), + call_agg_type.unwrap().to_uppercase(), + ), 1), + ); + } else { + let measure_name = egraph.add(LogicalPlanLanguage::MeasureName(MeasureName( + measure.name.to_string(), + ))); + + let alias_expr = Self::add_alias_column(egraph, alias); + + subst.insert( + measure_out_var, + egraph.add(LogicalPlanLanguage::Measure([measure_name, alias_expr])), + ); + } + } + + fn transform_column_alias( + column_var: &'static str, + output_column_var: &'static str, + ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { + let column_var = var!(column_var); + let output_column_var = var!(output_column_var); + move |egraph, subst| { + for column in var_iter!(egraph[subst[column_var]], ColumnExprColumn).cloned() { + let alias_expr = Self::add_alias_column(egraph, column.name.to_string()); + subst.insert(output_column_var, alias_expr); + return true; + } + false + } + } + + fn transform_alias( + alias_var: &'static str, + output_column_var: &'static str, + ) -> impl Fn(&mut EGraph, &mut Subst) -> bool { + let alias_var = var!(alias_var); + let output_column_var = var!(output_column_var); + move |egraph, subst| { + for alias in var_iter!(egraph[subst[alias_var]], AliasExprAlias).cloned() { + let alias_expr = Self::add_alias_column(egraph, alias.to_string()); + subst.insert(output_column_var, alias_expr); + return true; + } + false + } + } + + fn get_agg_type(fun: Option<&AggregateFunction>, distinct: bool) -> Option { + fun.map(|fun| { + match fun { + AggregateFunction::Count => { + if distinct { + "countDistinct" + } else { + "count" + } + } + AggregateFunction::Sum => "sum", + AggregateFunction::Min => "min", + AggregateFunction::Max => "max", + AggregateFunction::Avg => "avg", + AggregateFunction::ApproxDistinct => "countDistinctApprox", + // TODO: Fix me + _ => "unknown_aggregation_type_hardcoded", + } + .to_string() + }) + } + pub fn default_count_measure_name() -> String { "count".to_string() } diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/mod.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/mod.rs index 5d1647f829cc3..3f3e4cc193d51 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/mod.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/mod.rs @@ -1,5 +1,63 @@ +use crate::compile::rewrite::{analysis::LogicalPlanAnalysis, rewrite, LogicalPlanLanguage}; +use egg::Rewrite; + pub mod dates; pub mod filters; pub mod members; pub mod order; pub mod split; + +pub fn replacer_push_down_node( + name: &str, + list_node: &str, + replacer_node: impl Fn(String) -> String, + include_tail: bool, +) -> Vec> { + let push_down_rule = rewrite( + &format!("{}-push-down", name), + replacer_node(format!("({} ?left ?right)", list_node)), + format!( + "({} {} {})", + list_node, + replacer_node("?left".to_string()), + replacer_node("?right".to_string()) + ), + ); + if include_tail { + vec![ + push_down_rule, + rewrite( + &format!("{}-tail", name), + replacer_node(list_node.to_string()), + list_node.to_string(), + ), + ] + } else { + vec![push_down_rule] + } +} + +pub fn replacer_push_down_node_substitute_rules( + name: &str, + list_node: &str, + substitute_node: &str, + replacer_node: impl Fn(String) -> String, +) -> Vec> { + vec![ + rewrite( + &format!("{}-push-down", name), + replacer_node(format!("({} ?left ?right)", list_node)), + format!( + "({} {} {})", + substitute_node, + replacer_node("?left".to_string()), + replacer_node("?right".to_string()) + ), + ), + rewrite( + &format!("{}-tail", name), + replacer_node(list_node.to_string()), + substitute_node.to_string(), + ), + ] +} diff --git a/rust/cubesql/cubesql/src/transport/ctx.rs b/rust/cubesql/cubesql/src/transport/ctx.rs index d96edbdac3f41..790655dc6fb74 100644 --- a/rust/cubesql/cubesql/src/transport/ctx.rs +++ b/rust/cubesql/cubesql/src/transport/ctx.rs @@ -1,7 +1,7 @@ use datafusion::arrow::datatypes::DataType; use std::ops::RangeFrom; -use cubeclient::models::V1CubeMeta; +use cubeclient::models::{V1CubeMeta, V1CubeMetaMeasure}; use crate::sql::ColumnType; @@ -67,6 +67,15 @@ impl MetaContext { None } + pub fn find_measure_with_name(&self, name: String) -> Option { + let cube_and_member_name = name.split(".").collect::>(); + if let Some(cube) = self.find_cube_with_name(cube_and_member_name[0].to_string()) { + cube.lookup_measure(cube_and_member_name[1]).cloned() + } else { + None + } + } + pub fn find_df_data_type(&self, member_name: String) -> Option { self.find_cube_with_name(member_name.split(".").next()?.to_string())? .df_data_type(member_name.as_str()) diff --git a/rust/cubesql/cubesql/src/transport/ext.rs b/rust/cubesql/cubesql/src/transport/ext.rs index acec688c262de..579a7de5ca906 100644 --- a/rust/cubesql/cubesql/src/transport/ext.rs +++ b/rust/cubesql/cubesql/src/transport/ext.rs @@ -143,8 +143,12 @@ pub trait V1CubeMetaExt { fn lookup_dimension(&self, column_name: &str) -> Option<&V1CubeMetaDimension>; + fn lookup_dimension_by_member_name(&self, member_name: &str) -> Option<&V1CubeMetaDimension>; + fn lookup_measure(&self, column_name: &str) -> Option<&V1CubeMetaMeasure>; + fn lookup_measure_by_member_name(&self, member_name: &str) -> Option<&V1CubeMetaMeasure>; + fn lookup_segment(&self, column_name: &str) -> Option<&V1CubeMetaSegment>; fn df_data_type(&self, member_name: &str) -> Option; @@ -228,6 +232,10 @@ impl V1CubeMetaExt for V1CubeMeta { fn lookup_measure(&self, column_name: &str) -> Option<&V1CubeMetaMeasure> { let member_name = self.member_name(column_name); + self.lookup_measure_by_member_name(&member_name) + } + + fn lookup_measure_by_member_name(&self, member_name: &str) -> Option<&V1CubeMetaMeasure> { self.measures .iter() .find(|m| m.name.eq_ignore_ascii_case(&member_name)) @@ -235,6 +243,10 @@ impl V1CubeMetaExt for V1CubeMeta { fn lookup_dimension(&self, column_name: &str) -> Option<&V1CubeMetaDimension> { let member_name = self.member_name(column_name); + self.lookup_dimension_by_member_name(&member_name) + } + + fn lookup_dimension_by_member_name(&self, member_name: &str) -> Option<&V1CubeMetaDimension> { self.dimensions .iter() .find(|m| m.name.eq_ignore_ascii_case(&member_name))