Commit 2e1f204

Add comprehensive utility operations test suite

1 parent d267092 commit 2e1f204

1 file changed: 251 additions, 0 deletions

@@ -0,0 +1,251 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.sparkuctest

import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException

/**
 * Utility operations test suite for Delta table operations through Unity Catalog.
 *
 * Covers OPTIMIZE, DESCRIBE HISTORY, SHOW operations, and other table utilities.
 * Based on UnityCatalogManagedTableUtilitySuite from the Python integration tests.
 */
class UCDeltaTableUtilitySuite extends UCDeltaTableIntegrationSuiteBase {

  override protected def sparkConf: SparkConf = configureSparkWithUnityCatalog(super.sparkConf)

  override protected def sqlExecutor: UCDeltaTableIntegrationSuiteBase.SQLExecutor = {
    new UCDeltaTableIntegrationSuiteBase.SparkSQLExecutor(spark)
  }

  test("OPTIMIZE table operation") {
    withNewTable("optimize_test", "id INT, category STRING, value DOUBLE") { tableName =>
      // Insert data in multiple batches to create multiple files
      sql(s"INSERT INTO $tableName VALUES (1, 'A', 10.0)")
      sql(s"INSERT INTO $tableName VALUES (2, 'B', 20.0)")
      sql(s"INSERT INTO $tableName VALUES (3, 'A', 15.0)")
      sql(s"INSERT INTO $tableName VALUES (4, 'C', 25.0)")

      // Run OPTIMIZE
      sql(s"OPTIMIZE $tableName")

      // Verify data is still intact after optimization
      check(tableName, Seq(
        Seq("1", "A", "10.0"),
        Seq("2", "B", "20.0"),
        Seq("3", "A", "15.0"),
        Seq("4", "C", "25.0")
      ))
    }
  }

  test("OPTIMIZE with ZORDER BY") {
    withNewTable("zorder_test",
        "id INT, category STRING, priority INT, value DOUBLE") { tableName =>
      // Insert test data
      sql(s"""
        INSERT INTO $tableName VALUES
        (1, 'High', 1, 100.0),
        (2, 'Low', 3, 50.0),
        (3, 'Medium', 2, 75.0),
        (4, 'High', 1, 120.0),
        (5, 'Low', 3, 40.0)
      """)

      // Run OPTIMIZE with ZORDER
      sql(s"OPTIMIZE $tableName ZORDER BY (category, priority)")

      // Verify data integrity after Z-ordering
      check(tableName, Seq(
        Seq("1", "High", "1", "100.0"),
        Seq("2", "Low", "3", "50.0"),
        Seq("3", "Medium", "2", "75.0"),
        Seq("4", "High", "1", "120.0"),
        Seq("5", "Low", "3", "40.0")
      ))
    }
  }

  test("DESCRIBE HISTORY operation") {
    withNewTable("history_test", "id INT, name STRING") { tableName =>
      // Perform several operations to create history
      sql(s"INSERT INTO $tableName VALUES (1, 'initial')")
      sql(s"INSERT INTO $tableName VALUES (2, 'second')")
      sql(s"UPDATE $tableName SET name = 'updated' WHERE id = 1")
      sql(s"DELETE FROM $tableName WHERE id = 2")

      // Get table history
      val history = sql(s"DESCRIBE HISTORY $tableName")

      // Verify we have history entries
      assert(history.nonEmpty, "Table should have history entries")

      // Verify the history contains the expected operations. Unity Catalog's
      // DESCRIBE HISTORY may use a different column structure than standard Delta,
      // so stay flexible: first confirm the history has multiple entries.
      assert(history.length >= 2,
        s"History should contain multiple entries. Found ${history.length} entries")

      // Check that history entries contain some operation information
      // (Unity Catalog may structure this differently than standard Delta)
      // scalastyle:off caselocale
      val hasOperationInfo = history.exists(row =>
        row.exists(col => col.toString.toLowerCase.contains("insert") ||
          col.toString.toLowerCase.contains("update") ||
          col.toString.toLowerCase.contains("delete") ||
          col.toString.toLowerCase.contains("write") ||
          col.toString.toLowerCase.contains("create")))
      // scalastyle:on caselocale
      assert(hasOperationInfo || history.length >= 3,
        "History should contain operation information or have sufficient entries")
    }
  }

  test("SHOW CATALOGS operation") {
    // Show all catalogs
    val catalogs = sql("SHOW CATALOGS")

    // Verify our Unity Catalog appears
    val catalogNames = catalogs.map(row => row(0).toString)
    assert(catalogNames.contains(unityCatalogName),
      s"Unity Catalog '$unityCatalogName' should appear in catalogs. " +
        s"Found: ${catalogNames.mkString(", ")}")
  }

  test("SHOW SCHEMAS operation") {
    // Show schemas in our catalog
    val schemas = sql(s"SHOW SCHEMAS IN $unityCatalogName")

    // Verify the default schema exists
    val schemaNames = schemas.map(row => row(0).toString)
    assert(schemaNames.contains("default"),
      s"Default schema should exist in catalog. " +
        s"Found schemas: ${schemaNames.mkString(", ")}")
  }

  test("SHOW COLUMNS operation") {
    withNewTable("show_columns_test",
        "id BIGINT, name STRING, active BOOLEAN, created_at TIMESTAMP") { tableName =>
      sql(s"INSERT INTO $tableName VALUES (1, 'test', true, '2023-01-01 10:00:00')")

      // Show columns for the table
      val columns = sql(s"SHOW COLUMNS IN $tableName")

      // Verify expected columns
      val columnNames = columns.map(row => row(0).toString).toSet
      assert(columnNames.contains("id"), "Should have 'id' column")
      assert(columnNames.contains("name"), "Should have 'name' column")
      assert(columnNames.contains("active"), "Should have 'active' column")
      assert(columnNames.contains("created_at"), "Should have 'created_at' column")
    }
  }

  test("table statistics after operations") {
    withNewTable("stats_test", "id INT, category STRING") { tableName =>
      // Insert initial data
      sql(s"""
        INSERT INTO $tableName VALUES
        (1, 'A'), (2, 'B'), (3, 'A'), (4, 'C'), (5, 'B')
      """)

      // Perform some operations
      sql(s"UPDATE $tableName SET category = 'Updated' WHERE id = 1")
      sql(s"DELETE FROM $tableName WHERE id = 5")

      // Verify final state
      check(tableName, Seq(
        Seq("1", "Updated"),
        Seq("2", "B"),
        Seq("3", "A"),
        Seq("4", "C")
      ))

      // Verify we can still query aggregates
      val categoryCount = sql(
        s"SELECT category, COUNT(*) FROM $tableName GROUP BY category ORDER BY category")
      assert(categoryCount.nonEmpty, "Should be able to compute aggregates")
    }
  }

  test("SHOW PARTITIONS on non-partitioned table") {
    withNewTable("non_partitioned_test", "id INT, value STRING") { tableName =>
      sql(s"INSERT INTO $tableName VALUES (1, 'test')")

      // SHOW PARTITIONS should handle non-partitioned tables gracefully
      try {
        val partitions = sql(s"SHOW PARTITIONS $tableName")
        // If the statement succeeds, it should return an empty or minimal result
        assert(partitions.isEmpty || partitions.forall(_.length <= 1),
          "Non-partitioned table should have no meaningful partitions")
      } catch {
        case _: AnalysisException =>
          // It is acceptable for SHOW PARTITIONS to fail on non-partitioned tables;
          // this is the expected behavior in some Spark versions.
      }
    }
  }

  test("table metadata after multiple operations") {
    withNewTable("metadata_test", "id INT, status STRING, updated_at TIMESTAMP") { tableName =>
      // Series of operations that modify table metadata
      sql(s"INSERT INTO $tableName VALUES (1, 'created', '2023-01-01 10:00:00')")
      sql(s"INSERT INTO $tableName VALUES (2, 'created', '2023-01-01 11:00:00')")
      sql(s"""UPDATE $tableName SET status = 'modified',
        updated_at = '2023-01-01 12:00:00' WHERE id = 1""")

      // Verify data integrity
      check(tableName, Seq(
        Seq("1", "modified", "2023-01-01 12:00:00.0"),
        Seq("2", "created", "2023-01-01 11:00:00.0")
      ))

      // Verify the table can still be described; DESCRIBE returns one row per column,
      // so a three-column table should yield at least three rows
      val description = sql(s"DESCRIBE $tableName")
      assert(description.length >= 3,
        "Description should list at least the table's 3 columns")

      // Verify extended description works
      val extendedDesc = sql(s"DESCRIBE EXTENDED $tableName")
      assert(extendedDesc.length > description.length,
        "Extended description should have more info than the basic one")
    }
  }

  test("concurrent-safe operations") {
    withNewTable("concurrent_test", "id INT, batch_id STRING") { tableName =>
      // Simulate concurrent operations by performing multiple writes
      sql(s"INSERT INTO $tableName VALUES (1, 'batch1')")
      sql(s"INSERT INTO $tableName VALUES (2, 'batch1')")

      // Optimize in between writes
      sql(s"OPTIMIZE $tableName")

      sql(s"INSERT INTO $tableName VALUES (3, 'batch2')")
      sql(s"UPDATE $tableName SET batch_id = 'updated' WHERE id = 1")

      // Verify final state
      check(tableName, Seq(
        Seq("1", "updated"),
        Seq("2", "batch1"),
        Seq("3", "batch2")
      ))

      // Verify the table is still in a good state
      sqlExecutor.checkWithSQL(s"SELECT COUNT(*) FROM $tableName", Seq(Seq("3")))
    }
  }
}
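
Note: the suite leans on helpers inherited from UCDeltaTableIntegrationSuiteBase (sql, check, withNewTable, unityCatalogName), which are not part of this commit. As a rough sketch only, with assumed signatures and naming, the two table-lifecycle helpers might look like this:

  // Hypothetical sketch, not the actual base-class code: create a Delta table with the
  // given schema, hand its fully qualified name to the test body, and always drop it.
  protected def withNewTable(name: String, schema: String)(body: String => Unit): Unit = {
    val tableName = s"$unityCatalogName.default.$name"  // qualification is an assumption
    sql(s"CREATE TABLE $tableName ($schema) USING DELTA")
    try body(tableName)
    finally sql(s"DROP TABLE IF EXISTS $tableName")  // clean up even if the body fails
  }

  // Hypothetical sketch: read the table back and compare it, cell by cell as strings,
  // against the expected rows; ordering by the first column is an assumption.
  protected def check(tableName: String, expected: Seq[Seq[String]]): Unit = {
    val actual = spark.sql(s"SELECT * FROM $tableName ORDER BY 1")
      .collect()
      .map(_.toSeq.map(v => String.valueOf(v)))  // String.valueOf tolerates nulls
      .toSeq
    assert(actual == expected, s"Expected $expected but found $actual")
  }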
