
[SPARK-51934] Add MacOS integration test with Apache Spark 3.5.5 #92

Closed

21 changes: 21 additions & 0 deletions .github/workflows/build_and_test.yml
@@ -118,3 +118,24 @@ jobs:
          ./start-connect-server.sh
          cd ../..
          swift test --no-parallel
+
+  integration-test-mac-spark3:
+    runs-on: macos-15
+    steps:
+      - uses: actions/checkout@v4
+      - uses: swift-actions/[email protected]
+        with:
+          swift-version: "6.1"
+      - name: Install Java
+        uses: actions/setup-java@v4
+        with:
+          distribution: zulu
+          java-version: 17
+      - name: Test
+        run: |
+          curl -LO https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
+          tar xvfz spark-3.5.5-bin-hadoop3.tgz
+          cd spark-3.5.5-bin-hadoop3/sbin
+          ./start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.5
+          cd ../..
+          swift test --no-parallel
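
The new `integration-test-mac-spark3` job mirrors the existing macOS job but pins the server to Apache Spark 3.5.5: it downloads the release tarball, starts a Spark Connect server (the explicit `--packages org.apache.spark:spark-connect_2.12:3.5.5` is needed because 3.5 does not bundle the Connect plugin, unlike the Spark 4 server started above it), and runs the Swift test suite against it. Because the same test binary must now pass against both a 3.5 and a 4.0 server, the test changes below gate Spark 4-only behavior on the reported server version. A minimal sketch of that recurring pattern (not code from this PR):

```swift
import Testing
@testable import SparkConnect

// Sketch of the version-gating pattern used throughout this PR's test changes:
// ask the connected server for its version and skip Spark 4-only assertions
// when running against the 3.5.5 server started by the new CI job.
@Test func spark4OnlyFeature() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  if await spark.version.starts(with: "4.") {
    // Assertions that only hold on a Spark 4 server go here.
  }
  await spark.stop()
}
```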
4 changes: 2 additions & 2 deletions Tests/SparkConnectTests/DataFrameInternalTests.swift
@@ -32,7 +32,7 @@ struct DataFrameInternalTests {
     #expect(rows.count == 1)
     #expect(rows[0].length == 1)
     #expect(
-      try rows[0].get(0) as! String == """
+      try (rows[0].get(0) as! String).trimmingCharacters(in: .whitespacesAndNewlines) == """
        +---+
        |id |
        +---+
@@ -73,7 +73,7 @@ struct DataFrameInternalTests {
     #expect(rows[0].length == 1)
     print(try rows[0].get(0) as! String)
     #expect(
-      try rows[0].get(0) as! String == """
+      try (rows[0].get(0) as! String).trimmingCharacters(in: .whitespacesAndNewlines) == """
        -RECORD 0--
         id | 0
        -RECORD 1--
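
Both `show()` assertions changed the same way: the rendered table's trailing whitespace and newlines differ between Spark 3.5 and 4.0 servers, so comparing the raw string against a fixed literal can only match one of them. Trimming both ends with Foundation's `trimmingCharacters(in: .whitespacesAndNewlines)` makes the comparison version-agnostic. A self-contained sketch (the `rendered` value is hypothetical output, not captured from Spark):

```swift
import Foundation

// A trailing newline in the rendered output would break an == comparison
// against the multi-line literal; trimming both ends normalizes it.
let rendered = "+---+\n|id |\n+---+\n|0  |\n+---+\n"
let expected = """
  +---+
  |id |
  +---+
  |0  |
  +---+
  """
assert(rendered.trimmingCharacters(in: .whitespacesAndNewlines) == expected)
```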
12 changes: 7 additions & 5 deletions Tests/SparkConnectTests/DataFrameReaderTests.swift
@@ -48,10 +48,12 @@ struct DataFrameReaderTests {
   @Test
   func xml() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    let path = "../examples/src/main/resources/people.xml"
-    #expect(try await spark.read.option("rowTag", "person").format("xml").load(path).count() == 3)
-    #expect(try await spark.read.option("rowTag", "person").xml(path).count() == 3)
-    #expect(try await spark.read.option("rowTag", "person").xml(path, path).count() == 6)
+    if await spark.version >= "4.0.0" {
+      let path = "../examples/src/main/resources/people.xml"
+      #expect(try await spark.read.option("rowTag", "person").format("xml").load(path).count() == 3)
+      #expect(try await spark.read.option("rowTag", "person").xml(path).count() == 3)
+      #expect(try await spark.read.option("rowTag", "person").xml(path, path).count() == 6)
+    }
     await spark.stop()
   }

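XML became a built-in data source only in Spark 4, so against the 3.5.5 server these reader calls would fail; the gate skips them based on the reported version. Note that `>=` here is a plain lexicographic string comparison, which happens to be correct for "3.5.5" versus "4.0.0" but would misorder versions with different digit counts (e.g. "10.0.0" < "9.0.0"). A hypothetical numeric alternative, not part of the codebase:

```swift
// Compare dotted version strings component-by-component as integers.
// Non-numeric suffixes such as "-preview" are ignored within a component.
func versionAtLeast(_ version: String, _ minimum: String) -> Bool {
  let lhs = version.split(separator: ".").map { Int($0.prefix(while: \.isNumber)) ?? 0 }
  let rhs = minimum.split(separator: ".").map { Int($0) ?? 0 }
  for (l, r) in zip(lhs, rhs) where l != r { return l > r }
  return lhs.count >= rhs.count
}

// versionAtLeast("3.5.5", "4.0.0") == false
// versionAtLeast("10.0.0", "9.0.0") == true
```
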
@@ -80,7 +82,7 @@ struct DataFrameReaderTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     let spark = try await SparkSession.builder.getOrCreate()
     try await SQLHelper.withTable(spark, tableName)({
-      _ = try await spark.sql("CREATE TABLE \(tableName) AS VALUES (1), (2)").count()
+      _ = try await spark.sql("CREATE TABLE \(tableName) USING ORC AS VALUES (1), (2)").count()
       #expect(try await spark.read.table(tableName).count() == 2)
     })
     await spark.stop()
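
The `USING ORC` additions here (and in `DataFrameTests` and `SparkSessionTests` below) most likely work around a default that differs between versions: on Spark 3.5, `CREATE TABLE` without a `USING` clause still creates a Hive SerDe table (`spark.sql.legacy.createHiveTableByDefault` defaults to `true`, a default Spark 4 flipped), which a plain Connect server setup may not handle, while `USING ORC` pins a native data source format that behaves identically on both. An alternative sketch, assuming the legacy flag is settable on the target server, would flip the default once per session instead of annotating every statement:

```swift
// Hypothetical alternative to writing `USING ORC` everywhere: make
// CREATE TABLE default to a native data source table on Spark 3.5 too.
_ = try await spark.sql("SET spark.sql.legacy.createHiveTableByDefault=false").count()
```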
53 changes: 30 additions & 23 deletions Tests/SparkConnectTests/DataFrameTests.swift
@@ -69,16 +69,20 @@ struct DataFrameTests {
     let spark = try await SparkSession.builder.getOrCreate()

     let schema1 = try await spark.sql("SELECT 'a' as col1").schema
-    #expect(
-      schema1
-        == #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"#
-    )
+    let answer1 = if await spark.version.starts(with: "4.") {
+      #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"#
+    } else {
+      #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{}}}]}}"#
+    }
+    #expect(schema1 == answer1)

     let schema2 = try await spark.sql("SELECT 'a' as col1, 'b' as col2").schema
-    #expect(
-      schema2
-        == #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}},{"name":"col2","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"#
-    )
+    let answer2 = if await spark.version.starts(with: "4.") {
+      #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}},{"name":"col2","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"#
+    } else {
+      #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{}}},{"name":"col2","dataType":{"string":{}}}]}}"#
+    }
+    #expect(schema2 == answer2)

     let emptySchema = try await spark.sql("DROP TABLE IF EXISTS nonexistent").schema
     #expect(emptySchema == #"{"struct":{}}"#)
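
Spark 4 serializes string types with an explicit collation (`UTF8_BINARY`) in the schema JSON; Spark 3.5 predates collation support and emits a bare `"string":{}`. The expected answer is therefore selected per server version with a Swift 5.9 `if` expression, a minimal illustration of which (with a stand-in condition) is:

```swift
// Both branches must yield the same type; the selected value initializes the
// constant. `isSpark4` stands in for `await spark.version.starts(with: "4.")`.
let isSpark4 = true
let expected = if isSpark4 {
  #"{"string":{"collation":"UTF8_BINARY"}}"#
} else {
  #"{"string":{}}"#
}
```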
@@ -319,11 +323,12 @@ struct DataFrameTests {
     let spark = try await SparkSession.builder.getOrCreate()
     #expect(try await spark.sql("DROP TABLE IF EXISTS t").count() == 0)
     #expect(try await spark.sql("SHOW TABLES").count() == 0)
-    #expect(try await spark.sql("CREATE TABLE IF NOT EXISTS t(a INT)").count() == 0)
+    #expect(try await spark.sql("CREATE TABLE IF NOT EXISTS t(a INT) USING ORC").count() == 0)
     #expect(try await spark.sql("SHOW TABLES").count() == 1)
     #expect(try await spark.sql("SELECT * FROM t").count() == 0)
     #expect(try await spark.sql("INSERT INTO t VALUES (1), (2), (3)").count() == 0)
     #expect(try await spark.sql("SELECT * FROM t").count() == 3)
+    #expect(try await spark.sql("DROP TABLE IF EXISTS t").count() == 0)
     await spark.stop()
   }

@@ -482,20 +487,22 @@ struct DataFrameTests {
   @Test
   func lateralJoin() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    let df1 = try await spark.sql("SELECT * FROM VALUES ('a', '1'), ('b', '2') AS T(a, b)")
-    let df2 = try await spark.sql("SELECT * FROM VALUES ('c', '2'), ('d', '3') AS S(c, b)")
-    let expectedCross = [
-      Row("a", "1", "c", "2"),
-      Row("a", "1", "d", "3"),
-      Row("b", "2", "c", "2"),
-      Row("b", "2", "d", "3"),
-    ]
-    #expect(try await df1.lateralJoin(df2).collect() == expectedCross)
-    #expect(try await df1.lateralJoin(df2, joinType: "inner").collect() == expectedCross)
-
-    let expected = [Row("b", "2", "c", "2")]
-    #expect(try await df1.lateralJoin(df2, joinExprs: "T.b = S.b").collect() == expected)
-    #expect(try await df1.lateralJoin(df2, joinExprs: "T.b = S.b", joinType: "inner").collect() == expected)
+    if await spark.version.starts(with: "4.") {
+      let df1 = try await spark.sql("SELECT * FROM VALUES ('a', '1'), ('b', '2') AS T(a, b)")
+      let df2 = try await spark.sql("SELECT * FROM VALUES ('c', '2'), ('d', '3') AS S(c, b)")
+      let expectedCross = [
+        Row("a", "1", "c", "2"),
+        Row("a", "1", "d", "3"),
+        Row("b", "2", "c", "2"),
+        Row("b", "2", "d", "3"),
+      ]
+      #expect(try await df1.lateralJoin(df2).collect() == expectedCross)
+      #expect(try await df1.lateralJoin(df2, joinType: "inner").collect() == expectedCross)
+
+      let expected = [Row("b", "2", "c", "2")]
+      #expect(try await df1.lateralJoin(df2, joinExprs: "T.b = S.b").collect() == expected)
+      #expect(try await df1.lateralJoin(df2, joinExprs: "T.b = S.b", joinType: "inner").collect() == expected)
+    }
     await spark.stop()
   }

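The `lateralJoin` API is exercised only against Spark 4 servers here; on 3.5.5 the whole body is skipped and the test reduces to a no-op. A hedged alternative sketch that surfaces the skip in the test log instead of passing silently (plain `print`, since no skip helper appears in this PR):

```swift
// Early-exit variant of the same gate: report the skip, then bail out.
let spark = try await SparkSession.builder.getOrCreate()
guard await spark.version.starts(with: "4.") else {
  print("Skipping lateralJoin: requires a Spark 4 server.")
  await spark.stop()
  return
}
```
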
6 changes: 4 additions & 2 deletions Tests/SparkConnectTests/DataFrameWriterTests.swift
@@ -47,8 +47,10 @@ struct DataFrameWriterTests {
   func xml() async throws {
     let tmpDir = "/tmp/" + UUID().uuidString
     let spark = try await SparkSession.builder.getOrCreate()
-    try await spark.range(2025).write.option("rowTag", "person").xml(tmpDir)
-    #expect(try await spark.read.option("rowTag", "person").xml(tmpDir).count() == 2025)
+    if await spark.version >= "4.0.0" {
+      try await spark.range(2025).write.option("rowTag", "person").xml(tmpDir)
+      #expect(try await spark.read.option("rowTag", "person").xml(tmpDir).count() == 2025)
+    }
     await spark.stop()
   }

11 changes: 11 additions & 0 deletions Tests/SparkConnectTests/SQLTests.swift
@@ -69,13 +69,24 @@ struct SQLTests {
     #expect(removeOwner("185") == "*")
   }

+  let queriesForSpark4Only: [String] = [
+    "create_scala_function.sql",
+    "create_table_function.sql",
+    "pipesyntax.sql",
+    "explain.sql",
+  ]
+
   #if !os(Linux)
     @Test
     func runAll() async throws {
       let spark = try await SparkSession.builder.getOrCreate()
       for name in try! fm.contentsOfDirectory(atPath: path).sorted() {
         guard name.hasSuffix(".sql") else { continue }
         print(name)
+        if queriesForSpark4Only.contains(name) {
+          print("Skip query \(name) due to the difference between Spark 3 and 4.")
+          continue
+        }

         let sql = try String(contentsOf: URL(fileURLWithPath: "\(path)/\(name)"), encoding: .utf8)
         let answer = cleanUp(try await spark.sql(sql).collect().map { $0.toString() }.joined(separator: "\n"))
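
`runAll` compares each query's output against a golden answer file, and these four scripts exercise Spark 4-only syntax or produce version-dependent output, so they are filtered out by name when the suite runs against 3.5.5. If skips needed to vary by version more finely, a map keyed by minimum server version would generalize the same idea (hypothetical helper, not in this PR):

```swift
// Skip a query unless the server meets its minimum version (lexicographic
// comparison suffices for the single-digit major versions involved here).
let minimumVersion: [String: String] = [
  "pipesyntax.sql": "4.0.0",
  "explain.sql": "4.0.0",
]

func shouldSkip(_ name: String, serverVersion: String) -> Bool {
  guard let required = minimumVersion[name] else { return false }
  return serverVersion < required
}
```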
10 changes: 5 additions & 5 deletions Tests/SparkConnectTests/SparkConnectClientTests.swift
@@ -84,15 +84,15 @@ struct SparkConnectClientTests {
     await client.stop()
   }

-  #if !os(Linux)  // TODO: Enable this with the official Spark 4 docker image
   @Test
   func jsonToDdl() async throws {
     let client = SparkConnectClient(remote: TEST_REMOTE)
-    let _ = try await client.connect(UUID().uuidString)
-    let json =
+    let response = try await client.connect(UUID().uuidString)
+    if response.sparkVersion.version.starts(with: "4.") {
+      let json =
        #"{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}"#
-    #expect(try await client.jsonToDdl(json) == "id BIGINT NOT NULL")
+      #expect(try await client.jsonToDdl(json) == "id BIGINT NOT NULL")
+    }
     await client.stop()
   }
-  #endif
 }
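
Unlike the session-based tests, this one reads the server version straight from the `connect` handshake response, which also makes the old `#if !os(Linux)` guard unnecessary: the gate is now the server's capability rather than the client's OS. A compact sketch of that handshake pattern (the `sc://localhost:15002` remote is an assumed default, not taken from this PR):

```swift
import Foundation

// The connect response reports the server's Spark version before any
// query runs, so capability checks need no SparkSession.
let client = SparkConnectClient(remote: "sc://localhost:15002")
let response = try await client.connect(UUID().uuidString)
print("Connected to Spark \(response.sparkVersion.version)")
await client.stop()
```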
5 changes: 3 additions & 2 deletions Tests/SparkConnectTests/SparkSessionTests.swift
@@ -53,7 +53,8 @@ struct SparkSessionTests {
   @Test
   func version() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    #expect(await spark.version.starts(with: "4.0.0"))
+    let version = await spark.version
+    #expect(version.starts(with: "4.0.0") || version.starts(with: "3.5."))
     await spark.stop()
   }

@@ -80,7 +81,7 @@ struct SparkSessionTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     let spark = try await SparkSession.builder.getOrCreate()
     try await SQLHelper.withTable(spark, tableName)({
-      _ = try await spark.sql("CREATE TABLE \(tableName) AS VALUES (1), (2)").count()
+      _ = try await spark.sql("CREATE TABLE \(tableName) USING ORC AS VALUES (1), (2)").count()
       #expect(try await spark.table(tableName).count() == 2)
     })
     await spark.stop()