diff --git a/catalog/glue/schema_test.go b/catalog/glue/schema_test.go index 88412db0e..04f5dd74f 100644 --- a/catalog/glue/schema_test.go +++ b/catalog/glue/schema_test.go @@ -437,7 +437,7 @@ func TestSchemasToGlueColumns(t *testing.T) { metadata, err := table.NewMetadata(schemas[0], nil, table.SortOrder{}, "s3://example/path", nil) assert.NoError(t, err) - mb, err := table.MetadataBuilderFromBase(metadata) + mb, err := table.MetadataBuilderFromBase(metadata, "") assert.NoError(t, err) err = mb.AddSchema(schemas[1]) diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go index 493f32e96..c50eddc57 100644 --- a/catalog/internal/utils.go +++ b/catalog/internal/utils.go @@ -69,35 +69,7 @@ func WriteMetadata(ctx context.Context, metadata table.Metadata, loc string, pro } func UpdateTableMetadata(base table.Metadata, updates []table.Update, metadataLoc string) (table.Metadata, error) { - bldr, err := table.MetadataBuilderFromBase(base) - if err != nil { - return nil, err - } - - for _, update := range updates { - if err := update.Apply(bldr); err != nil { - return nil, err - } - } - - if bldr.HasChanges() { - if metadataLoc != "" { - maxMetadataLogEntries := max(1, - base.Properties().GetInt( - table.MetadataPreviousVersionsMaxKey, table.MetadataPreviousVersionsMaxDefault)) - - bldr.TrimMetadataLogs(maxMetadataLogEntries + 1). - AppendMetadataLog(table.MetadataLogEntry{ - MetadataFile: metadataLoc, - TimestampMs: base.LastUpdatedMillis(), - }) - } - if base.LastUpdatedMillis() == bldr.LastUpdatedMS() { - bldr.SetLastUpdatedMS() - } - } - - return bldr.Build() + return table.UpdateTableMetadata(base, updates, metadataLoc) } func CreateStagedTable(ctx context.Context, catprops iceberg.Properties, nspropsFn GetNamespacePropsFn, ident table.Identifier, sc *iceberg.Schema, opts ...catalog.CreateTableOpt) (table.StagedTable, error) { diff --git a/errors.go b/errors.go index 9ecc26d55..f1194fc04 100644 --- a/errors.go +++ b/errors.go @@ -17,12 +17,16 @@ package iceberg -import "errors" +import ( + "errors" + "fmt" +) var ( ErrInvalidTypeString = errors.New("invalid type") ErrNotImplemented = errors.New("not implemented") ErrInvalidArgument = errors.New("invalid argument") + ErrInvalidFormatVersion = fmt.Errorf("%w: invalid format version", ErrInvalidArgument) ErrInvalidSchema = errors.New("invalid schema") ErrInvalidTransform = errors.New("invalid transform syntax") ErrType = errors.New("type error") diff --git a/table/arrow_utils_internal_test.go b/table/arrow_utils_internal_test.go index 4305642e1..d32298225 100644 --- a/table/arrow_utils_internal_test.go +++ b/table/arrow_utils_internal_test.go @@ -184,7 +184,7 @@ func (suite *FileStatsMetricsSuite) getDataFile(meta iceberg.Properties, writeSt schema := tableMeta.CurrentSchema() if len(meta) > 0 { - bldr, err := MetadataBuilderFromBase(tableMeta) + bldr, err := MetadataBuilderFromBase(tableMeta, "") suite.Require().NoError(err) err = bldr.SetProperties(meta) suite.Require().NoError(err) diff --git a/table/metadata.go b/table/metadata.go index 2240549bc..b41d40a5c 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -175,6 +175,7 @@ type MetadataBuilder struct { defaultSortOrderID int refs map[string]SnapshotRef + previousFileEntry *MetadataLogEntry // >v1 specific lastSequenceNumber *int64 // update tracking @@ -183,28 +184,41 @@ type MetadataBuilder struct { lastAddedSortOrderID *int } -func NewMetadataBuilder() (*MetadataBuilder, error) { +func NewMetadataBuilder(formatVersion int) (*MetadataBuilder, error) { + if formatVersion < 1 || formatVersion > supportedTableFormatVersion { + return nil, fmt.Errorf("%w: %d", iceberg.ErrInvalidFormatVersion, formatVersion) + } + return &MetadataBuilder{ - updates: make([]Update, 0), - schemaList: make([]*iceberg.Schema, 0), - specs: make([]iceberg.PartitionSpec, 0), - props: make(iceberg.Properties), - snapshotList: make([]Snapshot, 0), - snapshotLog: make([]SnapshotLogEntry, 0), - metadataLog: make([]MetadataLogEntry, 0), - sortOrderList: make([]SortOrder, 0), - refs: make(map[string]SnapshotRef), + updates: make([]Update, 0), + schemaList: make([]*iceberg.Schema, 0), + specs: make([]iceberg.PartitionSpec, 0), + props: make(iceberg.Properties), + snapshotList: make([]Snapshot, 0), + snapshotLog: make([]SnapshotLogEntry, 0), + metadataLog: make([]MetadataLogEntry, 0), + sortOrderList: make([]SortOrder, 0), + refs: make(map[string]SnapshotRef), + currentSchemaID: -1, + defaultSortOrderID: -1, + defaultSpecID: -1, + lastColumnId: -1, + formatVersion: formatVersion, }, nil } -func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) { +// MetadataBuilderFromBase creates a MetadataBuilder from an existing Metadata object. +// currentFileLocation is the location where the current version of the metadata +// file is stored. This is used to update the metadata log. If currentFileLocation is +// empty, the metadata log will not be updated. This should only be used to stage-create tables. +func MetadataBuilderFromBase(metadata Metadata, currentFileLocation string) (*MetadataBuilder, error) { b := &MetadataBuilder{} b.base = metadata b.formatVersion = metadata.Version() b.uuid = metadata.TableUUID() b.loc = metadata.Location() - b.lastUpdatedMS = metadata.LastUpdatedMillis() + b.lastUpdatedMS = 0 b.lastColumnId = metadata.LastColumnID() b.schemaList = slices.Clone(metadata.Schemas()) b.currentSchemaID = metadata.CurrentSchema().ID @@ -229,6 +243,13 @@ func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) { b.snapshotLog = slices.Collect(metadata.SnapshotLogs()) b.metadataLog = slices.Collect(metadata.PreviousFiles()) + if currentFileLocation != "" { + b.previousFileEntry = &MetadataLogEntry{ + MetadataFile: currentFileLocation, + TimestampMs: metadata.LastUpdatedMillis(), + } + } + return b, nil } @@ -451,6 +472,7 @@ func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder) error { } b.sortOrderList = append(sortOrders, *sortOrder) + b.lastAddedSortOrderID = &sortOrder.orderID b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder)) return nil @@ -474,8 +496,7 @@ func (b *MetadataBuilder) RemoveProperties(keys []string) error { } func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) error { - takeLast := currentSchemaID == -1 - if takeLast { + if currentSchemaID == -1 { if b.lastAddedSchemaID == nil { return errors.New("can't set current schema to last added schema, no schema has been added") } @@ -491,7 +512,7 @@ func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) error { return fmt.Errorf("can't set current schema to schema with id %d: %w", currentSchemaID, err) } - if takeLast { + if b.lastAddedSchemaID != nil && currentSchemaID == *b.lastAddedSchemaID { b.updates = append(b.updates, NewSetCurrentSchemaUpdate(-1)) } else { b.updates = append(b.updates, NewSetCurrentSchemaUpdate(currentSchemaID)) @@ -503,14 +524,10 @@ func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) error { func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) error { if defaultSortOrderID == -1 { - defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder) int { - return s.OrderID() - }) - if !slices.ContainsFunc(b.updates, func(u Update) bool { - return u.Action() == UpdateAddSortOrder && u.(*addSortOrderUpdate).SortOrder.OrderID() == defaultSortOrderID - }) { + if b.lastAddedSortOrderID == nil { return errors.New("can't set default sort order to last added with no added sort orders") } + defaultSortOrderID = *b.lastAddedSortOrderID } if defaultSortOrderID == b.defaultSortOrderID { @@ -521,17 +538,20 @@ func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) error { return fmt.Errorf("can't set default sort order to sort order with id %d: %w", defaultSortOrderID, err) } - b.updates = append(b.updates, NewSetDefaultSortOrderUpdate(defaultSortOrderID)) + if b.lastAddedSortOrderID != nil && defaultSortOrderID == *b.lastAddedSortOrderID { + b.updates = append(b.updates, NewSetDefaultSortOrderUpdate(-1)) + } else { + b.updates = append(b.updates, NewSetDefaultSortOrderUpdate(defaultSortOrderID)) + } + b.defaultSortOrderID = defaultSortOrderID return nil } func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) error { - lastUsed := false if defaultSpecID == -1 { if b.lastAddedPartitionID != nil { - lastUsed = true defaultSpecID = *b.lastAddedPartitionID } else { return errors.New("can't set default spec to last added with no added partition specs") @@ -546,7 +566,7 @@ func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) error { return fmt.Errorf("can't set default spec to spec with id %d: %w", defaultSpecID, err) } - if lastUsed { + if b.lastAddedPartitionID != nil && defaultSpecID == *b.lastAddedPartitionID { b.updates = append(b.updates, NewSetDefaultSpecUpdate(-1)) } else { b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID)) @@ -563,7 +583,7 @@ func (b *MetadataBuilder) SetFormatVersion(formatVersion int) error { } if formatVersion > supportedTableFormatVersion { - return fmt.Errorf("unsupported format version %d", formatVersion) + return fmt.Errorf("%w: %d", iceberg.ErrInvalidFormatVersion, formatVersion) } if formatVersion == b.formatVersion { @@ -757,6 +777,14 @@ func (b *MetadataBuilder) buildCommonMetadata() (*commonMetadata, error) { b.lastUpdatedMS = time.Now().UnixMilli() } + if b.previousFileEntry != nil && b.HasChanges() { + maxMetadataLogEntries := max(1, + b.base.Properties().GetInt( + MetadataPreviousVersionsMaxKey, MetadataPreviousVersionsMaxDefault)) + b.AppendMetadataLog(*b.previousFileEntry) + b.TrimMetadataLogs(maxMetadataLogEntries) + } + return &commonMetadata{ FormatVersion: b.formatVersion, UUID: b.uuid, @@ -1702,15 +1730,11 @@ func NewMetadataWithUUID(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, return nil, err } - builder, err := NewMetadataBuilder() + builder, err := NewMetadataBuilder(formatVersion) if err != nil { return nil, err } - if err = builder.SetFormatVersion(formatVersion); err != nil { - return nil, err - } - if err = builder.SetUUID(tableUuid); err != nil { return nil, err } @@ -1791,3 +1815,18 @@ func reassignIDs(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, sortOrde sortOrder: freshSortOrder, }, nil } + +func UpdateTableMetadata(base Metadata, updates []Update, metadataLoc string) (Metadata, error) { + bldr, err := MetadataBuilderFromBase(base, metadataLoc) + if err != nil { + return nil, err + } + + for _, update := range updates { + if err := update.Apply(bldr); err != nil { + return nil, err + } + } + + return bldr.Build() +} diff --git a/table/metadata_builder_internal_test.go b/table/metadata_builder_internal_test.go index addb182fa..010348179 100644 --- a/table/metadata_builder_internal_test.go +++ b/table/metadata_builder_internal_test.go @@ -20,6 +20,7 @@ package table import ( "fmt" "testing" + "time" "github.com/apache/iceberg-go" "github.com/davecgh/go-spew/spew" @@ -70,12 +71,11 @@ func builderWithoutChanges(formatVersion int) MetadataBuilder { partitionSpec := partitionSpec() sortOrder := sortOrder() - builder, err := NewMetadataBuilder() + builder, err := NewMetadataBuilder(formatVersion) if err != nil { panic(err) } - if err = builder.SetFormatVersion(formatVersion); err != nil { - panic(err) + if err = builder.SetLoc("s3://bucket/test/location"); err != nil { } if err = builder.AddSchema(&tableSchema); err != nil { panic(err) @@ -100,7 +100,7 @@ func builderWithoutChanges(formatVersion int) MetadataBuilder { if err != nil { panic(err) } - builder, err = MetadataBuilderFromBase(meta) + builder, err = MetadataBuilderFromBase(meta, "s3://bucket/test/location/metadata/metadata1.json") if err != nil { panic(err) } @@ -108,6 +108,162 @@ func builderWithoutChanges(formatVersion int) MetadataBuilder { return *builder } +func TestBuildUnpartitionedUnsorted(t *testing.T) { + TestLocation := "file:///tmp/iceberg-test" + tableSchema := schema() + partitionSpec := iceberg.NewPartitionSpecID(0) + + builder, err := NewMetadataBuilder(2) + require.NoError(t, err) + require.NoError(t, builder.SetFormatVersion(2)) + require.NoError(t, builder.AddSchema(&tableSchema)) + require.NoError(t, builder.SetCurrentSchemaID(-1)) + require.NoError(t, builder.AddSortOrder(&UnsortedSortOrder)) + require.NoError(t, builder.SetDefaultSortOrderID(-1)) + require.NoError(t, builder.AddPartitionSpec(&partitionSpec, true)) + require.NoError(t, builder.SetDefaultSpecID(-1)) + require.NoError(t, builder.SetLoc(TestLocation)) + + meta, err := builder.Build() + + require.NoError(t, err) + require.NotNil(t, meta) + + require.Equal(t, 2, meta.Version()) + require.Equal(t, TestLocation, meta.Location()) + require.Equal(t, 0, meta.CurrentSchema().ID) + require.Equal(t, 0, meta.DefaultPartitionSpec()) + require.Equal(t, 0, meta.DefaultSortOrder()) + require.Equal(t, 999, *meta.LastPartitionSpecID()) + require.Equal(t, 3, meta.LastColumnID()) + require.Equal(t, 0, len(meta.Snapshots())) + require.Nil(t, meta.CurrentSnapshot()) + for range meta.Refs() { + t.Fatalf("refs should be empty.") + } + require.Equal(t, len(meta.Properties()), 0) + for range meta.PreviousFiles() { + t.Fatalf("metadata log should be empty.") + } + require.Equal(t, meta.LastSequenceNumber(), int64(0)) + require.Equal(t, meta.LastColumnID(), 3) +} + +func TestReassignIds(t *testing.T) { + TestLocation := "file:///tmp/iceberg-test" + schema := iceberg.NewSchema(10, iceberg.NestedField{ + ID: 11, + Name: "a", + Type: iceberg.PrimitiveTypes.Int64, + Required: true, + }, iceberg.NestedField{ + ID: 12, + Name: "b", + Type: iceberg.PrimitiveTypes.Int64, + Required: true, + }, iceberg.NestedField{ + ID: 13, + Name: "struct", + Type: &iceberg.StructType{ + FieldList: []iceberg.NestedField{ + { + Type: iceberg.PrimitiveTypes.Int64, + ID: 14, + Name: "nested", + Required: true, + }, + }, + }, + Required: true, + }, + iceberg.NestedField{ + ID: 15, + Name: "c", + Type: iceberg.PrimitiveTypes.Int64, + Required: true, + }) + + spec, err := iceberg.NewPartitionSpecOpts(iceberg.WithSpecID(20), + iceberg.AddPartitionFieldByName("a", "a", iceberg.IdentityTransform{}, schema, nil), + iceberg.AddPartitionFieldByName("struct.nested", "nested_partition", iceberg.IdentityTransform{}, schema, nil)) + + require.NoError(t, err) + + sortOrder, err := NewSortOrder(10, []SortField{ + { + SourceID: 11, + Transform: iceberg.IdentityTransform{}, + Direction: SortASC, + NullOrder: NullsFirst, + }, + }) + require.NoError(t, err) + meta, err := NewMetadata( + schema, + &spec, + sortOrder, + TestLocation, + map[string]string{}) + + require.NoError(t, err) + require.NotNil(t, meta) + + expectedSchema := iceberg.NewSchema(0, iceberg.NestedField{ + ID: 1, + Name: "a", + Type: iceberg.PrimitiveTypes.Int64, + Required: true, + }, iceberg.NestedField{ + ID: 2, + Name: "b", + Type: iceberg.PrimitiveTypes.Int64, + Required: true, + }, iceberg.NestedField{ + ID: 3, + Name: "struct", + Type: &iceberg.StructType{ + FieldList: []iceberg.NestedField{ + { + Type: iceberg.PrimitiveTypes.Int64, + // TODO: this is discrepancy with rust impl, is 5 over there + ID: 4, + Name: "nested", + Required: true, + }, + }, + }, + Required: true, + }, + iceberg.NestedField{ + // TODO: this is discrepancy with rust impl, is 4 over there + ID: 5, + Name: "c", + Type: iceberg.PrimitiveTypes.Int64, + Required: true, + }) + id := 1000 + fieldID := 1001 + expectedSpec, err := iceberg.NewPartitionSpecOpts(iceberg.WithSpecID(0), + iceberg.AddPartitionFieldByName("a", "a", iceberg.IdentityTransform{}, expectedSchema, &id), + iceberg.AddPartitionFieldByName("struct.nested", "nested_partition", iceberg.IdentityTransform{}, expectedSchema, &fieldID)) + + require.NoError(t, err) + + expectedSortOrder, err := NewSortOrder(1, []SortField{ + { + SourceID: 1, + Transform: iceberg.IdentityTransform{}, + Direction: SortASC, + NullOrder: NullsFirst, + }, + }) + require.NoError(t, err) + + require.True(t, expectedSchema.Equals(meta.Schemas()[0])) + require.True(t, expectedSpec.Equals(meta.PartitionSpecs()[0])) + require.True(t, expectedSortOrder.Equals(meta.SortOrders()[0])) +} + func TestAddRemovePartitionSpec(t *testing.T) { builder := builderWithoutChanges(2) builderRef := &builder @@ -141,7 +297,7 @@ func TestAddRemovePartitionSpec(t *testing.T) { } require.True(t, found, "expected partition spec to be added") - newBuilder, err := MetadataBuilderFromBase(metadata) + newBuilder, err := MetadataBuilderFromBase(metadata, "") require.NoError(t, err) // Remove the spec require.NoError(t, newBuilder.RemovePartitionSpecs([]int{1})) @@ -214,7 +370,7 @@ func TestSetExistingDefaultPartitionSpec(t *testing.T) { require.True(t, expectedSpec.Equals(metadata.PartitionSpec()), "expected partition spec to match added spec") - newBuilder, err := MetadataBuilderFromBase(metadata) + newBuilder, err := MetadataBuilderFromBase(metadata, "") require.NoError(t, err) require.NotNil(t, newBuilder) @@ -269,7 +425,7 @@ func TestSetRef(t *testing.T) { SnapshotID: 1, ParentSnapshotID: nil, SequenceNumber: 0, - TimestampMs: builder.lastUpdatedMS + 1, + TimestampMs: builder.base.LastUpdatedMillis() + 1, ManifestList: "/snap-1.avro", Summary: &Summary{ Operation: OpAppend, @@ -381,7 +537,7 @@ func TestSetBranchSnapshotCreatesBranchIfNotExists(t *testing.T) { SnapshotID: 2, ParentSnapshotID: nil, SequenceNumber: 0, - TimestampMs: builder.lastUpdatedMS + 1, + TimestampMs: builder.base.LastUpdatedMillis(), ManifestList: "/snap-1.avro", Summary: &Summary{ Operation: OpAppend, @@ -419,7 +575,7 @@ func TestRemoveSnapshotRemovesBranch(t *testing.T) { SnapshotID: 2, ParentSnapshotID: nil, SequenceNumber: 0, - TimestampMs: builder.lastUpdatedMS + 1, + TimestampMs: builder.base.LastUpdatedMillis() + 1, ManifestList: "/snap-1.avro", Summary: &Summary{ Operation: OpAppend, @@ -449,7 +605,7 @@ func TestRemoveSnapshotRemovesBranch(t *testing.T) { require.Equal(t, BranchRef, builder.updates[1].(*setSnapshotRefUpdate).RefType) require.Equal(t, int64(2), builder.updates[1].(*setSnapshotRefUpdate).SnapshotID) - newBuilder, err := MetadataBuilderFromBase(meta) + newBuilder, err := MetadataBuilderFromBase(meta, "") require.NoError(t, err) require.NoError(t, newBuilder.RemoveSnapshots([]int64{snapshot.SnapshotID})) newMeta, err := newBuilder.Build() @@ -464,6 +620,81 @@ func TestRemoveSnapshotRemovesBranch(t *testing.T) { } } +func TestExpireMetadataLog(t *testing.T) { + builder1 := builderWithoutChanges(2) + meta, err := builder1.Build() + require.NoError(t, err) + builder, err := MetadataBuilderFromBase(meta, "s3://bla") + require.NoError(t, err) + err = builder.SetProperties(map[string]string{ + MetadataPreviousVersionsMaxKey: "2", + }) + require.NoError(t, err) + meta, err = builder.Build() + require.NoError(t, err) + require.Len(t, meta.(*metadataV2).MetadataLog, 1) + + location := "p" + newBuilder, err := MetadataBuilderFromBase(meta, location) + require.NoError(t, err) + err = newBuilder.SetProperties(map[string]string{ + "change_nr": "1", + }) + require.NoError(t, err) + meta, err = newBuilder.Build() + require.NoError(t, err) + require.Len(t, meta.(*metadataV2).MetadataLog, 2) + + newBuilder, err = MetadataBuilderFromBase(meta, location) + require.NoError(t, err) + err = newBuilder.SetProperties(map[string]string{ + "change_nr": "2", + }) + require.NoError(t, err) + meta, err = newBuilder.Build() + require.NoError(t, err) + require.Len(t, meta.(*metadataV2).MetadataLog, 2) +} + +func TestV2SequenceNumberCannotDecrease(t *testing.T) { + builder := builderWithoutChanges(2) + schemaID := 0 + snapshot1 := Snapshot{ + SnapshotID: 1, + ParentSnapshotID: nil, + SequenceNumber: 1, + TimestampMs: builder.base.LastUpdatedMillis() + 1, + ManifestList: "/snap-1.avro", + Summary: &Summary{ + Operation: OpAppend, + Properties: map[string]string{}, + }, + SchemaID: &schemaID, + } + + err := builder.AddSnapshot(&snapshot1) + require.NoError(t, err) + + err = builder.SetSnapshotRef(MainBranch, 1, BranchRef, WithMinSnapshotsToKeep(10)) + require.NoError(t, err) + + parentSnapshotID := int64(1) + snapshot2 := Snapshot{ + SnapshotID: 2, + ParentSnapshotID: &parentSnapshotID, + SequenceNumber: 0, // Lower sequence number than previous + TimestampMs: builder.lastUpdatedMS + 1, + ManifestList: "/snap-0.avro", + Summary: &Summary{ + Operation: OpAppend, + Properties: map[string]string{}, + }, + SchemaID: &schemaID, + } + err = builder.AddSnapshot(&snapshot2) + require.ErrorContains(t, err, "can't add snapshot with sequence number 0, must be > than last sequence number 1") +} + func TestCannotAddDuplicateSnapshotID(t *testing.T) { builder := builderWithoutChanges(2) schemaID := 0 @@ -471,7 +702,7 @@ func TestCannotAddDuplicateSnapshotID(t *testing.T) { SnapshotID: 2, ParentSnapshotID: nil, SequenceNumber: 0, - TimestampMs: builder.lastUpdatedMS + 1, + TimestampMs: builder.base.LastUpdatedMillis() + 1, ManifestList: "/snap-1.avro", Summary: &Summary{ Operation: OpAppend, @@ -488,6 +719,31 @@ func TestCannotAddDuplicateSnapshotID(t *testing.T) { require.ErrorContains(t, builder.AddSnapshot(&snapshot), "can't add snapshot with id 2, already exists") } +func TestLastUpdateIncreasedForPropertyOnlyUpdate(t *testing.T) { + builder := builderWithoutChanges(2) + meta, err := builder.Build() + require.NoError(t, err) + lastUpdatedMS := builder.lastUpdatedMS + time.Sleep(5 * time.Millisecond) + // Set a property + + location := "some-location" + + newBuilder, err := MetadataBuilderFromBase(meta, location) + require.NoError(t, err) + + err = newBuilder.SetProperties(map[string]string{ + "foo": "bar", + }) + require.NoError(t, err) + newMeta, err := newBuilder.Build() + require.NoError(t, err) + require.NotNil(t, newMeta) + + // Check that the last updated timestamp has increased + require.Greater(t, newMeta.LastUpdatedMillis(), lastUpdatedMS, "expected last updated timestamp to increase after property update") +} + func TestAddSnapshotRejectsInvalidTimestamp(t *testing.T) { builder := builderWithoutChanges(2) schemaID := 0 @@ -542,7 +798,7 @@ func TestConstructDefaultMainBranch(t *testing.T) { require.NoError(t, err) require.NotNil(t, meta) - builder, err := MetadataBuilderFromBase(meta) + builder, err := MetadataBuilderFromBase(meta, "") require.NoError(t, err) meta, err = builder.Build() @@ -568,7 +824,7 @@ func TestRemoveMainSnapshotRef(t *testing.T) { require.NoError(t, err) require.NotNil(t, meta) require.NotNil(t, meta.CurrentSnapshot()) - builder, err := MetadataBuilderFromBase(meta) + builder, err := MetadataBuilderFromBase(meta, "") require.NoError(t, err) require.NotNil(t, builder.currentSnapshotID) if _, ok := builder.refs[MainBranch]; !ok { @@ -592,7 +848,7 @@ func TestRemoveSchemas(t *testing.T) { meta, err := getTestTableMetadata("TableMetadataV2Valid.json") require.NoError(t, err) require.Len(t, meta.Schemas(), 2, "expected 2 schemas in the metadata") - builder, err := MetadataBuilderFromBase(meta) + builder, err := MetadataBuilderFromBase(meta, "") require.NoError(t, err) err = builder.RemoveSchemas([]int{0}) require.NoError(t, err, "expected to remove schema with ID 1") @@ -634,7 +890,7 @@ func TestUpdateSchema(t *testing.T) { iceberg.NestedField{ID: 2, Name: "x", Type: iceberg.PrimitiveTypes.String, Required: true}, ) - builder, err := MetadataBuilderFromBase(meta) + builder, err := MetadataBuilderFromBase(meta, "") require.NoError(t, err) err = builder.AddSchema(schema2) @@ -715,3 +971,232 @@ func TestRemoveReservedPropertiesFails(t *testing.T) { require.NoError(t, err) require.True(t, builder.HasChanges()) } + +func TestIdsAreReassignedForNewMetadata(t *testing.T) { + // Create schema with ID 10 (should be reassigned to 0) + tableSchema := iceberg.NewSchema( + 10, + iceberg.NestedField{ID: 1, Name: "x", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ID: 2, Name: "y", Type: iceberg.PrimitiveTypes.Int64, Required: true, Doc: "comment"}, + iceberg.NestedField{ID: 3, Name: "z", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + ) + partitionSpec := partitionSpec() + sortOrder := sortOrder() + + metadata, err := NewMetadata( + tableSchema, + &partitionSpec, + sortOrder, + "file:///tmp/iceberg-test", + map[string]string{}, + ) + + require.NoError(t, err) + require.NotNil(t, metadata) + + require.Equal(t, 0, metadata.CurrentSchema().ID) + require.Equal(t, 0, metadata.(*metadataV2).CurrentSchemaID) +} + +func TestNewMetadataChanges(t *testing.T) { + tableSchema := schema() + partitionSpec := partitionSpec() + sortOrder := sortOrder() + properties := map[string]string{ + "property 1": "value 1", + } + + builder, err := NewMetadataBuilder(1) + require.NoError(t, err) + require.NoError(t, builder.SetLoc("file:///tmp/iceberg-test")) + require.NoError(t, builder.AddSchema(&tableSchema)) + require.NoError(t, builder.SetCurrentSchemaID(-1)) + require.NoError(t, builder.AddPartitionSpec(&partitionSpec, true)) + require.NoError(t, builder.SetDefaultSpecID(-1)) + require.NoError(t, builder.AddSortOrder(&sortOrder)) + require.NoError(t, builder.SetDefaultSortOrderID(-1)) + require.NoError(t, builder.SetProperties(properties)) + + _, err = builder.Build() + require.NoError(t, err) + + require.Len(t, builder.updates, 8) + + require.IsType(t, &setLocationUpdate{}, builder.updates[0]) + require.Equal(t, "file:///tmp/iceberg-test", builder.updates[0].(*setLocationUpdate).Location) + + require.IsType(t, &addSchemaUpdate{}, builder.updates[1]) + require.True(t, builder.updates[1].(*addSchemaUpdate).Schema.Equals(&tableSchema)) + + require.IsType(t, &setCurrentSchemaUpdate{}, builder.updates[2]) + require.Equal(t, -1, builder.updates[2].(*setCurrentSchemaUpdate).SchemaID) + + require.IsType(t, &addPartitionSpecUpdate{}, builder.updates[3]) + // For new tables, field IDs should be assigned (1000 for first partition field) + addedSpec := builder.updates[3].(*addPartitionSpecUpdate).Spec + require.Equal(t, 0, addedSpec.ID()) + require.Equal(t, 1, addedSpec.Len()) + require.Equal(t, 1000, addedSpec.Field(0).FieldID) + + require.IsType(t, &setDefaultSpecUpdate{}, builder.updates[4]) + require.Equal(t, -1, builder.updates[4].(*setDefaultSpecUpdate).SpecID) + + require.IsType(t, &addSortOrderUpdate{}, builder.updates[5]) + require.True(t, builder.updates[5].(*addSortOrderUpdate).SortOrder.Equals(sortOrder)) + + require.IsType(t, &setDefaultSortOrderUpdate{}, builder.updates[6]) + require.Equal(t, -1, builder.updates[6].(*setDefaultSortOrderUpdate).SortOrderID) + + require.IsType(t, &setPropertiesUpdate{}, builder.updates[7]) + require.Equal(t, iceberg.Properties{"property 1": "value 1"}, builder.updates[7].(*setPropertiesUpdate).Updates) +} + +func TestNewMetadataChangesUnpartitionedUnsorted(t *testing.T) { + tableSchema := *iceberg.NewSchema(0) + partitionSpec := *iceberg.UnpartitionedSpec + sortOrder := UnsortedSortOrder + + builder, err := NewMetadataBuilder(1) + require.NoError(t, err) + require.NoError(t, builder.SetLoc("file:///tmp/iceberg-test")) + require.NoError(t, builder.AddSchema(&tableSchema)) + require.NoError(t, builder.SetCurrentSchemaID(-1)) + require.NoError(t, builder.AddPartitionSpec(&partitionSpec, true)) + require.NoError(t, builder.SetDefaultSpecID(-1)) + require.NoError(t, builder.AddSortOrder(&sortOrder)) + require.NoError(t, builder.SetDefaultSortOrderID(-1)) + + _, err = builder.Build() + require.NoError(t, err) + + // Verify the expected updates were created (7 updates, no properties) + require.Len(t, builder.updates, 7) + + // Check each update type in order + require.IsType(t, &setLocationUpdate{}, builder.updates[0]) + require.Equal(t, "file:///tmp/iceberg-test", builder.updates[0].(*setLocationUpdate).Location) + + require.IsType(t, &addSchemaUpdate{}, builder.updates[1]) + require.True(t, builder.updates[1].(*addSchemaUpdate).Schema.Equals(&tableSchema)) + + require.IsType(t, &setCurrentSchemaUpdate{}, builder.updates[2]) + require.Equal(t, -1, builder.updates[2].(*setCurrentSchemaUpdate).SchemaID) + + require.IsType(t, &addPartitionSpecUpdate{}, builder.updates[3]) + + addedSpec := builder.updates[3].(*addPartitionSpecUpdate).Spec + require.Equal(t, 0, addedSpec.ID()) + require.Equal(t, 0, addedSpec.Len()) // Unpartitioned = no fields + + require.IsType(t, &setDefaultSpecUpdate{}, builder.updates[4]) + require.Equal(t, -1, builder.updates[4].(*setDefaultSpecUpdate).SpecID) + + require.IsType(t, &addSortOrderUpdate{}, builder.updates[5]) + require.True(t, builder.updates[5].(*addSortOrderUpdate).SortOrder.Equals(sortOrder)) + + require.IsType(t, &setDefaultSortOrderUpdate{}, builder.updates[6]) + require.Equal(t, -1, builder.updates[6].(*setDefaultSortOrderUpdate).SortOrderID) +} + +func TestSetCurrentSchemaChangeIsMinusOneIfSchemaWasAddedInThisChange(t *testing.T) { + builder := builderWithoutChanges(2) + + addedSchema := iceberg.NewSchema( + 1, + iceberg.NestedField{ID: 1, Name: "x", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ID: 2, Name: "y", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ID: 3, Name: "z", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ID: 4, Name: "a", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + ) + + err := builder.AddSchema(addedSchema) + require.NoError(t, err) + + err = builder.SetCurrentSchemaID(1) + require.NoError(t, err) + + _, err = builder.Build() + require.NoError(t, err) + + // Should have 2 updates + require.Len(t, builder.updates, 2) + + // First update should be AddSchema + require.IsType(t, &addSchemaUpdate{}, builder.updates[0]) + require.True(t, builder.updates[0].(*addSchemaUpdate).Schema.Equals(addedSchema)) + + // Second update should be SetCurrentSchema with schema_id = -1 (indicates last added) + require.IsType(t, &setCurrentSchemaUpdate{}, builder.updates[1]) + require.Equal(t, -1, builder.updates[1].(*setCurrentSchemaUpdate).SchemaID) +} + +func TestNoMetadataLogForCreateTable(t *testing.T) { + tableSchema := schema() + partitionSpec := partitionSpec() + sortOrder := sortOrder() + + metadata, err := NewMetadata( + &tableSchema, + &partitionSpec, + sortOrder, + "file:///tmp/iceberg-test", + map[string]string{}, + ) + + require.NoError(t, err) + require.NotNil(t, metadata) + + require.Len(t, metadata.(*metadataV2).MetadataLog, 0) +} + +func TestNoMetadataLogEntryForNoPreviousLocation(t *testing.T) { + builder := builderWithoutChanges(2) + require.NoError(t, builder.SetLoc("file:///tmp/iceberg-test")) + metadata, err := builder.Build() + require.NoError(t, err) + require.NotNil(t, metadata) + require.Len(t, metadata.(*metadataV2).MetadataLog, 1) + + newBuilder, err := MetadataBuilderFromBase(metadata, "") + require.NoError(t, err) + + err = newBuilder.SetProperties(map[string]string{ + "foo": "bar", + }) + require.NoError(t, err) + + newMetadata, err := newBuilder.Build() + require.NoError(t, err) + + require.Len(t, newMetadata.(*metadataV2).MetadataLog, 1) +} + +func TestFromMetadataGeneratesMetadataLog(t *testing.T) { + metadataPath := "s3://bucket/test/location/metadata/metadata1.json" + + tableSchema := schema() + partitionSpec := partitionSpec() + sortOrder := sortOrder() + + metadata, err := NewMetadata( + &tableSchema, + &partitionSpec, + sortOrder, + "file:///tmp/iceberg-test", + map[string]string{}, + ) + require.NoError(t, err) + require.NotNil(t, metadata) + + builder, err := MetadataBuilderFromBase(metadata, metadataPath) + require.NoError(t, err) + + err = builder.AddSortOrder(&UnsortedSortOrder) + require.NoError(t, err) + + newMetadata, err := builder.Build() + require.NoError(t, err) + + require.Len(t, newMetadata.(*metadataV2).MetadataLog, 1) + require.Equal(t, metadataPath, newMetadata.(*metadataV2).MetadataLog[0].MetadataFile) +} diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go index 7b58eb016..3401780fa 100644 --- a/table/metadata_internal_test.go +++ b/table/metadata_internal_test.go @@ -846,7 +846,7 @@ func TestMetadataV2Serialize(t *testing.T) { } func TestMetadataBuilderSetDefaultSpecIDLastPartition(t *testing.T) { - builder, err := NewMetadataBuilder() + builder, err := NewMetadataBuilder(2) assert.NoError(t, err) schema := schema() assert.NoError(t, builder.AddSchema(&schema)) @@ -860,9 +860,8 @@ func TestMetadataBuilderSetDefaultSpecIDLastPartition(t *testing.T) { } func TestMetadataBuilderSetLastAddedSchema(t *testing.T) { - builder, err := NewMetadataBuilder() + builder, err := NewMetadataBuilder(2) assert.NoError(t, err) - assert.NoError(t, builder.SetFormatVersion(2)) schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true}, ) @@ -874,6 +873,10 @@ func TestMetadataBuilderSetLastAddedSchema(t *testing.T) { assert.NoError(t, builder.SetDefaultSpecID(-1)) + unsorted := UnsortedSortOrder + require.NoError(t, builder.AddSortOrder(&unsorted)) + require.NoError(t, builder.SetDefaultSortOrderID(-1)) + meta, err := builder.Build() assert.NoError(t, err) assert.Equal(t, schema.ID, meta.CurrentSchema().ID) @@ -881,7 +884,7 @@ func TestMetadataBuilderSetLastAddedSchema(t *testing.T) { } func TestMetadataBuilderSchemaIncreasingNumbering(t *testing.T) { - builder, err := NewMetadataBuilder() + builder, err := NewMetadataBuilder(2) assert.NoError(t, err) assert.NoError(t, builder.SetFormatVersion(2)) schema := iceberg.NewSchema(1, @@ -905,7 +908,7 @@ func TestMetadataBuilderSchemaIncreasingNumbering(t *testing.T) { } func TestMetadataBuilderReuseSchema(t *testing.T) { - builder, err := NewMetadataBuilder() + builder, err := NewMetadataBuilder(2) assert.NoError(t, err) schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true}, diff --git a/table/partitioned_fanout_writer_test.go b/table/partitioned_fanout_writer_test.go index 09c172e72..b7e0f6d8b 100644 --- a/table/partitioned_fanout_writer_test.go +++ b/table/partitioned_fanout_writer_test.go @@ -106,7 +106,7 @@ func (s *FanoutWriterTestSuite) testTransformPartition(transform iceberg.Transfo meta, err := NewMetadata(icebergSchema, &spec, UnsortedSortOrder, loc, iceberg.Properties{}) s.Require().NoError(err) - metaBuilder, err := MetadataBuilderFromBase(meta) + metaBuilder, err := MetadataBuilderFromBase(meta, "") s.Require().NoError(err) args := recordWritingArgs{ diff --git a/table/table.go b/table/table.go index 02240eed6..ad0b9f69c 100644 --- a/table/table.go +++ b/table/table.go @@ -83,7 +83,7 @@ func (t Table) LocationProvider() (LocationProvider, error) { } func (t Table) NewTransaction() *Transaction { - meta, _ := MetadataBuilderFromBase(t.metadata) + meta, _ := MetadataBuilderFromBase(t.metadata, t.metadataLocation) return &Transaction{ tbl: &t, diff --git a/table/table_test.go b/table/table_test.go index 01eff6b2a..47fbe9662 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -789,18 +789,7 @@ func (m *mockedCatalog) LoadTable(ctx context.Context, ident table.Identifier) ( } func (m *mockedCatalog) CommitTable(ctx context.Context, ident table.Identifier, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) { - bldr, err := table.MetadataBuilderFromBase(m.metadata) - if err != nil { - return nil, "", err - } - - for _, u := range updates { - if err := u.Apply(bldr); err != nil { - return nil, "", err - } - } - - meta, err := bldr.Build() + meta, err := table.UpdateTableMetadata(m.metadata, updates, "") if err != nil { return nil, "", err } @@ -1336,7 +1325,7 @@ func (m *DeleteOldMetadataMockedCatalog) LoadTable(ctx context.Context, ident ta } func (m *DeleteOldMetadataMockedCatalog) CommitTable(ctx context.Context, ident table.Identifier, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) { - bldr, err := table.MetadataBuilderFromBase(m.metadata) + bldr, err := table.MetadataBuilderFromBase(m.metadata, "") if err != nil { return nil, "", err } diff --git a/table/time_travel_test.go b/table/time_travel_test.go index dcc23805e..d20548d1e 100644 --- a/table/time_travel_test.go +++ b/table/time_travel_test.go @@ -280,7 +280,7 @@ func createTestMetadata(snapshots []Snapshot, snapshotLog []SnapshotLogEntry) (M // If we have custom snapshots or logs, we need to modify the metadata if len(snapshots) > 0 || len(snapshotLog) > 0 { - builder, err := MetadataBuilderFromBase(meta) + builder, err := MetadataBuilderFromBase(meta, "") if err != nil { return nil, err }