Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ the table, the following tracks the current write support:
| Append Data Files | X |
| Rewrite Files | |
| Rewrite manifests | |
| Overwrite Files | |
| Overwrite Files | X |
| Write Pos Delete | |
| Write Eq Delete | |
| Row Delta | |
Expand Down
36 changes: 33 additions & 3 deletions internal/recipe/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,40 @@ def testSetProperties():


def testAddedFile():
spark.sql("SELECT COUNT(*) FROM default.test_partitioned_by_days").show(truncate=False)
spark.sql("SELECT COUNT(*) FROM default.test_partitioned_by_days").show(
truncate=False
)


def testReadDifferentDataTypes():
spark.sql("DESCRIBE TABLE EXTENDED default.go_test_different_data_types").show(truncate=False)
spark.sql("DESCRIBE TABLE EXTENDED default.go_test_different_data_types").show(
truncate=False
)
spark.sql("SELECT * FROM default.go_test_different_data_types").show(truncate=False)


def testReadSpecUpdate():
spark.sql("DESCRIBE TABLE EXTENDED default.go_test_update_spec").show(truncate=False)
spark.sql("DESCRIBE TABLE EXTENDED default.go_test_update_spec").show(
truncate=False
)


def testOverwriteBasic():
spark.sql("SELECT COUNT(*) FROM default.go_test_overwrite_basic").show(
truncate=False
)
spark.sql("SELECT * FROM default.go_test_overwrite_basic ORDER BY baz").show(
truncate=False
)


def testOverwriteWithFilter():
spark.sql("SELECT COUNT(*) FROM default.go_test_overwrite_filter").show(
truncate=False
)
spark.sql("SELECT * FROM default.go_test_overwrite_filter ORDER BY baz").show(
truncate=False
)


if __name__ == "__main__":
Expand All @@ -54,3 +78,9 @@ def testReadSpecUpdate():

if args.test == "TestReadSpecUpdate":
testReadSpecUpdate()

if args.test == "TestOverwriteBasic":
testOverwriteBasic()

if args.test == "TestOverwriteWithFilter":
testOverwriteWithFilter()
20 changes: 20 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,26 @@ func (t Table) Append(ctx context.Context, rdr array.RecordReader, snapshotProps
return txn.Commit(ctx)
}

// OverwriteTable is a shortcut for NewTransaction().OverwriteTable() and then committing the transaction
func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps iceberg.Properties) (*Table, error) {
txn := t.NewTransaction()
if err := txn.OverwriteTable(ctx, tbl, batchSize, filter, caseSensitive, snapshotProps); err != nil {
return nil, err
}

return txn.Commit(ctx)
}

// Overwrite is a shortcut for NewTransaction().Overwrite() and then committing the transaction
func (t Table) Overwrite(ctx context.Context, rdr array.RecordReader, filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps iceberg.Properties) (*Table, error) {
txn := t.NewTransaction()
if err := txn.Overwrite(ctx, rdr, filter, caseSensitive, snapshotProps); err != nil {
return nil, err
}

return txn.Commit(ctx)
}

func (t Table) AllManifests(ctx context.Context) iter.Seq2[iceberg.ManifestFile, error] {
fs, err := t.fsF(ctx)
if err != nil {
Expand Down
46 changes: 45 additions & 1 deletion table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (t *TableWritingTestSuite) createTable(identifier table.Identifier, formatV
func(ctx context.Context) (iceio.IO, error) {
return iceio.LocalFS{}, nil
},
nil,
&mockedCatalog{meta},
)
}

Expand Down Expand Up @@ -1281,6 +1281,50 @@ func (t *TableWritingTestSuite) TestMergeManifests() {
t.True(array.TableEqual(resultB, resultC), "expected:\n %s\ngot:\n %s", resultB, resultC)
}

// TestOverwriteTable verifies that Table.OverwriteTable properly delegates to Transaction.OverwriteTable
func (t *TableWritingTestSuite) TestOverwriteTable() {
ident := table.Identifier{"default", "overwrite_table_wrapper_v" + strconv.Itoa(t.formatVersion)}
tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
newTable, err := array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
`[{"foo": false, "bar": "wrapper_test", "baz": 123, "qux": "2024-01-01"}]`,
})
t.Require().NoError(err)
defer newTable.Release()
resultTbl, err := tbl.OverwriteTable(t.ctx, newTable, 1, nil, true, nil)
t.Require().NoError(err)
t.NotNil(resultTbl)

snapshot := resultTbl.CurrentSnapshot()
t.NotNil(snapshot)
t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table overwrite becomes append
}

// TestOverwriteRecord verifies that Table.Overwrite properly delegates to Transaction.Overwrite
func (t *TableWritingTestSuite) TestOverwriteRecord() {
ident := table.Identifier{"default", "overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)}
tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)

// Create test data as RecordReader
testTable, err := array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
`[{"foo": true, "bar": "record_test", "baz": 456, "qux": "2024-01-02"}]`,
})
t.Require().NoError(err)
defer testTable.Release()

rdr := array.NewTableReader(testTable, 1)
defer rdr.Release()

// Test that Table.Overwrite works (delegates to transaction)
resultTbl, err := tbl.Overwrite(t.ctx, rdr, nil, true, nil)
t.Require().NoError(err)
t.NotNil(resultTbl)

// Verify the operation worked
snapshot := resultTbl.CurrentSnapshot()
t.NotNil(snapshot)
t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table overwrite becomes append
}

func TestTableWriting(t *testing.T) {
suite.Run(t, &TableWritingTestSuite{formatVersion: 1})
suite.Run(t, &TableWritingTestSuite{formatVersion: 2})
Expand Down
Loading
Loading