Commit 9657b75

feat: support array_append (apache#1072)
* feat: support array_append
* formatted code
* rewrite array_append plan to match spark behaviour and fixed bug in QueryPlan serde
* remove unwrap
* Fix for Spark 3.3
* refactor array_append binary expression serde code
* Disabled array_append test for spark 4.0+
1 parent 712658e commit 9657b75

File tree: 4 files changed (+61 -2 lines)

native/core/src/execution/datafusion/planner.rs

Lines changed: 30 additions & 1 deletion
@@ -82,6 +82,7 @@ use datafusion::{
     },
     prelude::SessionContext,
 };
+use datafusion_functions_nested::concat::ArrayAppend;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};

 use datafusion_comet_proto::{
@@ -107,7 +108,8 @@ use datafusion_common::{
 };
 use datafusion_expr::expr::find_df_window_func;
 use datafusion_expr::{
-    AggregateUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
+    AggregateUDF, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
+    WindowFunctionDefinition,
 };
 use datafusion_physical_expr::expressions::{Literal, StatsType};
 use datafusion_physical_expr::window::WindowExpr;
@@ -691,6 +693,33 @@ impl PhysicalPlanner {
                     expr.ordinal as usize,
                 )))
             }
+            ExprStruct::ArrayAppend(expr) => {
+                let left =
+                    self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let right =
+                    self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let return_type = left.data_type(&input_schema)?;
+                let args = vec![Arc::clone(&left), right];
+                let datafusion_array_append =
+                    Arc::new(ScalarUDF::new_from_impl(ArrayAppend::new()));
+                let array_append_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
+                    "array_append",
+                    datafusion_array_append,
+                    args,
+                    return_type,
+                ));
+
+                let is_null_expr: Arc<dyn PhysicalExpr> = Arc::new(IsNullExpr::new(left));
+                let null_literal_expr: Arc<dyn PhysicalExpr> =
+                    Arc::new(Literal::new(ScalarValue::Null));
+
+                let case_expr = CaseExpr::try_new(
+                    None,
+                    vec![(is_null_expr, null_literal_expr)],
+                    Some(array_append_expr),
+                )?;
+                Ok(Arc::new(case_expr))
+            }
             expr => Err(ExecutionError::GeneralError(format!(
                 "Not implemented: {:?}",
                 expr
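
Why the CASE wrapper: in Spark, array_append returns NULL when the array argument is NULL, so the planner cannot hand the arguments straight to DataFusion's ArrayAppend UDF; it guards the call with CASE WHEN arr IS NULL THEN NULL ELSE array_append(arr, elem) END. Below is a self-contained sketch of the same expression shape, assuming the datafusion and datafusion-functions-nested crates of roughly this commit's era. The column names arr/elem are hypothetical stand-ins for the deserialized child expressions, and exact re-export paths and the ScalarFunctionExpr constructor vary between DataFusion releases, so treat this as illustrative rather than exact.

use std::sync::Arc;

use datafusion::arrow::array::{Int32Array, ListArray};
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{Result, ScalarValue};
use datafusion::logical_expr::ScalarUDF;
use datafusion::physical_expr::expressions::{CaseExpr, Column, IsNullExpr, Literal};
// NOTE: the re-export path of ScalarFunctionExpr differs across releases.
use datafusion::physical_expr::{PhysicalExpr, ScalarFunctionExpr};
use datafusion_functions_nested::concat::ArrayAppend;

fn main() -> Result<()> {
    // Input schema: a nullable ARRAY<INT> column `arr` and an INT column `elem`
    // (hypothetical names standing in for the serialized child expressions).
    let list_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
    let schema = Arc::new(Schema::new(vec![
        Field::new("arr", list_type.clone(), true),
        Field::new("elem", DataType::Int32, true),
    ]));
    let arr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("arr", 0));
    let elem: Arc<dyn PhysicalExpr> = Arc::new(Column::new("elem", 1));

    // Plain DataFusion array_append(arr, elem), as in the planner arm above.
    let append: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
        "array_append",
        Arc::new(ScalarUDF::new_from_impl(ArrayAppend::new())),
        vec![Arc::clone(&arr), elem],
        list_type,
    ));

    // CASE WHEN arr IS NULL THEN NULL ELSE array_append(arr, elem) END:
    // a NULL array yields NULL, matching Spark's semantics.
    let spark_append = Arc::new(CaseExpr::try_new(
        None,
        vec![(
            Arc::new(IsNullExpr::new(arr)) as Arc<dyn PhysicalExpr>,
            Arc::new(Literal::new(ScalarValue::Null)) as Arc<dyn PhysicalExpr>,
        )],
        Some(append),
    )?);

    // Two rows: arr = [1, 2] and arr = NULL.
    let arr_col = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]),
        None,
    ]);
    let elem_col = Int32Array::from(vec![10, 20]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr_col), Arc::new(elem_col)])?;

    // Expected: [1, 2, 10] for the first row, NULL for the second.
    let out = spark_append.evaluate(&batch)?.into_array(batch.num_rows())?;
    println!("{out:?}");
    Ok(())
}

Row 0 should come back as [1, 2, 10] and row 1 as NULL, which is the behaviour the new CometExpressionSuite test below verifies end-to-end against Spark via checkSparkAnswerAndOperator.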

native/proto/src/proto/expr.proto

Lines changed: 1 addition & 0 deletions
@@ -82,6 +82,7 @@ message Expr {
     ToJson to_json = 55;
     ListExtract list_extract = 56;
     GetArrayStructFields get_array_struct_fields = 57;
+    BinaryExpr array_append = 58;
   }
 }

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 6 additions & 1 deletion
@@ -2237,7 +2237,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
         None
       }
-
+      case _ if expr.prettyName == "array_append" =>
+        createBinaryExpr(
+          expr.children(0),
+          expr.children(1),
+          inputs,
+          (builder, binaryExpr) => builder.setArrayAppend(binaryExpr))
       case _ =>
         withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
         None

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 24 additions & 0 deletions
@@ -2313,4 +2313,28 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("array_append") {
+    // array append has been added in Spark 3.4 and in Spark 4.0 it gets written to ArrayInsert
+    assume(isSpark34Plus && !isSpark40Plus)
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1");
+        checkSparkAnswerAndOperator(spark.sql("Select array_append(array(_1),false) from t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_append(array(_2, _3, _4), 4) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_append(array(_2, _3, _4), null) FROM t1"));
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_append(array(_6, _7), CAST(6.5 AS DOUBLE)) FROM t1"));
+        checkSparkAnswerAndOperator(spark.sql("SELECT array_append(array(_8), 'test') FROM t1"));
+        checkSparkAnswerAndOperator(spark.sql("SELECT array_append(array(_19), _19) FROM t1"));
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_append((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1"));
+      }
+
+    }
+  }
 }
