Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion catalog/glue/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func TestSchemasToGlueColumns(t *testing.T) {
metadata, err := table.NewMetadata(schemas[0], nil, table.SortOrder{}, "s3://example/path", nil)
assert.NoError(t, err)

mb, err := table.MetadataBuilderFromBase(metadata)
mb, err := table.MetadataBuilderFromBase(metadata, "")
assert.NoError(t, err)

err = mb.AddSchema(schemas[1])
Expand Down
30 changes: 1 addition & 29 deletions catalog/internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,35 +69,7 @@ func WriteMetadata(ctx context.Context, metadata table.Metadata, loc string, pro
}

func UpdateTableMetadata(base table.Metadata, updates []table.Update, metadataLoc string) (table.Metadata, error) {
bldr, err := table.MetadataBuilderFromBase(base)
if err != nil {
return nil, err
}

for _, update := range updates {
if err := update.Apply(bldr); err != nil {
return nil, err
}
}

if bldr.HasChanges() {
if metadataLoc != "" {
maxMetadataLogEntries := max(1,
base.Properties().GetInt(
table.MetadataPreviousVersionsMaxKey, table.MetadataPreviousVersionsMaxDefault))

bldr.TrimMetadataLogs(maxMetadataLogEntries + 1).
AppendMetadataLog(table.MetadataLogEntry{
MetadataFile: metadataLoc,
TimestampMs: base.LastUpdatedMillis(),
})
}
if base.LastUpdatedMillis() == bldr.LastUpdatedMS() {
bldr.SetLastUpdatedMS()
}
}

return bldr.Build()
return table.UpdateTableMetadata(base, updates, metadataLoc)
}

func CreateStagedTable(ctx context.Context, catprops iceberg.Properties, nspropsFn GetNamespacePropsFn, ident table.Identifier, sc *iceberg.Schema, opts ...catalog.CreateTableOpt) (table.StagedTable, error) {
Expand Down
6 changes: 5 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

package iceberg

import "errors"
import (
"errors"
"fmt"
)

var (
ErrInvalidTypeString = errors.New("invalid type")
ErrNotImplemented = errors.New("not implemented")
ErrInvalidArgument = errors.New("invalid argument")
ErrInvalidFormatVersion = fmt.Errorf("%w: invalid format version", ErrInvalidArgument)
ErrInvalidSchema = errors.New("invalid schema")
ErrInvalidTransform = errors.New("invalid transform syntax")
ErrType = errors.New("type error")
Expand Down
2 changes: 1 addition & 1 deletion table/arrow_utils_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (suite *FileStatsMetricsSuite) getDataFile(meta iceberg.Properties, writeSt

schema := tableMeta.CurrentSchema()
if len(meta) > 0 {
bldr, err := MetadataBuilderFromBase(tableMeta)
bldr, err := MetadataBuilderFromBase(tableMeta, "")
suite.Require().NoError(err)
err = bldr.SetProperties(meta)
suite.Require().NoError(err)
Expand Down
101 changes: 70 additions & 31 deletions table/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type MetadataBuilder struct {
defaultSortOrderID int
refs map[string]SnapshotRef

previousFileEntry *MetadataLogEntry
// >v1 specific
lastSequenceNumber *int64
// update tracking
Expand All @@ -183,28 +184,41 @@ type MetadataBuilder struct {
lastAddedSortOrderID *int
}

func NewMetadataBuilder() (*MetadataBuilder, error) {
func NewMetadataBuilder(formatVersion int) (*MetadataBuilder, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably error if an invalid format version is passed (i.e. one we don't support)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, missed that, added, thanks!

if formatVersion < 1 || formatVersion > supportedTableFormatVersion {
return nil, fmt.Errorf("%w: %d", iceberg.ErrInvalidFormatVersion, formatVersion)
}

return &MetadataBuilder{
updates: make([]Update, 0),
schemaList: make([]*iceberg.Schema, 0),
specs: make([]iceberg.PartitionSpec, 0),
props: make(iceberg.Properties),
snapshotList: make([]Snapshot, 0),
snapshotLog: make([]SnapshotLogEntry, 0),
metadataLog: make([]MetadataLogEntry, 0),
sortOrderList: make([]SortOrder, 0),
refs: make(map[string]SnapshotRef),
updates: make([]Update, 0),
schemaList: make([]*iceberg.Schema, 0),
specs: make([]iceberg.PartitionSpec, 0),
props: make(iceberg.Properties),
snapshotList: make([]Snapshot, 0),
snapshotLog: make([]SnapshotLogEntry, 0),
metadataLog: make([]MetadataLogEntry, 0),
sortOrderList: make([]SortOrder, 0),
refs: make(map[string]SnapshotRef),
currentSchemaID: -1,
defaultSortOrderID: -1,
defaultSpecID: -1,
lastColumnId: -1,
formatVersion: formatVersion,
}, nil
}

func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
// MetadataBuilderFromBase creates a MetadataBuilder from an existing Metadata object.
// currentFileLocation is the location where the current version of the metadata
// file is stored. This is used to update the metadata log. If currentFileLocation is
// empty, the metadata log will not be updated. This should only be used to stage-create tables.
func MetadataBuilderFromBase(metadata Metadata, currentFileLocation string) (*MetadataBuilder, error) {
b := &MetadataBuilder{}
b.base = metadata

b.formatVersion = metadata.Version()
b.uuid = metadata.TableUUID()
b.loc = metadata.Location()
b.lastUpdatedMS = metadata.LastUpdatedMillis()
b.lastUpdatedMS = 0
Comment on lines -207 to +221
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the reasoning for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's java & rust behavior, the if condition in Build() checks if b.lastUpdatedMS == 0 and only then updates it to Now(), according to spec, it should always be updated to Now():

image

b.lastColumnId = metadata.LastColumnID()
b.schemaList = slices.Clone(metadata.Schemas())
b.currentSchemaID = metadata.CurrentSchema().ID
Expand All @@ -229,6 +243,13 @@ func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
b.snapshotLog = slices.Collect(metadata.SnapshotLogs())
b.metadataLog = slices.Collect(metadata.PreviousFiles())

if currentFileLocation != "" {
b.previousFileEntry = &MetadataLogEntry{
MetadataFile: currentFileLocation,
TimestampMs: metadata.LastUpdatedMillis(),
}
}

return b, nil
}

Expand Down Expand Up @@ -451,6 +472,7 @@ func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder) error {
}

b.sortOrderList = append(sortOrders, *sortOrder)
b.lastAddedSortOrderID = &sortOrder.orderID
b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder))

return nil
Expand All @@ -474,8 +496,7 @@ func (b *MetadataBuilder) RemoveProperties(keys []string) error {
}

func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) error {
takeLast := currentSchemaID == -1
if takeLast {
if currentSchemaID == -1 {
if b.lastAddedSchemaID == nil {
return errors.New("can't set current schema to last added schema, no schema has been added")
}
Expand All @@ -491,7 +512,7 @@ func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) error {
return fmt.Errorf("can't set current schema to schema with id %d: %w", currentSchemaID, err)
}

if takeLast {
if b.lastAddedSchemaID != nil && currentSchemaID == *b.lastAddedSchemaID {
b.updates = append(b.updates, NewSetCurrentSchemaUpdate(-1))
} else {
b.updates = append(b.updates, NewSetCurrentSchemaUpdate(currentSchemaID))
Expand All @@ -503,14 +524,10 @@ func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) error {

func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) error {
if defaultSortOrderID == -1 {
defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder) int {
return s.OrderID()
})
if !slices.ContainsFunc(b.updates, func(u Update) bool {
return u.Action() == UpdateAddSortOrder && u.(*addSortOrderUpdate).SortOrder.OrderID() == defaultSortOrderID
}) {
if b.lastAddedSortOrderID == nil {
return errors.New("can't set default sort order to last added with no added sort orders")
}
defaultSortOrderID = *b.lastAddedSortOrderID
}

if defaultSortOrderID == b.defaultSortOrderID {
Expand All @@ -521,17 +538,20 @@ func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) error {
return fmt.Errorf("can't set default sort order to sort order with id %d: %w", defaultSortOrderID, err)
}

b.updates = append(b.updates, NewSetDefaultSortOrderUpdate(defaultSortOrderID))
if b.lastAddedSortOrderID != nil && defaultSortOrderID == *b.lastAddedSortOrderID {
b.updates = append(b.updates, NewSetDefaultSortOrderUpdate(-1))
} else {
b.updates = append(b.updates, NewSetDefaultSortOrderUpdate(defaultSortOrderID))
}

b.defaultSortOrderID = defaultSortOrderID

return nil
}

func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) error {
lastUsed := false
if defaultSpecID == -1 {
if b.lastAddedPartitionID != nil {
lastUsed = true
defaultSpecID = *b.lastAddedPartitionID
} else {
return errors.New("can't set default spec to last added with no added partition specs")
Expand All @@ -546,7 +566,7 @@ func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) error {
return fmt.Errorf("can't set default spec to spec with id %d: %w", defaultSpecID, err)
}

if lastUsed {
if b.lastAddedPartitionID != nil && defaultSpecID == *b.lastAddedPartitionID {
b.updates = append(b.updates, NewSetDefaultSpecUpdate(-1))
} else {
b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID))
Expand All @@ -563,7 +583,7 @@ func (b *MetadataBuilder) SetFormatVersion(formatVersion int) error {
}

if formatVersion > supportedTableFormatVersion {
return fmt.Errorf("unsupported format version %d", formatVersion)
return fmt.Errorf("%w: %d", iceberg.ErrInvalidFormatVersion, formatVersion)
}

if formatVersion == b.formatVersion {
Expand Down Expand Up @@ -757,6 +777,14 @@ func (b *MetadataBuilder) buildCommonMetadata() (*commonMetadata, error) {
b.lastUpdatedMS = time.Now().UnixMilli()
}

if b.previousFileEntry != nil && b.HasChanges() {
maxMetadataLogEntries := max(1,
b.base.Properties().GetInt(
MetadataPreviousVersionsMaxKey, MetadataPreviousVersionsMaxDefault))
b.AppendMetadataLog(*b.previousFileEntry)
b.TrimMetadataLogs(maxMetadataLogEntries)
}

return &commonMetadata{
FormatVersion: b.formatVersion,
UUID: b.uuid,
Expand Down Expand Up @@ -1702,15 +1730,11 @@ func NewMetadataWithUUID(sc *iceberg.Schema, partitions *iceberg.PartitionSpec,
return nil, err
}

builder, err := NewMetadataBuilder()
builder, err := NewMetadataBuilder(formatVersion)
if err != nil {
return nil, err
}

if err = builder.SetFormatVersion(formatVersion); err != nil {
return nil, err
}

if err = builder.SetUUID(tableUuid); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1791,3 +1815,18 @@ func reassignIDs(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, sortOrde
sortOrder: freshSortOrder,
}, nil
}

func UpdateTableMetadata(base Metadata, updates []Update, metadataLoc string) (Metadata, error) {
bldr, err := MetadataBuilderFromBase(base, metadataLoc)
if err != nil {
return nil, err
}

for _, update := range updates {
if err := update.Apply(bldr); err != nil {
return nil, err
}
}

return bldr.Build()
}
Loading
Loading