[Spark] Unity Catalog integration foundation and repository setup for Delta Lake #5523
- Moved UnityCatalogSupport.scala and UnityCatalogSupportSuite.scala to com.sparkuctest package
- Updated package declarations from org.apache.spark.sql.delta to com.sparkuctest
- All 4 tests pass successfully with Spark 4.0
```scala
// scalastyle:off println
println(s"Unity Catalog server started and ready at $unityCatalogUri")
println(s"Created catalog '$unityCatalogName' with schema 'default'")
// scalastyle:on println
```
make all prints into log lines
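A minimal sketch of what this could look like, assuming the trait can mix in Spark's internal `Logging` trait (available on the test classpath when depending on the spark module):

```scala
// Sketch only: replace the println calls with log lines via
// org.apache.spark.internal.Logging. The surrounding trait name and
// fields mirror the code under review.
import org.apache.spark.internal.Logging

trait UnityCatalogSupport extends Logging {
  // ... server startup elided ...
  // logInfo(s"Unity Catalog server started and ready at $unityCatalogUri")
  // logInfo(s"Created catalog '$unityCatalogName' with schema 'default'")
}
```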
```scala
Thread.sleep(5000)
```
instead of sleeping for a fixed timeout, can we poll to check when the server is ready?
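One way to do this, sketched under the assumption that readiness can be detected by the server accepting TCP connections (helper name and timeout values are illustrative):

```scala
// Sketch: poll until the UC server's port accepts connections,
// instead of Thread.sleep(5000).
import java.net.Socket
import scala.util.Try

def waitForServer(host: String, port: Int,
    timeoutMs: Long = 30000, intervalMs: Long = 200): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  // Try to open (and immediately close) a socket; retry until it succeeds.
  while (Try(new Socket(host, port)).map(_.close()).isFailure) {
    if (System.currentTimeMillis() > deadline) {
      throw new IllegalStateException(
        s"UC server at $host:$port not ready after ${timeoutMs}ms")
    }
    Thread.sleep(intervalMs)
  }
}
```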
```scala
private def createUCClient(): ApiClient = {
  val client = new ApiClient()
  // Extract port from unityCatalogUri
  val port = unityCatalogUri.split(":")(2).toInt
  client.setScheme("http")
  client.setHost("localhost")
  client.setPort(port)
  client
}
```
can the UnityCatalogSupport trait provide the client?
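A hedged sketch of how the trait might expose a shared client, assuming the trait already tracks the chosen port in a `ucPort` field (as the code under review does):

```scala
// Sketch: let UnityCatalogSupport expose the client once, so suites
// don't each rebuild it from the URI string.
lazy val ucClient: ApiClient = {
  val client = new ApiClient()
  client.setScheme("http")
  client.setHost("localhost")
  client.setPort(ucPort) // reuse the port the trait already knows
  client
}
```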
…, and dependency management
```scala
// Standard test dependencies
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",

// Unity Catalog dependencies - exclude Jackson to use Spark's Jackson 2.15.x
```
remove jackson version
Maybe we need to change unitycatalog's Jackson version so that it aligns with Spark's.
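For reference, an exclusion along the lines the diff describes might look like this in sbt; the `io.unitycatalog` coordinates shown are illustrative, not copied from the PR:

```scala
// Sketch: exclude Unity Catalog's bundled Jackson so Spark's 2.15.x
// copy is used at test time. Module coordinates are assumed.
"io.unitycatalog" % "unitycatalog-client" % unityCatalogVersion % "test" excludeAll (
  ExclusionRule(organization = "com.fasterxml.jackson.core"),
  ExclusionRule(organization = "com.fasterxml.jackson.module")
)
```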
huan233usc
left a comment
Looking good! Just some questions on server setup and formatting.
```
@@ -0,0 +1,258 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
```
nit: 2025
```scala
 * limitations under the License.
 */

package com.sparkuctest
```
Is org.apache.spark.sql.delta.test.unity a better name?
i am intentionally keeping it out of the package so that we don't accidentally depend on anything internal to delta or uc. closer to how any user / app would run
But it's not a real `com` package, right? Maybe we can use `io.unitycatalog.ittest`?
btw, should we write the tests in Scala or Java? Would Java be better for maintaining these tests?
```scala
ucTempDir.deleteOnExit()

// Find an available port
ucPort = findAvailablePort()
```
nit: There is a small potential for a port race if the port is taken between here and line 147. Should we include this logic in the retry? (I think it might be ok to keep things as is, since this is a test that is not parallelizable.)
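A sketch of folding port selection into the retry, so a port stolen between `findAvailablePort()` and server start simply triggers another attempt; `startServer` stands in for the real startup call:

```scala
// Sketch: re-pick the port on each attempt instead of reserving it
// once up front. Helper names are assumptions, not the PR's API.
def startWithRetry(maxAttempts: Int = 3): Int = {
  var lastError: Throwable = null
  for (_ <- 1 to maxAttempts) {
    val port = findAvailablePort()
    try {
      startServer(port) // may throw BindException if the port was taken meanwhile
      return port
    } catch {
      case e: java.net.BindException => lastError = e // retry with a fresh port
    }
  }
  throw new IllegalStateException("UC server failed to bind after retries", lastError)
}
```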
```scala
val response: ListTablesResponse = tablesApi.listTables(catalogName, schemaName, null, null)

import scala.jdk.CollectionConverters._
if (response.getTables != null) {
```
The null check here seems redundant. If there are no tables, won't we get an empty list?
this is to handle the case where a server returns the corresponding field in the response JSON as null (i.e. the field is missing) instead of an empty [].
```scala
 * Creates a Unity Catalog API client configured for this server.
 */
def createUnityCatalogClient(): ApiClient = {
  val port = unityCatalogUri.split(":")(2).toInt
```
Should we use ucPort directly?
haaah. stupid claude.
```scala
// 3. Verify we can query UC server directly via SDK
val ucTables = listTables(unityCatalogName, "default")
// Should succeed even if empty - this confirms UC server is responding
assert(ucTables != null, "Should be able to query UC server via SDK")
```
This assertion should always pass, since listTables never returns null; should we remove it?
| """) | ||
|
|
||
| // If we got here, the catalog is working | ||
| val tables = spark.sql(s"SHOW TABLES IN $unityCatalogName.default") |
Does checkAnswer work? https://github.com/apache/spark/blob/61668ad02671af0a80ec6d91ddaf02e6b3f042e6/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala#L137

```scala
checkAnswer(
  spark.sql(s"SHOW TABLES IN $unityCatalogName.default")
    .select("tableName")
    .filter($"tableName" === "test_verify_catalog"),
  Row("test_verify_catalog") :: Nil
)
```
seems like more code than this version :)
```scala
val result = spark.sql(s"SELECT * FROM $testTable ORDER BY id").collect()
assert(result.length == 3, s"Should have 3 rows, got ${result.length}")
```
this should use checkAnswer
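A sketch of the suggested change, assuming the suite extends Spark's `QueryTest` and that `testTable` holds three rows like `(1, "a")`, `(2, "b")`, `(3, "c")` (hypothetical data for illustration):

```scala
// Sketch: checkAnswer verifies both row count and row contents, and
// prints a readable diff on mismatch, unlike a bare length assertion.
checkAnswer(
  spark.sql(s"SELECT * FROM $testTable ORDER BY id"),
  Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil
)
```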
openinx
left a comment
Thanks for the work, just left several comments. It's great work to get started on the unitycatalog integration testing.
```
kernel/kernel-benchmarks/benchmark_report.json

# Unity Catalog test artifacts
spark/unitycatalog/etc/
```
Which tests will generate those artifacts?
```scala
val unityCatalogVersion = "0.3.0"
val sparkUnityCatalogJacksonVersion = "2.15.4" // We are using Spark 4.0's Jackson version 2.15.x, to override Unity Catalog 0.3.0's version 2.18.x
```
Unrelated: I think we may need to relocate Jackson in the oss-unitycatalog Spark connector, since it's such a general client-side jar that will run under different runtime environments. Common jars like this can easily conflict with jars introduced by other projects.
I had filed an issue for oss-unitycatalog before: unitycatalog/unitycatalog#1141
i agree.. jackson was a massive pain. we need to fix it.
```scala
lazy val sparkUnityCatalog = (project in file("spark/unitycatalog"))
  .dependsOn(spark % "compile->compile;test->test;provided->provided")
  .disablePlugins(JavaFormatterPlugin, ScalafmtPlugin)
```
Why disable the Java and Scala formatter plugins? Seems weird?
```scala
// This is a test-only module - no production sources
Compile / sources := Seq.empty,

Test / javaOptions ++= Seq("-ea"),
```
It seems uncommon to use -ea (or -enableassertions) for testing. If we all use org.assertj, then maybe we can remove this option; do I understand correctly?
```scala
),
assembly / assemblyMergeStrategy := {
  // Discard `module-info.class` to fix the `different file contents found` error.
  // TODO Upgrade SBT to 1.5 which will do this automatically
```
nit: Is this related to this PR?
```scala
val testClient = new ApiClient()
testClient.setScheme("http")
testClient.setHost("localhost")
testClient.setPort(ucPort)
```
Why not use createUnityCatalogClient directly?
```scala
test("UnityCatalogSupport trait starts UC server and configures Spark correctly") {
  // 1. Verify UC server is accessible via URI
  assert(unityCatalogUri.startsWith("http://localhost:"),
```
How will this work if we want to run these same tests against a different UC server setup?
we can make this an env variable. if the env variable is set, we use that URL instead of starting a new UC OSS server.
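A minimal sketch of that override, where the variable name `UC_SERVER_URI` and the helper `startEmbeddedUcServer` are assumptions for illustration:

```scala
// Sketch: prefer an externally managed UC server when the env var is
// set; otherwise fall back to starting the embedded UC OSS server.
val unityCatalogUri: String = sys.env.get("UC_SERVER_URI") match {
  case Some(uri) => uri                     // reuse an existing server
  case None      => startEmbeddedUcServer() // start a fresh embedded server
}
```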
🥞 Stacked PR
Which Delta project/connector is this regarding?
Description
Establishes foundational Unity Catalog integration for Delta Lake with embedded UC server lifecycle management and comprehensive test framework.
Adds a new `sparkUnityCatalog` SBT module with Unity Catalog 0.3.0 dependencies, a `UnityCatalogSupport` trait for managing UC server lifecycle in tests, and a `UnityCatalogSupportSuite` with 4 integration tests validating UC-Delta connectivity and table operations. Uses a shaded UC server JAR to avoid dependency conflicts. Compatible with Spark 4.0 and Delta Lake. Includes repository setup (.gitignore updates).
How was this patch tested?
`UnityCatalogSupportSuite` with 4 comprehensive integration tests.
Does this PR introduce any user-facing changes?
No. This PR only adds test infrastructure for Unity Catalog integration. No user-facing functionality is changed.