diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 3f13bc046037a..acbc6aa14b50c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -464,6 +464,8 @@ object FunctionRegistry {
     expression[TryToBinary]("try_to_binary"),
     expressionBuilder("try_to_timestamp", TryToTimestampExpressionBuilder, setAlias = true),
     expressionBuilder("try_to_time", TryToTimeExpressionBuilder, setAlias = true),
+    expressionBuilder("time", TimeExpressionBuilder, setAlias = true),
+    expressionBuilder("try_time", TryTimeExpressionBuilder, setAlias = true),
     expression[TryAesDecrypt]("try_aes_decrypt"),
     expression[TryReflect]("try_reflect"),
     expression[TryUrlDecode]("try_url_decode"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 7a4145933fc7f..7db9c220a18dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -740,6 +740,28 @@ case class Cast(
       } else {
         buildCast[UTF8String](_, s => DateTimeUtils.stringToTime(s).orNull)
       }
+
+    case TimestampType =>
+      // NOTE(review): going through java.sql.Timestamp reads the wall clock in the JVM default
+      // time zone rather than a session time zone - confirm that this is intended.
+      buildCast[Long](_, ts =>
+        // Microseconds since midnight; digits below a microsecond are dropped.
+        DateTimeUtils.toJavaTimestamp(ts).toLocalDateTime().toLocalTime().toNanoOfDay() / 1000L)
+
+    case _: IntegralType =>
+      // Whole seconds since midnight. NOTE(review): the value is not range-checked, so inputs
+      // outside [0, 86400) are not rejected here - confirm the intended behavior.
+      buildCast[Number](_, n => n.longValue() * MICROS_PER_SECOND)
+
+    case _: DecimalType =>
+      // Decimal does not extend java.lang.Number, so it cannot share the branch below. The exact
+      // decimal arithmetic also avoids binary floating-point rounding of the fraction.
+      buildCast[Decimal](_, d =>
+        d.toJavaBigDecimal.multiply(java.math.BigDecimal.valueOf(MICROS_PER_SECOND)).longValue())
+
+    case _: FractionalType =>
+      // Float/Double seconds; the fraction below a microsecond is truncated.
+      buildCast[Number](_, n => (n.doubleValue() * MICROS_PER_SECOND).toLong)
   }
 
   // IntervalConverter
@@ -1375,6 +1397,30 @@ case class Cast(
           """
       }
 
+      case TimestampType =>
+        // Single expression: no fixed-name Java locals that could clash in the generated method.
+        (c, evPrim, _) =>
+          code"""
+            $evPrim = $dateTimeUtilsCls.toJavaTimestamp($c)
+              .toLocalDateTime().toLocalTime().toNanoOfDay() / 1000L;
+          """
+
+      case _: IntegralType =>
+        (c, evPrim, _) =>
+          code"""$evPrim = $c * $MICROS_PER_SECOND;"""
+
+      case _: DecimalType =>
+        // Decimal is an object in generated Java, so arithmetic operators cannot be used on it.
+        (c, evPrim, _) =>
+          code"""
+            $evPrim = $c.toJavaBigDecimal()
+              .multiply(java.math.BigDecimal.valueOf($MICROS_PER_SECOND)).longValue();
+          """
+
+      case _: FractionalType =>
+        (c, evPrim, _) =>
+          code"""$evPrim = (long)($c * $MICROS_PER_SECOND);"""
+
       case _ =>
         (_, _, evNull) => code"$evNull = true;"
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala
index ede5f10788474..1875833ab3335 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala
@@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst.expressions
 import java.time.DateTimeException
 
 import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.TimeFormatter
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.types.StringTypeWithCollation
-import org.apache.spark.sql.types.{AbstractDataType,
IntegerType, ObjectType, TimeType}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType, IntegerType, ObjectType, TimeType}
 import org.apache.spark.unsafe.types.UTF8String
 
+
 /**
  * Parses a column to a time based on the given format.
  */
@@ -164,6 +166,205 @@ object TryToTimeExpressionBuilder extends ExpressionBuilder {
   }
 }
 
+/**
+ * Converts the given expression to a time, or extracts the time part of a timestamp.
+ * The conversion is delegated to a [[Cast]] of the child to [[TimeType]].
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr) - Extracts the time part from or converts the given expression to a time.
+      The expression can be a string, timestamp, or numeric value.
+  """,
+  arguments = """
+    Arguments:
+      * expr - An expression that can be one of:
+               - A string representing a time
+               - A timestamp value
+               - A numeric value representing total seconds
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_('12:25:13.45');
+       12:25:13.45
+      > SELECT _FUNC_(timestamp'2020-04-30 12:25:13.45');
+       12:25:13.45
+      > SELECT _FUNC_(123);
+       00:02:03
+  """,
+  group = "datetime_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+case class Time(expr: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  override def child: Expression = expr
+
+  override def dataType: DataType = TimeType()
+
+  // NOTE(review): every input type is accepted; consider narrowing this to the types that the
+  // cast to TimeType actually handles.
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+
+  override def prettyName: String = "time"
+
+  // The cast that does the conversion; created once per instance instead of once per row.
+  @transient private lazy val timeCast: Cast = Cast(child, TimeType())
+
+  // Nullability follows the underlying cast rather than only the child.
+  override def nullable: Boolean = timeCast.nullable
+
+  // Evaluate against the real input row; a null row only works for children that ignore it.
+  override def eval(input: org.apache.spark.sql.catalyst.InternalRow): Any =
+    timeCast.eval(input)
+
+  // `ev` is unused on purpose: the code generated for the underlying cast is returned as is.
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    timeCast.genCode(ctx)
+
+  override protected def withNewChildInternal(newChild: Expression): Expression =
+    copy(expr = newChild)
+}
+
+/**
+ * Similar to [[Time]] but returns NULL instead of raising an error on invalid input.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr) - Extracts the time part from or converts the given expression to a time.
+      If the conversion fails, it returns NULL. The expression can be a string, timestamp,
+      or numeric value.
+  """,
+  arguments = """
+    Arguments:
+      * expr - An expression that can be one of:
+               - A string representing a time
+               - A timestamp value
+               - A numeric value representing total seconds
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_('12:25:13.45');
+       12:25:13.45
+      > SELECT _FUNC_('invalid');
+       NULL
+      > SELECT _FUNC_(timestamp'2020-04-30 12:25:13.45');
+       12:25:13.45
+  """,
+  group = "datetime_funcs",
+  since = "4.1.0")
+case class TryTime(expr: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  override def child: Expression = expr
+
+  override def dataType: DataType = TimeType()
+
+  // NOTE(review): every input type is accepted; consider narrowing this to the types that the
+  // cast to TimeType actually handles.
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+
+  override def prettyName: String = "try_time"
+
+  // EvalMode.TRY requests try_cast semantics: NULL instead of an error on invalid input.
+  @transient private lazy val timeCast: Cast =
+    Cast(child, TimeType(), evalMode = EvalMode.TRY)
+
+  // A failed conversion yields NULL even when the input is not NULL.
+  override def nullable: Boolean = true
+
+  // Evaluate against the real input row; a null row only works for children that ignore it.
+  override def eval(input: org.apache.spark.sql.catalyst.InternalRow): Any =
+    timeCast.eval(input)
+
+  // `ev` is unused on purpose: the code generated for the underlying cast is returned as is.
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    timeCast.genCode(ctx)
+
+  override protected def withNewChildInternal(newChild: Expression): Expression =
+    copy(expr = newChild)
+}
+
+/**
+ * Builds the `time` function, which takes exactly one argument.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr) - Extracts the time part from or converts the given expression to a time.
+      The expression can be a string, timestamp, or numeric value.
+  """,
+  arguments = """
+    Arguments:
+      * expr - An expression that can be one of:
+               - A string representing a time
+               - A timestamp value
+               - A numeric value representing total seconds
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_('12:25:13.45');
+       12:25:13.45
+      > SELECT _FUNC_(timestamp'2020-04-30 12:25:13.45');
+       12:25:13.45
+      > SELECT _FUNC_(123);
+       00:02:03
+  """,
+  group = "datetime_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+object TimeExpressionBuilder extends ExpressionBuilder {
+  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
+    val numArgs = expressions.length
+    if (numArgs == 1) {
+      Time(expressions.head)
+    } else {
+      throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(1), numArgs)
+    }
+  }
+}
+
+/**
+ * Builds the `try_time` function, which takes exactly one argument.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr) - Parses the `expr` expression to a time. If the parsing fails, it returns NULL.
+      The expression can be a string, timestamp, or numeric value.
+  """,
+  arguments = """
+    Arguments:
+      * expr - An expression that can be one of:
+               - A string representing a time
+               - A timestamp value
+               - A numeric value representing total seconds
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_('12:25:13.45');
+       12:25:13.45
+      > SELECT _FUNC_('invalid time');
+       NULL
+      > SELECT _FUNC_(timestamp'2020-04-30 12:25:13.45');
+       12:25:13.45
+      > SELECT _FUNC_(123);
+       00:02:03
+  """,
+  group = "datetime_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+object TryTimeExpressionBuilder extends ExpressionBuilder {
+  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
+    val numArgs = expressions.length
+    if (numArgs == 1) {
+      TryTime(expressions.head)
+    } else {
+      throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(1), numArgs)
+    }
+  }
+}
+
 // scalastyle:off line.size.limit
 @ExpressionDescription(
   usage = """
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeExpressionsSuite.scala
index 5f02a23004053..e8d7be38ec8c0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeExpressionsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.{SPARK_DOC_ROOT, SparkDateTimeException, SparkFunSuite}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, TimeType}
 
 class TimeExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -53,6 +54,44 @@ class TimeExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       parameters = Map("input" -> "'100:50'", "format" -> "'mm:HH'"))
   }
 
+  test("Time Function") {
+    // NULL input
+    checkEvaluation(Time(Literal(null, StringType)), null)
+
+    // String inputs
+    checkEvaluation(Time(Literal("00:00:00")), localTime())
+    checkEvaluation(Time(Literal("23:59:59.999999")), localTime(23, 59, 59, 999999))
+    checkEvaluation(Time(Literal("12:30:45.123")), localTime(12, 30, 45, 123000))
+    checkEvaluation(Time(NonFoldableLiteral(" 12:00:00.909 ")), localTime(12, 0, 0, 909000))
+
+    // Numeric inputs are taken as seconds since midnight
+    checkEvaluation(Time(Literal(0)), localTime(0, 0, 0, 0))
+    checkEvaluation(Time(Literal(123)), localTime(0, 2, 3, 0))
+    checkEvaluation(Time(Literal(3661.5)), localTime(1, 1, 1, 500000))
+
+    // Last microsecond of the day
+    checkEvaluation(Time(Literal(86399.999999)), localTime(23, 59, 59, 999999))
+
+    // A column reference must be evaluated against the input row
+    checkEvaluation(
+      Time(BoundReference(0, StringType, nullable = true)),
+      localTime(1, 2, 3, 0),
+      create_row("01:02:03"))
+  }
+
+  test("TryTime Function") {
+    checkEvaluation(TryTime(Literal(null, StringType)), null)
+    checkEvaluation(TryTime(Literal("12:30:45.123")), localTime(12, 30, 45, 123000))
+    checkEvaluation(TryTime(Literal(123)), localTime(0, 2, 3, 0))
+
+    // Invalid input yields NULL even when ANSI mode is enabled
+    withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+      checkEvaluation(TryTime(Literal("invalid")), null)
+    }
+
+    assert(TryTime(Literal("00:00:00")).prettyName === "try_time")
+  }
+
   test("HourExpressionBuilder") {
     // Empty expressions list
     checkError(
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 9156a01349cf1..41d5678319090 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -338,6 +338,7 @@
 | org.apache.spark.sql.catalyst.expressions.Subtract | - | SELECT 2 - 1 | struct<(2 - 1):int> |
 | org.apache.spark.sql.catalyst.expressions.Tan | tan | SELECT tan(0) | struct |
 | org.apache.spark.sql.catalyst.expressions.Tanh | tanh | SELECT tanh(0) | struct |
+| org.apache.spark.sql.catalyst.expressions.TimeExpressionBuilder | time | SELECT time('12:25:13.45') | struct |
 | org.apache.spark.sql.catalyst.expressions.TimeWindow | window | SELECT a, window.start, window.end, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, start | struct |
 | org.apache.spark.sql.catalyst.expressions.ToBinary | to_binary | SELECT to_binary('abc', 'utf-8') | struct |
 | org.apache.spark.sql.catalyst.expressions.ToCharacterBuilder | to_char | SELECT to_char(454, '999') | struct |
@@ -365,6 +366,7 @@
 | org.apache.spark.sql.catalyst.expressions.TryParseUrl | try_parse_url | SELECT try_parse_url('http://spark.apache.org/path?query=1', 'HOST') | struct |
 | org.apache.spark.sql.catalyst.expressions.TryReflect | try_reflect | SELECT try_reflect('java.util.UUID', 'randomUUID') | struct |
 | org.apache.spark.sql.catalyst.expressions.TrySubtract | try_subtract | SELECT try_subtract(2, 1) | struct |
+| org.apache.spark.sql.catalyst.expressions.TryTimeExpressionBuilder | try_time | SELECT try_time('12:25:13.45') | struct |
 | org.apache.spark.sql.catalyst.expressions.TryToBinary | try_to_binary | SELECT try_to_binary('abc', 'utf-8') | struct |
 | org.apache.spark.sql.catalyst.expressions.TryToNumber | try_to_number | SELECT try_to_number('454', '999') | struct |
 | org.apache.spark.sql.catalyst.expressions.TryToTimeExpressionBuilder | try_to_time | SELECT try_to_time('00:12:00.001') | struct |