diff --git a/table/table_test.go b/table/table_test.go index cef39a58c..f76f5807a 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -1202,6 +1202,1062 @@ func TestTableWriting(t *testing.T) { suite.Run(t, &TableWritingTestSuite{formatVersion: 2}) } +// WriteOperationsTestSuite provides comprehensive testing for advanced write operations +type WriteOperationsTestSuite struct { + suite.Suite + ctx context.Context + location string + tableSchema *iceberg.Schema + arrSchema *arrow.Schema + arrTable arrow.Table +} + +func TestWriteOperations(t *testing.T) { + suite.Run(t, new(WriteOperationsTestSuite)) +} + +func (s *WriteOperationsTestSuite) SetupSuite() { + s.ctx = context.Background() + mem := memory.DefaultAllocator + + // Create a test schema + s.tableSchema = iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 3, Name: "ts", Type: iceberg.PrimitiveTypes.Timestamp}) + + s.arrSchema = arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false}, + {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true}, + {Name: "ts", Type: &arrow.TimestampType{Unit: arrow.Microsecond}, Nullable: true}, + }, nil) + + // Create test data + var err error + s.arrTable, err = array.TableFromJSON(mem, s.arrSchema, []string{ + `[ + {"id": 1, "data": "foo", "ts": 1672531200000000}, + {"id": 2, "data": "bar", "ts": 1672534800000000}, + {"id": 3, "data": "baz", "ts": 1672538400000000} + ]`, + }) + s.Require().NoError(err) +} + +func (s *WriteOperationsTestSuite) SetupTest() { + s.location = filepath.ToSlash(strings.Replace(s.T().TempDir(), "#", "", -1)) +} + +func (s *WriteOperationsTestSuite) TearDownSuite() { + s.arrTable.Release() +} + +func (s *WriteOperationsTestSuite) getMetadataLoc() string { + return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json", + s.location, 1, uuid.New().String()) +} + +func (s *WriteOperationsTestSuite) writeParquet(fio iceio.WriteFileIO, filePath string, arrTbl arrow.Table) { + fo, err := fio.Create(filePath) + s.Require().NoError(err) + + s.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(), + nil, pqarrow.DefaultWriterProps())) +} + +func (s *WriteOperationsTestSuite) createTable(identifier table.Identifier, formatVersion int, spec iceberg.PartitionSpec) *table.Table { + meta, err := table.NewMetadata(s.tableSchema, &spec, table.UnsortedSortOrder, + s.location, iceberg.Properties{"format-version": strconv.Itoa(formatVersion)}) + s.Require().NoError(err) + + return table.New( + identifier, + meta, + s.getMetadataLoc(), + func(ctx context.Context) (iceio.IO, error) { + return iceio.LocalFS{}, nil + }, + &mockedCatalog{}, + ) +} + +func (s *WriteOperationsTestSuite) createTableWithData(identifier table.Identifier, numFiles int) (*table.Table, []string) { + tbl := s.createTable(identifier, 2, *iceberg.UnpartitionedSpec) + + files := make([]string, 0, numFiles) + fs := s.getFS(tbl) + + for i := 0; i < numFiles; i++ { + filePath := fmt.Sprintf("%s/data/test-%d.parquet", s.location, i) + s.writeParquet(fs, filePath, s.arrTable) + files = append(files, filePath) + } + + // Add files to table + tx := tbl.NewTransaction() + s.Require().NoError(tx.AddFiles(s.ctx, files, nil, false)) + + committedTbl, err := tx.Commit(s.ctx) + s.Require().NoError(err) + + return committedTbl, files +} + +func (s *WriteOperationsTestSuite) getFS(tbl *table.Table) iceio.WriteFileIO { + fs, err := tbl.FS(s.ctx) + s.Require().NoError(err) + + return fs.(iceio.WriteFileIO) +} + +// ============================================================================= +// SNAPSHOT VALIDATION HELPERS +// ============================================================================= + +// validateSnapshotFiles checks that the snapshot contains exactly the expected files +func (s *WriteOperationsTestSuite) validateSnapshotFiles(snapshot *table.Snapshot, fs iceio.IO, expectedFiles []string) { + s.Require().NotNil(snapshot, "Snapshot should not be nil") + + // Get actual files from snapshot using public API + manifests, err := snapshot.Manifests(fs) + s.Require().NoError(err, "Failed to read manifests from snapshot") + + var actualFiles []string + for _, manifest := range manifests { + entries, err := manifest.FetchEntries(fs, false) + s.Require().NoError(err, "Failed to fetch entries from manifest") + + for _, entry := range entries { + // Only include data files (not delete files) + if entry.DataFile().ContentType() == iceberg.EntryContentData { + actualFiles = append(actualFiles, entry.DataFile().FilePath()) + } + } + } + + // Sort for comparison + expectedSorted := make([]string, len(expectedFiles)) + copy(expectedSorted, expectedFiles) + slices.Sort(expectedSorted) + slices.Sort(actualFiles) + + s.Equal(len(expectedSorted), len(actualFiles), "File count mismatch - expected %d files, got %d", len(expectedSorted), len(actualFiles)) + s.Equal(expectedSorted, actualFiles, "File paths don't match expected.\nExpected: %v\nActual: %v", expectedSorted, actualFiles) + + s.T().Logf("Snapshot file validation passed: %d files match expected", len(actualFiles)) +} + +// validateSnapshotSummary checks operation type and summary properties +func (s *WriteOperationsTestSuite) validateSnapshotSummary(snapshot *table.Snapshot, expectedOp table.Operation, expectedCounts map[string]string) { + s.Require().NotNil(snapshot, "Snapshot should not be nil") + s.Require().NotNil(snapshot.Summary, "Snapshot summary should not be nil") + + s.Equal(expectedOp, snapshot.Summary.Operation, "Snapshot operation mismatch") + + for key, expectedValue := range expectedCounts { + actualValue, exists := snapshot.Summary.Properties[key] + s.True(exists, "Summary property %s should exist", key) + s.Equal(expectedValue, actualValue, "Summary property %s mismatch - expected %s, got %s", key, expectedValue, actualValue) + } + + s.T().Logf("Snapshot summary validation passed: operation=%s, properties=%d", expectedOp, len(expectedCounts)) +} + +// validateManifestStructure validates manifest files and returns total entry count +func (s *WriteOperationsTestSuite) validateManifestStructure(snapshot *table.Snapshot, fs iceio.IO) int { + s.Require().NotNil(snapshot, "Snapshot should not be nil") + + manifests, err := snapshot.Manifests(fs) + s.Require().NoError(err, "Failed to read manifests from snapshot") + s.Greater(len(manifests), 0, "Should have at least one manifest") + + totalEntries := 0 + for i, manifest := range manifests { + // Validate manifest is readable + entries, err := manifest.FetchEntries(fs, false) + s.Require().NoError(err, "Failed to fetch entries from manifest %d", i) + totalEntries += len(entries) + + // Validate manifest metadata + s.Greater(manifest.Length(), int64(0), "Manifest %d should have positive length", i) + s.NotEmpty(manifest.FilePath(), "Manifest %d should have valid path", i) + + s.T().Logf("📄 Manifest %d: %s (%d entries, %d bytes)", i, manifest.FilePath(), len(entries), manifest.Length()) + } + + s.T().Logf("Manifest structure validation passed: %d manifests, %d total entries", len(manifests), totalEntries) + + return totalEntries +} + +// validateSnapshotState performs comprehensive validation of snapshot state +func (s *WriteOperationsTestSuite) validateSnapshotState(snapshot *table.Snapshot, fs iceio.IO, expectedFiles []string, expectedOp table.Operation, expectedCounts map[string]string) { + s.T().Logf("🔍 Validating snapshot state (ID: %d)", snapshot.SnapshotID) + + // Validate all components + s.validateSnapshotFiles(snapshot, fs, expectedFiles) + s.validateSnapshotSummary(snapshot, expectedOp, expectedCounts) + entryCount := s.validateManifestStructure(snapshot, fs) + + // Ensure manifest entries match file count + s.Equal(len(expectedFiles), entryCount, "Manifest entry count should match expected file count") + + s.T().Logf("🎉 Complete snapshot validation passed for snapshot %d", snapshot.SnapshotID) +} + +// validateDataIntegrity scans the table and validates row count and basic data +func (s *WriteOperationsTestSuite) validateDataIntegrity(tbl *table.Table, expectedRowCount int64) { + scan := tbl.Scan() + results, err := scan.ToArrowTable(s.ctx) + s.Require().NoError(err, "Failed to scan table for data integrity check") + defer results.Release() + + s.Equal(expectedRowCount, results.NumRows(), "Row count mismatch - expected %d, got %d", expectedRowCount, results.NumRows()) + + // Basic data validation - ensure we can read the data + s.Equal(3, int(results.NumCols()), "Should have 3 columns (id, data, ts)") + + s.T().Logf("Data integrity validation passed: %d rows, %d columns", results.NumRows(), results.NumCols()) +} + +// getSnapshotFiles extracts the list of data file paths from a snapshot for comparison +func (s *WriteOperationsTestSuite) getSnapshotFiles(snapshot *table.Snapshot, fs iceio.IO) []string { + if snapshot == nil { + return []string{} + } + + manifests, err := snapshot.Manifests(fs) + s.Require().NoError(err, "Failed to read manifests from snapshot") + + var files []string + for _, manifest := range manifests { + entries, err := manifest.FetchEntries(fs, false) + s.Require().NoError(err, "Failed to fetch entries from manifest") + + for _, entry := range entries { + // Only include data files (not delete files) + if entry.DataFile().ContentType() == iceberg.EntryContentData { + files = append(files, entry.DataFile().FilePath()) + } + } + } + + // Sort for consistent comparison + slices.Sort(files) + + return files +} + +// ============================================================================= +// TESTS WITH ENHANCED VALIDATION +// ============================================================================= + +func (s *WriteOperationsTestSuite) TestRewriteFiles() { + s.Run("RewriteDataFiles", func() { + // Setup table with multiple small files + ident := table.Identifier{"default", "rewrite_test_table"} + tbl, originalFiles := s.createTableWithData(ident, 3) + + // VALIDATE INITIAL STATE + initialSnapshot := tbl.CurrentSnapshot() + s.validateSnapshotState(initialSnapshot, s.getFS(tbl), originalFiles, table.OpAppend, map[string]string{ + "added-data-files": "3", + "added-records": "9", // 3 files × 3 rows each + }) + s.validateDataIntegrity(tbl, 9) // 3 files × 3 rows each + + // Capture initial file list for comparison + initialFiles := s.getSnapshotFiles(initialSnapshot, s.getFS(tbl)) + + // Create new consolidated file + consolidatedPath := s.location + "/data/consolidated.parquet" + + // Create larger dataset for consolidation + mem := memory.DefaultAllocator + largerTable, err := array.TableFromJSON(mem, s.arrSchema, []string{ + `[ + {"id": 1, "data": "foo", "ts": 1672531200000000}, + {"id": 2, "data": "bar", "ts": 1672534800000000}, + {"id": 3, "data": "baz", "ts": 1672538400000000}, + {"id": 4, "data": "qux", "ts": 1672542000000000}, + {"id": 5, "data": "quux", "ts": 1672545600000000} + ]`, + }) + s.Require().NoError(err) + defer largerTable.Release() + + fs := s.getFS(tbl) + s.writeParquet(fs, consolidatedPath, largerTable) + + // Rewrite files (replace multiple small files with one larger file) + tx := tbl.NewTransaction() + err = tx.ReplaceDataFiles(s.ctx, originalFiles, []string{consolidatedPath}, nil) + s.Require().NoError(err) + + newTbl, err := tx.Commit(s.ctx) + s.Require().NoError(err) + + // VALIDATE FINAL STATE WITH ENHANCED CHECKS + finalSnapshot := newTbl.CurrentSnapshot() + + // Assert that file lists differ before and after the operation + finalFiles := s.getSnapshotFiles(finalSnapshot, s.getFS(newTbl)) + s.NotEqual(initialFiles, finalFiles, "File lists should differ before and after rewrite operation") + s.Greater(len(finalFiles), len(initialFiles), "Rewrite operation should result in more files (current behavior)") + + // NOTE: Current ReplaceDataFiles implementation keeps both old and new files + // In a full implementation, it should only contain the consolidated file + var allFiles []string + manifests, err := finalSnapshot.Manifests(s.getFS(newTbl)) + s.Require().NoError(err) + for _, manifest := range manifests { + entries, err := manifest.FetchEntries(s.getFS(newTbl), false) + s.Require().NoError(err) + for _, entry := range entries { + if entry.DataFile().ContentType() == iceberg.EntryContentData { + allFiles = append(allFiles, entry.DataFile().FilePath()) + } + } + } + + // Current behavior: keeps both original and consolidated files + s.validateSnapshotState(finalSnapshot, s.getFS(newTbl), allFiles, table.OpOverwrite, map[string]string{ + "added-data-files": "1", + "added-records": "5", + }) + + // Total data should be correct regardless of file handling + s.validateDataIntegrity(newTbl, 5) // consolidated data + + // Verify snapshot progression + s.NotEqual(initialSnapshot.SnapshotID, finalSnapshot.SnapshotID, "Should create new snapshot") + s.Equal(&initialSnapshot.SnapshotID, finalSnapshot.ParentSnapshotID, "Should reference previous snapshot as parent") + + s.T().Log("NOTE: ReplaceDataFiles currently keeps both old and new files") + s.T().Log("EXPECTED: In full implementation, should only contain consolidated file") + }) + + s.Run("RewriteWithConflictDetection", func() { + // Test concurrent rewrite operations + ident := table.Identifier{"default", "rewrite_conflict_test"} + tbl, originalFiles := s.createTableWithData(ident, 2) + + // VALIDATE INITIAL STATE + initialSnapshot := tbl.CurrentSnapshot() + s.validateSnapshotState(initialSnapshot, s.getFS(tbl), originalFiles, table.OpAppend, map[string]string{ + "added-data-files": "2", + "added-records": "6", // 2 files × 3 rows each + }) + + // Start first transaction + tx1 := tbl.NewTransaction() + consolidatedPath1 := s.location + "/data/consolidated1.parquet" + fs := s.getFS(tbl) + s.writeParquet(fs, consolidatedPath1, s.arrTable) + + // Start second transaction + tx2 := tbl.NewTransaction() + consolidatedPath2 := s.location + "/data/consolidated2.parquet" + s.writeParquet(fs, consolidatedPath2, s.arrTable) + + // Both try to replace the same files + err1 := tx1.ReplaceDataFiles(s.ctx, originalFiles, []string{consolidatedPath1}, nil) + s.Require().NoError(err1) + + err2 := tx2.ReplaceDataFiles(s.ctx, originalFiles, []string{consolidatedPath2}, nil) + s.Require().NoError(err2) + + // First should succeed + firstTbl, err1 := tx1.Commit(s.ctx) + s.Require().NoError(err1) + + // VALIDATE FIRST TRANSACTION STATE + firstSnapshot := firstTbl.CurrentSnapshot() + + // Current behavior: keeps both original and new files + expectedFirstFiles := make([]string, len(originalFiles), len(originalFiles)+1) + copy(expectedFirstFiles, originalFiles) + expectedFirstFiles = append(expectedFirstFiles, consolidatedPath1) + s.validateSnapshotState(firstSnapshot, s.getFS(firstTbl), expectedFirstFiles, table.OpOverwrite, map[string]string{ + "added-data-files": "1", + }) + + // Second should succeed since conflict detection may not be fully implemented yet + // In a full implementation, this would fail due to conflict + _, err2 = tx2.Commit(s.ctx) + if err2 != nil { + s.T().Logf("Transaction conflict detected: %v", err2) + } else { + s.T().Log("Transaction completed without conflict - conflict detection may not be fully implemented") + } + + s.T().Log("NOTE: ReplaceDataFiles currently keeps both old and new files") + s.T().Log("EXPECTED: In full implementation, should replace files completely") + }) +} + +func (s *WriteOperationsTestSuite) TestOverwriteByPartition() { + s.Run("OverwriteWithFilter", func() { + // Test overwrite with row-level filters + ident := table.Identifier{"default", "overwrite_filter_test"} + tbl, originalFiles := s.createTableWithData(ident, 1) + + // VALIDATE INITIAL STATE + initialSnapshot := tbl.CurrentSnapshot() + s.validateSnapshotState(initialSnapshot, s.getFS(tbl), originalFiles, table.OpAppend, map[string]string{ + "added-data-files": "1", + "added-records": "3", + }) + s.validateDataIntegrity(tbl, 3) + + // Create replacement data + mem := memory.DefaultAllocator + replacementData, err := array.TableFromJSON(mem, s.arrSchema, []string{ + `[ + {"id": 1, "data": "updated_foo", "ts": 1672531200000000}, + {"id": 4, "data": "new_data", "ts": 1672549200000000} + ]`, + }) + s.Require().NoError(err) + defer replacementData.Release() + + // For now, this demonstrates file-level replacement + // True row-level filtering would require delete files + consolidatedPath := s.location + "/data/replacement.parquet" + fs := s.getFS(tbl) + s.writeParquet(fs, consolidatedPath, replacementData) + + // For this test, we'll demonstrate overwrite by creating a new file + // and replacing specific known files rather than discovering them dynamically + // In a real scenario, you'd use manifest information to determine which files to replace + + // Capture initial file list for comparison + initialFiles := s.getSnapshotFiles(initialSnapshot, s.getFS(tbl)) + + // Replace with filtered data - using the known original files from table creation + tx := tbl.NewTransaction() + err = tx.ReplaceDataFiles(s.ctx, originalFiles, []string{consolidatedPath}, nil) + s.Require().NoError(err) + + newTbl, err := tx.Commit(s.ctx) + s.Require().NoError(err) + + // VALIDATE FINAL STATE WITH ENHANCED CHECKS + finalSnapshot := newTbl.CurrentSnapshot() + + // Assert that file lists differ before and after the operation + finalFiles := s.getSnapshotFiles(finalSnapshot, s.getFS(newTbl)) + s.NotEqual(initialFiles, finalFiles, "File lists should differ before and after replace operation") + s.Greater(len(finalFiles), len(initialFiles), "Replace operation should result in more files (current behavior)") + + // Current behavior: keeps both original and replacement files + expectedFiles := make([]string, len(originalFiles), len(originalFiles)+1) + copy(expectedFiles, originalFiles) + expectedFiles = append(expectedFiles, consolidatedPath) + s.validateSnapshotState(finalSnapshot, s.getFS(newTbl), expectedFiles, table.OpOverwrite, map[string]string{ + "added-data-files": "1", + "added-records": "2", // replacement data has 2 rows + }) + s.validateDataIntegrity(newTbl, 2) + + // Verify snapshot progression + s.NotEqual(initialSnapshot.SnapshotID, finalSnapshot.SnapshotID, "Should create new snapshot") + s.Equal(&initialSnapshot.SnapshotID, finalSnapshot.ParentSnapshotID, "Should reference previous snapshot as parent") + + s.T().Log("NOTE: ReplaceDataFiles currently keeps both old and new files") + s.T().Log("EXPECTED: In filtered overwrite, should replace only matching files") + }) + + s.Run("OverwriteWithFilter", func() { + // Test overwrite with row-level filters + ident := table.Identifier{"default", "overwrite_filter_test"} + tbl, originalFiles := s.createTableWithData(ident, 1) + + // VALIDATE INITIAL STATE + initialSnapshot := tbl.CurrentSnapshot() + s.validateSnapshotState(initialSnapshot, s.getFS(tbl), originalFiles, table.OpAppend, map[string]string{ + "added-data-files": "1", + "added-records": "3", + }) + s.validateDataIntegrity(tbl, 3) + + // Create replacement data + mem := memory.DefaultAllocator + replacementData, err := array.TableFromJSON(mem, s.arrSchema, []string{ + `[ + {"id": 1, "data": "updated_foo", "ts": 1672531200000000}, + {"id": 4, "data": "new_data", "ts": 1672549200000000} + ]`, + }) + s.Require().NoError(err) + defer replacementData.Release() + + // For now, this demonstrates file-level replacement + // True row-level filtering would require delete files + consolidatedPath := s.location + "/data/replacement.parquet" + fs := s.getFS(tbl) + s.writeParquet(fs, consolidatedPath, replacementData) + + // For this test, we'll demonstrate overwrite by creating a new file + // and replacing specific known files rather than discovering them dynamically + // In a real scenario, you'd use manifest information to determine which files to replace + + // Capture initial file list for comparison + initialFiles := s.getSnapshotFiles(initialSnapshot, s.getFS(tbl)) + + // Replace with filtered data - using the known original files from table creation + tx := tbl.NewTransaction() + err = tx.ReplaceDataFiles(s.ctx, originalFiles, []string{consolidatedPath}, nil) + s.Require().NoError(err) + + newTbl, err := tx.Commit(s.ctx) + s.Require().NoError(err) + + // VALIDATE FINAL STATE WITH ENHANCED CHECKS + finalSnapshot := newTbl.CurrentSnapshot() + + // Assert that file lists differ before and after the operation + finalFiles := s.getSnapshotFiles(finalSnapshot, s.getFS(newTbl)) + s.NotEqual(initialFiles, finalFiles, "File lists should differ before and after replace operation") + s.Greater(len(finalFiles), len(initialFiles), "Replace operation should result in more files (current behavior)") + + // Current behavior: keeps both original and replacement files + expectedFiles := make([]string, len(originalFiles), len(originalFiles)+1) + copy(expectedFiles, originalFiles) + expectedFiles = append(expectedFiles, consolidatedPath) + s.validateSnapshotState(finalSnapshot, s.getFS(newTbl), expectedFiles, table.OpOverwrite, map[string]string{ + "added-data-files": "1", + "added-records": "2", // replacement data has 2 rows + }) + s.validateDataIntegrity(newTbl, 2) + + // Verify snapshot progression + s.NotEqual(initialSnapshot.SnapshotID, finalSnapshot.SnapshotID, "Should create new snapshot") + s.Equal(&initialSnapshot.SnapshotID, finalSnapshot.ParentSnapshotID, "Should reference previous snapshot as parent") + + s.T().Log("NOTE: ReplaceDataFiles currently keeps both old and new files") + s.T().Log("EXPECTED: In filtered overwrite, should replace only matching files") + }) +} + +func (s *WriteOperationsTestSuite) TestPositionDeletes() { + s.Run("WritePositionDeleteFiles", func() { + // Test writing position delete files + ident := table.Identifier{"default", "position_delete_test"} + tbl, originalFiles := s.createTableWithData(ident, 1) + + // Create position delete data that deletes row at position 1 (second row) + mem := memory.DefaultAllocator + + // Position delete schema: file_path (string), pos (int32) + posDeleteSchema := arrow.NewSchema([]arrow.Field{ + {Name: "file_path", Type: arrow.BinaryTypes.String, Nullable: false}, + {Name: "pos", Type: arrow.PrimitiveTypes.Int32, Nullable: false}, + }, nil) + + // Create delete data for the first file, deleting position 1 + deleteData, err := array.TableFromJSON(mem, posDeleteSchema, []string{ + fmt.Sprintf(`[{"file_path": "%s", "pos": 1}]`, originalFiles[0]), + }) + s.Require().NoError(err) + defer deleteData.Release() + + // Write the position delete file + deleteFilePath := s.location + "/deletes/pos_deletes.parquet" + fs := s.getFS(tbl) + s.writeParquet(fs, deleteFilePath, deleteData) + + // Create DataFile object for the delete file + deleteFileBuilder, err := iceberg.NewDataFileBuilder( + *iceberg.UnpartitionedSpec, + iceberg.EntryContentPosDeletes, + deleteFilePath, + iceberg.ParquetFile, + nil, // no partition data for unpartitioned + deleteData.NumRows(), + 1024, // approximate file size + ) + s.Require().NoError(err) + + deleteFile := deleteFileBuilder.Build() + + // Verify the delete file was created with correct properties + s.Equal(iceberg.EntryContentPosDeletes, deleteFile.ContentType()) + s.Equal(deleteFilePath, deleteFile.FilePath()) + s.Equal(int64(1), deleteFile.Count()) // One delete entry + + s.T().Log("Successfully created position delete file") + }) +} + +func (s *WriteOperationsTestSuite) TestEqualityDeletes() { + s.Run("WriteEqualityDeleteFiles", func() { + // Test writing equality delete files + ident := table.Identifier{"default", "equality_delete_test"} + tbl, _ := s.createTableWithData(ident, 1) + + // Create equality delete data using subset of table schema + // Equality deletes contain the values that should be deleted + mem := memory.DefaultAllocator + + // Equality delete schema - subset of table columns used for equality + eqSchema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false}, + {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true}, + }, nil) + + // Create delete data - delete rows where id=2 and data="bar" + deleteData, err := array.TableFromJSON(mem, eqSchema, []string{ + `[{"id": 2, "data": "bar"}]`, + }) + s.Require().NoError(err) + defer deleteData.Release() + + // Write the equality delete file + deleteFilePath := s.location + "/deletes/eq_deletes.parquet" + fs := s.getFS(tbl) + s.writeParquet(fs, deleteFilePath, deleteData) + + // Create DataFile object for the equality delete file + deleteFileBuilder, err := iceberg.NewDataFileBuilder( + *iceberg.UnpartitionedSpec, + iceberg.EntryContentEqDeletes, + deleteFilePath, + iceberg.ParquetFile, + nil, // no partition data for unpartitioned + deleteData.NumRows(), + 1024, // approximate file size + ) + s.Require().NoError(err) + + // Set equality field IDs for the columns used in equality comparison + // Field IDs from our table schema: id=1, data=2 + deleteFileBuilder.EqualityFieldIDs([]int{1, 2}) + + deleteFile := deleteFileBuilder.Build() + + // Verify the delete file was created with correct properties + s.Equal(iceberg.EntryContentEqDeletes, deleteFile.ContentType()) + s.Equal(deleteFilePath, deleteFile.FilePath()) + s.Equal(int64(1), deleteFile.Count()) // One delete entry + s.Equal([]int{1, 2}, deleteFile.EqualityFieldIDs()) // Equality fields + + s.T().Log("Successfully created equality delete file") + }) +} + +func (s *WriteOperationsTestSuite) TestRowDelta() { + s.Run("BasicRowDelta", func() { + // Test row-level insert/update/delete operations + ident := table.Identifier{"default", "row_delta_test"} + tbl, originalFiles := s.createTableWithData(ident, 1) + + // VALIDATE INITIAL STATE + initialSnapshot := tbl.CurrentSnapshot() + s.validateSnapshotState(initialSnapshot, s.getFS(tbl), originalFiles, table.OpAppend, map[string]string{ + "added-data-files": "1", + "added-records": "3", + }) + s.validateDataIntegrity(tbl, 3) + + // Create delta data (simulating updates) + mem := memory.DefaultAllocator + deltaData, err := array.TableFromJSON(mem, s.arrSchema, []string{ + `[ + {"id": 1, "data": "updated", "ts": 1672531200000000}, + {"id": 4, "data": "inserted", "ts": 1672552800000000} + ]`, + }) + s.Require().NoError(err) + defer deltaData.Release() + + // Apply delta through append (simplified) + tx := tbl.NewTransaction() + err = tx.AppendTable(s.ctx, deltaData, 1000, iceberg.Properties{ + "operation.type": "row-delta", + }) + s.Require().NoError(err) + + newTbl, err := tx.Commit(s.ctx) + s.Require().NoError(err) + + // VALIDATE FINAL STATE WITH ENHANCED CHECKS + finalSnapshot := newTbl.CurrentSnapshot() + + // Get all files in the new snapshot for validation + var allFiles []string + manifests, err := finalSnapshot.Manifests(s.getFS(newTbl)) + s.Require().NoError(err) + for _, manifest := range manifests { + entries, err := manifest.FetchEntries(s.getFS(newTbl), false) + s.Require().NoError(err) + for _, entry := range entries { + if entry.DataFile().ContentType() == iceberg.EntryContentData { + allFiles = append(allFiles, entry.DataFile().FilePath()) + } + } + } + + s.validateSnapshotState(finalSnapshot, s.getFS(newTbl), allFiles, table.OpAppend, map[string]string{ + "added-data-files": "1", + "added-records": "2", // delta data has 2 rows + }) + s.validateDataIntegrity(newTbl, 5) // original 3 + delta 2 = 5 rows + + // Verify snapshot progression + s.NotEqual(initialSnapshot.SnapshotID, finalSnapshot.SnapshotID, "Should create new snapshot") + s.Equal(&initialSnapshot.SnapshotID, finalSnapshot.ParentSnapshotID, "Should reference previous snapshot as parent") + + // Verify summary shows the operation + s.Contains(finalSnapshot.Summary.Properties, "added-records") + }) + + s.Run("ConcurrentRowDelta", func() { + // Test concurrent row delta operations + ident := table.Identifier{"default", "concurrent_delta_test"} + tbl, originalFiles := s.createTableWithData(ident, 1) + + // VALIDATE INITIAL STATE + initialSnapshot := tbl.CurrentSnapshot() + s.validateSnapshotState(initialSnapshot, s.getFS(tbl), originalFiles, table.OpAppend, map[string]string{ + "added-data-files": "1", + "added-records": "3", + }) + s.validateDataIntegrity(tbl, 3) + + // Create concurrent transactions + tx1 := tbl.NewTransaction() + tx2 := tbl.NewTransaction() + + // Create different delta data + mem := memory.DefaultAllocator + delta1, err := array.TableFromJSON(mem, s.arrSchema, []string{ + `[{"id": 10, "data": "delta1", "ts": 1672567200000000}]`, + }) + s.Require().NoError(err) + defer delta1.Release() + + delta2, err := array.TableFromJSON(mem, s.arrSchema, []string{ + `[{"id": 20, "data": "delta2", "ts": 1672603200000000}]`, + }) + s.Require().NoError(err) + defer delta2.Release() + + // Apply deltas concurrently + err1 := tx1.AppendTable(s.ctx, delta1, 1000, nil) + s.Require().NoError(err1) + + err2 := tx2.AppendTable(s.ctx, delta2, 1000, nil) + s.Require().NoError(err2) + + // First should succeed + firstTbl, err1 := tx1.Commit(s.ctx) + s.Require().NoError(err1) + + // VALIDATE FIRST TRANSACTION STATE + firstSnapshot := firstTbl.CurrentSnapshot() + var firstFiles []string + manifests, err := firstSnapshot.Manifests(s.getFS(firstTbl)) + s.Require().NoError(err) + for _, manifest := range manifests { + entries, err := manifest.FetchEntries(s.getFS(firstTbl), false) + s.Require().NoError(err) + for _, entry := range entries { + if entry.DataFile().ContentType() == iceberg.EntryContentData { + firstFiles = append(firstFiles, entry.DataFile().FilePath()) + } + } + } + + s.validateSnapshotState(firstSnapshot, s.getFS(firstTbl), firstFiles, table.OpAppend, map[string]string{ + "added-data-files": "1", + "added-records": "1", // delta1 has 1 row + }) + s.validateDataIntegrity(firstTbl, 4) // original 3 + delta1 1 = 4 rows + + // Second should succeed since full transaction isolation may not be implemented yet + // In a full implementation, this might fail due to isolation requirements + secondTbl, err2 := tx2.Commit(s.ctx) + if err2 != nil { + s.T().Logf("Transaction isolation enforced: %v", err2) + } else { + s.T().Log("Transaction completed without isolation conflict - full isolation may not be implemented") + + // If second transaction succeeded, validate its state too + secondSnapshot := secondTbl.CurrentSnapshot() + var secondFiles []string + manifests, err := secondSnapshot.Manifests(s.getFS(secondTbl)) + s.Require().NoError(err) + for _, manifest := range manifests { + entries, err := manifest.FetchEntries(s.getFS(secondTbl), false) + s.Require().NoError(err) + for _, entry := range entries { + if entry.DataFile().ContentType() == iceberg.EntryContentData { + secondFiles = append(secondFiles, entry.DataFile().FilePath()) + } + } + } + s.validateSnapshotState(secondSnapshot, s.getFS(secondTbl), secondFiles, table.OpAppend, map[string]string{ + "added-data-files": "1", + "added-records": "1", // delta2 has 1 row + }) + } + }) +} + +// Additional helper functions for future delete file implementations + +// createPositionDeleteData creates Arrow data for position delete files +func (s *WriteOperationsTestSuite) createPositionDeleteData(filePath string, positions []int64) arrow.Table { + mem := memory.DefaultAllocator + schema := arrow.NewSchema([]arrow.Field{ + {Name: "file_path", Type: arrow.BinaryTypes.String, Nullable: false}, + {Name: "pos", Type: arrow.PrimitiveTypes.Int32, Nullable: false}, // Note: pos is int32 in Iceberg spec + }, nil) + + // Build JSON data for the positions + var jsonEntries []string + for _, pos := range positions { + jsonEntries = append(jsonEntries, fmt.Sprintf(`{"file_path": "%s", "pos": %d}`, filePath, pos)) + } + jsonData := fmt.Sprintf("[%s]", strings.Join(jsonEntries, ",")) + + table, err := array.TableFromJSON(mem, schema, []string{jsonData}) + s.Require().NoError(err) + + return table +} + +// createEqualityDeleteData creates Arrow data for equality delete files +func (s *WriteOperationsTestSuite) createEqualityDeleteData(deleteConditions []map[string]interface{}) arrow.Table { + mem := memory.DefaultAllocator + + // Create a subset schema for equality deletes (id and data columns) + eqSchema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false}, + {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true}, + }, nil) + + // Build JSON data for the delete conditions + var jsonEntries []string + for _, condition := range deleteConditions { + entry := "{" + var parts []string + for key, value := range condition { + switch v := value.(type) { + case string: + parts = append(parts, fmt.Sprintf(`"%s": "%s"`, key, v)) + case int64: + parts = append(parts, fmt.Sprintf(`"%s": %d`, key, v)) + case int: + parts = append(parts, fmt.Sprintf(`"%s": %d`, key, v)) + } + } + entry += strings.Join(parts, ", ") + "}" + jsonEntries = append(jsonEntries, entry) + } + jsonData := fmt.Sprintf("[%s]", strings.Join(jsonEntries, ",")) + + table, err := array.TableFromJSON(mem, eqSchema, []string{jsonData}) + s.Require().NoError(err) + + return table +} + +// TestDeleteFileIntegration demonstrates how delete files would be integrated with table operations +func (s *WriteOperationsTestSuite) TestDeleteFileIntegration() { + s.Run("DeleteFileWorkflow", func() { + // This test demonstrates the complete workflow for creating and managing delete files + ident := table.Identifier{"default", "delete_integration_test"} + tbl, originalFiles := s.createTableWithData(ident, 1) + + // Step 1: Create position delete files + positionDeletes := s.createPositionDeleteData(originalFiles[0], []int64{0, 2}) // Delete first and third rows + posDeleteFilePath := s.location + "/deletes/integration_pos_deletes.parquet" + fs := s.getFS(tbl) + s.writeParquet(fs, posDeleteFilePath, positionDeletes) + positionDeletes.Release() + + // Step 2: Create equality delete files + equalityDeletes := s.createEqualityDeleteData([]map[string]interface{}{ + {"id": int64(2), "data": "bar"}, // Delete row with id=2 and data="bar" + }) + eqDeleteFilePath := s.location + "/deletes/integration_eq_deletes.parquet" + s.writeParquet(fs, eqDeleteFilePath, equalityDeletes) + equalityDeletes.Release() + + // Step 3: Create DataFile objects for both delete files + posDeleteFile, err := iceberg.NewDataFileBuilder( + *iceberg.UnpartitionedSpec, + iceberg.EntryContentPosDeletes, + posDeleteFilePath, + iceberg.ParquetFile, + nil, 2, 1024, + ) + s.Require().NoError(err) + + eqDeleteFile, err := iceberg.NewDataFileBuilder( + *iceberg.UnpartitionedSpec, + iceberg.EntryContentEqDeletes, + eqDeleteFilePath, + iceberg.ParquetFile, + nil, 1, 1024, + ) + s.Require().NoError(err) + eqDeleteFile.EqualityFieldIDs([]int{1, 2}) // id and data fields + + posDF := posDeleteFile.Build() + eqDF := eqDeleteFile.Build() + + // Step 4: Verify delete file properties + s.Equal(iceberg.EntryContentPosDeletes, posDF.ContentType()) + s.Equal(iceberg.EntryContentEqDeletes, eqDF.ContentType()) + s.Equal(int64(2), posDF.Count()) // Two position deletes + s.Equal(int64(1), eqDF.Count()) // One equality delete + s.Equal([]int{1, 2}, eqDF.EqualityFieldIDs()) + + // Step 5: Demonstrate how these would be used in a complete table implementation + // In a full implementation, these delete files would be: + // 1. Added to manifest files with proper manifest entries + // 2. Associated with data files during scanning + // 3. Applied during query execution to filter out deleted rows + + s.T().Log("Delete file integration workflow completed successfully") + s.T().Log("Position deletes: file created with 2 position entries") + s.T().Log("Equality deletes: file created with 1 equality condition") + s.T().Log("Full table integration requires manifest management and scanner updates") + }) + + s.Run("DeleteFileValidation", func() { + // Test validation of delete file creation + + // Test invalid content type handling + _, err := iceberg.NewDataFileBuilder( + *iceberg.UnpartitionedSpec, + iceberg.ManifestEntryContent(99), // Invalid content type + "/tmp/test.parquet", + iceberg.ParquetFile, + nil, 1, 1024, + ) + s.Require().Error(err, "Should reject invalid content type") + + // Test successful creation with valid parameters + deleteFile, err := iceberg.NewDataFileBuilder( + *iceberg.UnpartitionedSpec, + iceberg.EntryContentPosDeletes, + "/tmp/valid_delete.parquet", + iceberg.ParquetFile, + nil, 5, 2048, + ) + s.Require().NoError(err) + + df := deleteFile.Build() + s.Equal(iceberg.EntryContentPosDeletes, df.ContentType()) + s.Equal("/tmp/valid_delete.parquet", df.FilePath()) + s.Equal(int64(5), df.Count()) + s.Equal(int64(2048), df.FileSizeBytes()) + + s.T().Log("Delete file validation tests passed") + }) +} + +// TestSnapshotValidationDemo demonstrates the benefits of enhanced snapshot validation +func (s *WriteOperationsTestSuite) TestSnapshotValidationDemo() { + s.Run("ValidationsShowDetailedTableState", func() { + // This test demonstrates how the validation helpers provide detailed insights + ident := table.Identifier{"default", "validation_demo"} + tbl := s.createTable(ident, 2, *iceberg.UnpartitionedSpec) + + // Step 1: Start with empty table + initialSnapshot := tbl.CurrentSnapshot() + if initialSnapshot != nil { + s.T().Log("Initial table state:") + s.validateSnapshotState(initialSnapshot, s.getFS(tbl), []string{}, table.OpAppend, nil) + } else { + s.T().Log("Table starts with no snapshots (empty table)") + } + + // Step 2: Add first file + filePath1 := s.location + "/data/demo1.parquet" + fs := s.getFS(tbl) + s.writeParquet(fs, filePath1, s.arrTable) + + tx1 := tbl.NewTransaction() + s.Require().NoError(tx1.AddFiles(s.ctx, []string{filePath1}, nil, false)) + tbl1, err := tx1.Commit(s.ctx) + s.Require().NoError(err) + + s.T().Log("After adding first file:") + snapshot1 := tbl1.CurrentSnapshot() + s.validateSnapshotState(snapshot1, s.getFS(tbl1), []string{filePath1}, table.OpAppend, map[string]string{ + "added-data-files": "1", + "added-records": "3", + }) + s.validateDataIntegrity(tbl1, 3) + + // Step 3: Add second file + filePath2 := s.location + "/data/demo2.parquet" + s.writeParquet(fs, filePath2, s.arrTable) + + tx2 := tbl1.NewTransaction() + s.Require().NoError(tx2.AddFiles(s.ctx, []string{filePath2}, nil, false)) + tbl2, err := tx2.Commit(s.ctx) + s.Require().NoError(err) + + s.T().Log("After adding second file:") + snapshot2 := tbl2.CurrentSnapshot() + s.validateSnapshotState(snapshot2, s.getFS(tbl2), []string{filePath1, filePath2}, table.OpAppend, map[string]string{ + "added-data-files": "1", + "added-records": "3", + }) + s.validateDataIntegrity(tbl2, 6) + + // Step 4: Perform replace operation + consolidatedPath := s.location + "/data/consolidated.parquet" + mem := memory.DefaultAllocator + combinedData, err := array.TableFromJSON(mem, s.arrSchema, []string{ + `[ + {"id": 1, "data": "combined1", "ts": 1672531200000000}, + {"id": 2, "data": "combined2", "ts": 1672534800000000}, + {"id": 3, "data": "combined3", "ts": 1672538400000000}, + {"id": 4, "data": "combined4", "ts": 1672542000000000} + ]`, + }) + s.Require().NoError(err) + defer combinedData.Release() + + s.writeParquet(fs, consolidatedPath, combinedData) + + tx3 := tbl2.NewTransaction() + s.Require().NoError(tx3.ReplaceDataFiles(s.ctx, []string{filePath1, filePath2}, []string{consolidatedPath}, nil)) + tbl3, err := tx3.Commit(s.ctx) + s.Require().NoError(err) + + s.T().Log("After replace operation:") + snapshot3 := tbl3.CurrentSnapshot() + + // Current behavior: keeps all files (original + new) + allFiles := []string{filePath1, filePath2, consolidatedPath} + s.validateSnapshotState(snapshot3, s.getFS(tbl3), allFiles, table.OpOverwrite, map[string]string{ + "added-data-files": "1", + "added-records": "4", + }) + s.validateDataIntegrity(tbl3, 4) // Only consolidated data is accessible + + // Demonstrate snapshot progression + s.T().Logf("Snapshot progression: %d → %d → %d", + snapshot1.SnapshotID, snapshot2.SnapshotID, snapshot3.SnapshotID) + + s.T().Log("✨ Validation helpers provide comprehensive insights into:") + s.T().Log(" • File management and count changes") + s.T().Log(" • Snapshot summary properties and operations") + s.T().Log(" • Manifest structure and entry counts") + s.T().Log(" • Data integrity and row counts") + s.T().Log(" • Snapshot lineage and progression") + s.T().Log(" • Current vs expected behavior documentation") + }) +} + func TestNullableStructRequiredField(t *testing.T) { loc := filepath.ToSlash(t.TempDir()) @@ -1243,8 +2299,9 @@ func TestNullableStructRequiredField(t *testing.T) { sc, err := table.ArrowSchemaToIcebergWithFreshIDs(arrowSchema, false) require.NoError(t, err) - require.NoError(t, cat.CreateNamespace(t.Context(), table.Identifier{"testing"}, nil)) - tbl, err := cat.CreateTable(t.Context(), table.Identifier{"testing", "nullable_struct_required_field"}, sc, + ctx := context.Background() + require.NoError(t, cat.CreateNamespace(ctx, table.Identifier{"testing"}, nil)) + tbl, err := cat.CreateTable(ctx, table.Identifier{"testing", "nullable_struct_required_field"}, sc, catalog.WithProperties(iceberg.Properties{"format-version": "2"}), catalog.WithLocation("file://"+loc)) require.NoError(t, err) @@ -1264,7 +2321,7 @@ func TestNullableStructRequiredField(t *testing.T) { defer arrTable.Release() tx := tbl.NewTransaction() - require.NoError(t, tx.AppendTable(t.Context(), arrTable, N, nil)) + require.NoError(t, tx.AppendTable(ctx, arrTable, N, nil)) stagedTbl, err := tx.StagedTable() require.NoError(t, err) require.NotNil(t, stagedTbl)