Skip to content

Commit 9da02c8

Browse files
committed
[SPARK-52523] Update arrow-swift code for Timestamp
### What changes were proposed in this pull request? This PR aims to sync with the upstream `arrow-swift` code for `Timestamp`. - apache/arrow-swift@476d1c0 ### Why are the changes needed? Minimize the difference in order to prepare to migrate to `arrow-swift`. ### Does this PR introduce _any_ user-facing change? No behavior change. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #201 from dongjoon-hyun/SPARK-52523. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 056c0cf commit 9da02c8

File tree

6 files changed

+253
-1
lines changed

6 files changed

+253
-1
lines changed

Sources/SparkConnect/ArrowArray.swift

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ public class ArrowArrayHolderImpl: ArrowArrayHolder {
111111
return try ArrowArrayHolderImpl(Time32Array(with))
112112
case .time64:
113113
return try ArrowArrayHolderImpl(Time64Array(with))
114+
case .timestamp:
115+
return try ArrowArrayHolderImpl(TimestampArray(with))
114116
case .string:
115117
return try ArrowArrayHolderImpl(StringArray(with))
116118
case .boolean:
@@ -269,6 +271,86 @@ public class Decimal128Array: FixedArray<Decimal> {
269271
}
270272
}
271273

274+
public class TimestampArray: FixedArray<Timestamp> {
275+
276+
public struct FormattingOptions: Equatable {
277+
public var dateFormat: String = "yyyy-MM-dd HH:mm:ss.SSS"
278+
public var locale: Locale = .current
279+
public var includeTimezone: Bool = true
280+
public var fallbackToRaw: Bool = true
281+
282+
public init(
283+
dateFormat: String = "yyyy-MM-dd HH:mm:ss.SSS",
284+
locale: Locale = .current,
285+
includeTimezone: Bool = true,
286+
fallbackToRaw: Bool = true
287+
) {
288+
self.dateFormat = dateFormat
289+
self.locale = locale
290+
self.includeTimezone = includeTimezone
291+
self.fallbackToRaw = fallbackToRaw
292+
}
293+
294+
public static func == (lhs: FormattingOptions, rhs: FormattingOptions) -> Bool {
295+
return lhs.dateFormat == rhs.dateFormat && lhs.locale.identifier == rhs.locale.identifier
296+
&& lhs.includeTimezone == rhs.includeTimezone && lhs.fallbackToRaw == rhs.fallbackToRaw
297+
}
298+
}
299+
300+
private var cachedFormatter: DateFormatter?
301+
private var cachedOptions: FormattingOptions?
302+
303+
public func formattedDate(at index: UInt, options: FormattingOptions = FormattingOptions())
304+
-> String?
305+
{
306+
guard let timestamp = self[index] else { return nil }
307+
308+
guard let timestampType = self.arrowData.type as? ArrowTypeTimestamp else {
309+
return options.fallbackToRaw ? "\(timestamp)" : nil
310+
}
311+
312+
let date = dateFromTimestamp(timestamp, unit: timestampType.unit)
313+
314+
if cachedFormatter == nil || cachedOptions != options {
315+
let formatter = DateFormatter()
316+
formatter.dateFormat = options.dateFormat
317+
formatter.locale = options.locale
318+
if options.includeTimezone, let timezone = timestampType.timezone {
319+
formatter.timeZone = TimeZone(identifier: timezone)
320+
}
321+
cachedFormatter = formatter
322+
cachedOptions = options
323+
}
324+
325+
return cachedFormatter?.string(from: date)
326+
}
327+
328+
private func dateFromTimestamp(_ timestamp: Int64, unit: ArrowTimestampUnit) -> Date {
329+
let timeInterval: TimeInterval
330+
331+
switch unit {
332+
case .seconds:
333+
timeInterval = TimeInterval(timestamp)
334+
case .milliseconds:
335+
timeInterval = TimeInterval(timestamp) / 1_000
336+
case .microseconds:
337+
timeInterval = TimeInterval(timestamp) / 1_000_000
338+
case .nanoseconds:
339+
timeInterval = TimeInterval(timestamp) / 1_000_000_000
340+
}
341+
342+
return Date(timeIntervalSince1970: timeInterval)
343+
}
344+
345+
public override func asString(_ index: UInt) -> String {
346+
if let formatted = formattedDate(at: index) {
347+
return formatted
348+
}
349+
350+
return super.asString(index)
351+
}
352+
}
353+
272354
/// @nodoc
273355
public class BinaryArray: ArrowArray<Data> {
274356
public struct Options {

Sources/SparkConnect/ArrowArrayBuilder.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@ public class Decimal128ArrayBuilder: ArrowArrayBuilder<FixedBufferBuilder<Decima
129129
}
130130
}
131131

132+
public class TimestampArrayBuilder: ArrowArrayBuilder<FixedBufferBuilder<Int64>, TimestampArray> {
133+
fileprivate convenience init(_ unit: ArrowTimestampUnit, timezone: String? = nil) throws {
134+
try self.init(ArrowTypeTimestamp(unit, timezone: timezone))
135+
}
136+
}
137+
132138
public class StructArrayBuilder: ArrowArrayBuilder<StructBufferBuilder, StructArray> {
133139
let builders: [any ArrowArrayHolderBuilder]
134140
let fields: [ArrowField]
@@ -293,6 +299,11 @@ public class ArrowArrayBuilders {
293299
throw ArrowError.invalid("Expected ArrowTypeDecimal128 for decimal128 type")
294300
}
295301
return try Decimal128ArrayBuilder(precision: decimalType.precision, scale: decimalType.scale)
302+
case .timestamp:
303+
guard let timestampType = arrowType as? ArrowTypeTimestamp else {
304+
throw ArrowError.invalid("Expected arrow type for \(arrowType.id) not found")
305+
}
306+
return try TimestampArrayBuilder(timestampType.unit)
296307
default:
297308
throw ArrowError.unknownType("Builder not found for arrow type: \(arrowType.id)")
298309
}
@@ -355,6 +366,12 @@ public class ArrowArrayBuilders {
355366
return try Time64ArrayBuilder(unit)
356367
}
357368

369+
public static func loadTimestampArrayBuilder(_ unit: ArrowTimestampUnit, timezone: String? = nil)
370+
throws -> TimestampArrayBuilder
371+
{
372+
return try TimestampArrayBuilder(unit, timezone: timezone)
373+
}
374+
358375
public static func loadDecimal128ArrayBuilder(
359376
_ precision: Int32 = 38,
360377
_ scale: Int32 = 18

Sources/SparkConnect/ArrowReaderHelper.swift

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,25 @@ private func makeTimeHolder(
112112
}
113113
}
114114

115+
private func makeTimestampHolder(
116+
_ field: ArrowField,
117+
buffers: [ArrowBuffer],
118+
nullCount: UInt
119+
) -> Result<ArrowArrayHolder, ArrowError> {
120+
do {
121+
if let arrowType = field.type as? ArrowTypeTimestamp {
122+
let arrowData = try ArrowData(arrowType, buffers: buffers, nullCount: nullCount)
123+
return .success(ArrowArrayHolderImpl(try TimestampArray(arrowData)))
124+
} else {
125+
return .failure(.invalid("Incorrect field type for timestamp: \(field.type)"))
126+
}
127+
} catch let error as ArrowError {
128+
return .failure(error)
129+
} catch {
130+
return .failure(.unknownError("\(error)"))
131+
}
132+
}
133+
115134
private func makeBoolHolder(
116135
_ buffers: [ArrowBuffer],
117136
nullCount: UInt
@@ -214,6 +233,8 @@ func makeArrayHolder( // swiftlint:disable:this cyclomatic_complexity
214233
return makeDateHolder(field, buffers: buffers, nullCount: nullCount)
215234
case .time32, .time64:
216235
return makeTimeHolder(field, buffers: buffers, nullCount: nullCount)
236+
case .timestamp:
237+
return makeTimestampHolder(field, buffers: buffers, nullCount: nullCount)
217238
case .strct:
218239
return makeStructHolder(
219240
field, buffers: buffers, nullCount: nullCount, children: children!, rbLength: rbLength)
@@ -234,7 +255,7 @@ func makeBuffer(
234255

235256
func isFixedPrimitive(_ type: org_apache_arrow_flatbuf_Type_) -> Bool {
236257
switch type {
237-
case .int, .bool, .floatingpoint, .date, .time, .decimal:
258+
case .int, .bool, .floatingpoint, .date, .time, .timestamp, .decimal:
238259
return true
239260
default:
240261
return false
@@ -307,6 +328,22 @@ func findArrowType( // swiftlint:disable:this cyclomatic_complexity function_bo
307328
}
308329

309330
return ArrowTypeTime64(timeType.unit == .microsecond ? .microseconds : .nanoseconds)
331+
case .timestamp:
332+
let timestampType = field.type(type: org_apache_arrow_flatbuf_Timestamp.self)!
333+
let arrowUnit: ArrowTimestampUnit
334+
switch timestampType.unit {
335+
case .second:
336+
arrowUnit = .seconds
337+
case .millisecond:
338+
arrowUnit = .milliseconds
339+
case .microsecond:
340+
arrowUnit = .microseconds
341+
case .nanosecond:
342+
arrowUnit = .nanoseconds
343+
}
344+
345+
let timezone = timestampType.timezone
346+
return ArrowTypeTimestamp(arrowUnit, timezone: timezone)
310347
case .struct_:
311348
_ = field.type(type: org_apache_arrow_flatbuf_Struct_.self)!
312349
var fields = [ArrowField]()

Sources/SparkConnect/ArrowType.swift

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public typealias Time64 = Int64
2525
public typealias Date32 = Int32
2626
/// @nodoc
2727
public typealias Date64 = Int64
28+
public typealias Timestamp = Int64
2829

2930
func FlatBuffersVersion_23_1_4() { // swiftlint:disable:this identifier_name
3031
}
@@ -70,6 +71,7 @@ public enum ArrowTypeId: Sendable, Equatable {
7071
case strct
7172
case time32
7273
case time64
74+
case timestamp
7375
case time
7476
case uint16
7577
case uint32
@@ -146,6 +148,47 @@ public class ArrowTypeDecimal128: ArrowType {
146148
}
147149
}
148150

151+
public enum ArrowTimestampUnit {
152+
case seconds
153+
case milliseconds
154+
case microseconds
155+
case nanoseconds
156+
}
157+
158+
public class ArrowTypeTimestamp: ArrowType {
159+
let unit: ArrowTimestampUnit
160+
let timezone: String?
161+
162+
public init(_ unit: ArrowTimestampUnit, timezone: String? = nil) {
163+
self.unit = unit
164+
self.timezone = timezone
165+
166+
super.init(ArrowType.ArrowTimestamp)
167+
}
168+
169+
public convenience init(type: ArrowTypeId) {
170+
self.init(.milliseconds, timezone: nil)
171+
}
172+
173+
public override var cDataFormatId: String {
174+
get throws {
175+
let unitChar: String
176+
switch self.unit {
177+
case .seconds: unitChar = "s"
178+
case .milliseconds: unitChar = "m"
179+
case .microseconds: unitChar = "u"
180+
case .nanoseconds: unitChar = "n"
181+
}
182+
183+
if let timezone = self.timezone {
184+
return "ts\(unitChar):\(timezone)"
185+
} else {
186+
return "ts\(unitChar)"
187+
}
188+
}
189+
}
190+
}
191+
149192
/// @nodoc
150193
public class ArrowNestedType: ArrowType {
151194
let fields: [ArrowField]
@@ -177,6 +220,7 @@ public class ArrowType {
177220
public static let ArrowBinary = Info.variableInfo(ArrowTypeId.binary)
178221
public static let ArrowTime32 = Info.timeInfo(ArrowTypeId.time32)
179222
public static let ArrowTime64 = Info.timeInfo(ArrowTypeId.time64)
223+
public static let ArrowTimestamp = Info.timeInfo(ArrowTypeId.timestamp)
180224
public static let ArrowStruct = Info.complexInfo(ArrowTypeId.strct)
181225

182226
public init(_ info: ArrowType.Info) {
@@ -305,6 +349,8 @@ public class ArrowType {
305349
return MemoryLayout<Time32>.stride
306350
case .time64:
307351
return MemoryLayout<Time64>.stride
352+
case .timestamp:
353+
return MemoryLayout<Timestamp>.stride
308354
case .binary:
309355
return MemoryLayout<Int8>.stride
310356
case .string:
@@ -357,6 +403,11 @@ public class ArrowType {
357403
return try time64.cDataFormatId
358404
}
359405
return "ttu"
406+
case ArrowTypeId.timestamp:
407+
if let timestamp = self as? ArrowTypeTimestamp {
408+
return try timestamp.cDataFormatId
409+
}
410+
return "tsu"
360411
case ArrowTypeId.binary:
361412
return "z"
362413
case ArrowTypeId.string:
@@ -409,6 +460,27 @@ public class ArrowType {
409460
return ArrowTypeTime64(.microseconds)
410461
} else if from == "ttn" {
411462
return ArrowTypeTime64(.nanoseconds)
463+
} else if from.starts(with: "ts") {
464+
let components = from.split(separator: ":", maxSplits: 1)
465+
guard let unitPart = components.first, unitPart.count == 3 else {
466+
throw ArrowError.invalid(
467+
"Invalid timestamp format '\(from)'. Expected format 'ts[s|m|u|n][:timezone]'")
468+
}
469+
470+
let unitChar = unitPart.suffix(1)
471+
let unit: ArrowTimestampUnit
472+
switch unitChar {
473+
case "s": unit = .seconds
474+
case "m": unit = .milliseconds
475+
case "u": unit = .microseconds
476+
case "n": unit = .nanoseconds
477+
default:
478+
throw ArrowError.invalid(
479+
"Unrecognized timestamp unit '\(unitChar)'. Expected 's', 'm', 'u', or 'n'.")
480+
}
481+
482+
let timezone = components.count > 1 ? String(components[1]) : nil
483+
return ArrowTypeTimestamp(unit, timezone: timezone)
412484
} else if from == "z" {
413485
return ArrowType(ArrowType.ArrowBinary)
414486
} else if from == "u" {

Sources/SparkConnect/ArrowWriterHelper.swift

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ func toFBTypeEnum(_ arrowType: ArrowType) -> Result<org_apache_arrow_flatbuf_Typ
4141
return .success(org_apache_arrow_flatbuf_Type_.date)
4242
case .time32, .time64:
4343
return .success(org_apache_arrow_flatbuf_Type_.time)
44+
case .timestamp:
45+
return .success(org_apache_arrow_flatbuf_Type_.timestamp)
4446
case .strct:
4547
return .success(org_apache_arrow_flatbuf_Type_.struct_)
4648
default:
@@ -114,6 +116,32 @@ func toFBType( // swiftlint:disable:this cyclomatic_complexity function_body_le
114116
}
115117

116118
return .failure(.invalid("Unable to case to Time64"))
119+
case .timestamp:
120+
if let timestampType = arrowType as? ArrowTypeTimestamp {
121+
let startOffset = org_apache_arrow_flatbuf_Timestamp.startTimestamp(&fbb)
122+
123+
let fbUnit: org_apache_arrow_flatbuf_TimeUnit
124+
switch timestampType.unit {
125+
case .seconds:
126+
fbUnit = .second
127+
case .milliseconds:
128+
fbUnit = .millisecond
129+
case .microseconds:
130+
fbUnit = .microsecond
131+
case .nanoseconds:
132+
fbUnit = .nanosecond
133+
}
134+
org_apache_arrow_flatbuf_Timestamp.add(unit: fbUnit, &fbb)
135+
136+
if let timezone = timestampType.timezone {
137+
let timezoneOffset = fbb.create(string: timezone)
138+
org_apache_arrow_flatbuf_Timestamp.add(timezone: timezoneOffset, &fbb)
139+
}
140+
141+
return .success(org_apache_arrow_flatbuf_Timestamp.endTimestamp(&fbb, start: startOffset))
142+
}
143+
144+
return .failure(.invalid("Unable to cast to Timestamp"))
117145
case .strct:
118146
let startOffset = org_apache_arrow_flatbuf_Struct_.startStruct_(&fbb)
119147
return .success(org_apache_arrow_flatbuf_Struct_.endStruct_(&fbb, start: startOffset))

Sources/SparkConnect/ProtoUtil.swift

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,22 @@ func fromProto( // swiftlint:disable:this cyclomatic_complexity function_body_l
7373
let arrowUnit: ArrowTime64Unit = timeType.unit == .microsecond ? .microseconds : .nanoseconds
7474
arrowType = ArrowTypeTime64(arrowUnit)
7575
}
76+
case .timestamp:
77+
let timestampType = field.type(type: org_apache_arrow_flatbuf_Timestamp.self)!
78+
let arrowUnit: ArrowTimestampUnit
79+
switch timestampType.unit {
80+
case .second:
81+
arrowUnit = .seconds
82+
case .millisecond:
83+
arrowUnit = .milliseconds
84+
case .microsecond:
85+
arrowUnit = .microseconds
86+
case .nanosecond:
87+
arrowUnit = .nanoseconds
88+
}
89+
90+
let timezone = timestampType.timezone
91+
arrowType = ArrowTypeTimestamp(arrowUnit, timezone: timezone?.isEmpty == true ? nil : timezone)
7692
case .struct_:
7793
var children = [ArrowField]()
7894
for index in 0..<field.childrenCount {

0 commit comments

Comments
 (0)