feat(table): Support Dynamic Partition Overwrite #482

base: main
```diff
@@ -24,6 +24,8 @@ import (
 	"fmt"
 	"runtime"
 	"slices"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
```
@@ -343,6 +345,250 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp

```go
	return t.apply(updates, reqs)
}

// DynamicPartitionOverwrite performs a dynamic partition overwrite operation.
// It detects partition values in the provided arrow table using the current
// partition spec, deletes existing partitions matching these values, and then
// appends the new data.
func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error {
	if t.meta.CurrentSpec().IsUnpartitioned() {
		return fmt.Errorf("%w: cannot apply dynamic overwrite on an unpartitioned table", ErrInvalidOperation)
	}

	// Check that all partition fields use identity transforms
	currentSpec := t.meta.CurrentSpec()
	for field := range currentSpec.Fields() {
		if _, ok := field.Transform.(iceberg.IdentityTransform); !ok {
			return fmt.Errorf("%w: dynamic overwrite does not support non-identity-transform fields in partition spec: %s",
				ErrInvalidOperation, field.Name)
		}
	}

	if tbl.NumRows() == 0 {
		return nil
	}
```
**Comment on lines +370 to +372** (Member):

> shouldn't this overwrite the partition with an empty partition?

**Reply** (Contributor, Author):

> Correct me if I'm wrong, but this is quite similar to what the Spark writer does: https://github.com/apache/iceberg/blob/0651b8913d27c3b1c9aca4a9609bec521905fb36/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L297-L305
```go
	fs, err := t.tbl.fsF(ctx)
	if err != nil {
		return err
	}

	commitUUID := uuid.New()
	rdr := array.NewTableReader(tbl, batchSize)
	defer rdr.Release()
	dataFiles := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
		sc:        tbl.Schema(),
		itr:       array.IterFromReader(rdr),
		fs:        fs.(io.WriteFileIO),
		writeUUID: &commitUUID,
	})

	var allDataFiles []iceberg.DataFile
	for df, err := range dataFiles {
		if err != nil {
			return err
		}
		allDataFiles = append(allDataFiles, df)
	}

	partitionsToOverwrite := make(map[string]struct{})
	for _, df := range allDataFiles {
		partitionKey := fmt.Sprintf("%v", df.Partition())
		partitionsToOverwrite[partitionKey] = struct{}{}
	}
```
**Comment on lines 390 to 415** (Member):

> you can probably merge these loops
```go
	deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite)

	if err := t.Delete(ctx, deleteFilter, snapshotProps); err != nil {
		return err
	}

	appendFiles := t.appendSnapshotProducer(fs, snapshotProps)
	for _, df := range allDataFiles {
		appendFiles.appendDataFile(df)
	}

	updates, reqs, err := appendFiles.commit()
	if err != nil {
		return err
	}

	return t.apply(updates, reqs)
}
```
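For context, a minimal usage sketch of the new API (the helper name, batch size, and import paths are illustrative assumptions; `NewTransaction` and `Commit` are the package's existing transaction entry points):

```go
import (
	"context"

	"github.com/apache/arrow-go/v18/arrow" // path may differ by arrow-go version
	"github.com/apache/iceberg-go/table"
)

// overwritePartitions is a hypothetical helper showing the intended call
// pattern: only the partitions present in arrTbl are replaced.
func overwritePartitions(ctx context.Context, tbl *table.Table, arrTbl arrow.Table) (*table.Table, error) {
	tx := tbl.NewTransaction()
	// nil snapshot properties; 1024 is an arbitrary record-batch size.
	if err := tx.DynamicPartitionOverwrite(ctx, arrTbl, 1024, nil); err != nil {
		return nil, err
	}
	// Commit publishes the delete snapshot followed by the append.
	return tx.Commit(ctx)
}
```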
*dttung2905 marked this conversation as resolved (outdated).*

```go
// Delete performs a delete operation with the given filter and snapshot properties.
func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error {
	fs, err := t.tbl.fsF(ctx)
	if err != nil {
		return err
	}

	deleteProducer := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(nil)

	currentSnapshot := t.meta.currentSnapshot()
	if currentSnapshot == nil {
		return fmt.Errorf("%w: cannot delete from table without existing snapshot", ErrInvalidOperation)
	}

	scan, err := t.Scan(WithRowFilter(filter))
	if err != nil {
		return err
	}
	fileScan, err := scan.PlanFiles(ctx)
	if err != nil {
		return err
	}

	// Mark files for deletion
	for _, task := range fileScan {
		deleteProducer.deleteDataFile(task.File)
	}

	updates, reqs, err := deleteProducer.commit()
	if err != nil {
		return err
	}

	return t.apply(updates, reqs)
}
```
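A hypothetical direct use of `Delete` (fragment, reusing the setup from the sketch above; the filter is built with the expression API the iceberg package already exports):

```go
// Delete all rows in the region = "us" partition. Note this routes through
// scan planning and removes whole data files matched by the filter.
tx := tbl.NewTransaction()
if err := tx.Delete(ctx, iceberg.EqualTo(iceberg.Reference("region"), "us"), nil); err != nil {
	return err
}
updatedTbl, err := tx.Commit(ctx)
```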
```go
// buildPartitionPredicate builds a filter predicate matching any of the input partition records.
func (t *Transaction) buildPartitionPredicate(partitionRecords map[string]struct{}) iceberg.BooleanExpression {
	partitionSpec := t.meta.CurrentSpec()
	schema := t.meta.CurrentSchema()

	var partitionFields []string
	for field := range partitionSpec.Fields() {
		if field, ok := schema.FindFieldByID(field.SourceID); ok {
			partitionFields = append(partitionFields, field.Name)
		}
	}

	// Build OR expression for all partitions
	var expressions []iceberg.BooleanExpression

	for partitionKey := range partitionRecords {
		partitionValues := parsePartitionKey(partitionKey, partitionFields)

		// Build AND expression for this partition
		var partitionExprs []iceberg.BooleanExpression
		for i, fieldName := range partitionFields {
			if i < len(partitionValues) {
				value := partitionValues[i]
				if value == nil {
					partitionExprs = append(partitionExprs, iceberg.IsNull(iceberg.Reference(fieldName)))
				} else {
					// Create an expression based on the field type
					if field, ok := schema.FindFieldByName(fieldName); ok {
						partitionExprs = append(partitionExprs, createEqualToExpression(iceberg.Reference(fieldName), value, field.Type))
					}
				}
			}
		}

		if len(partitionExprs) > 0 {
			partitionExpr := partitionExprs[0]
			for _, expr := range partitionExprs[1:] {
				partitionExpr = iceberg.NewAnd(partitionExpr, expr)
			}
			expressions = append(expressions, partitionExpr)
		}
	}

	if len(expressions) == 0 {
		return iceberg.AlwaysFalse{}
	}

	result := expressions[0]
	for _, expr := range expressions[1:] {
		result = iceberg.NewOr(result, expr)
	}

	return result
}
```
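For intuition, the predicate built for two partition keys such as `region=us/day=20240101` and `region=eu/day=20240102` is equivalent to this hand-written expression (field names and values are illustrative):

```go
// (region = "us" AND day = 20240101) OR (region = "eu" AND day = 20240102)
filter := iceberg.NewOr(
	iceberg.NewAnd(
		iceberg.EqualTo(iceberg.Reference("region"), "us"),
		iceberg.EqualTo(iceberg.Reference("day"), int64(20240101)),
	),
	iceberg.NewAnd(
		iceberg.EqualTo(iceberg.Reference("region"), "eu"),
		iceberg.EqualTo(iceberg.Reference("day"), int64(20240102)),
	),
)
```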
```go
// parsePartitionKey parses a partition key string into individual values.
func parsePartitionKey(partitionKey string, fieldNames []string) []interface{} {
	// Simple parsing for demonstration - assumes a format like "field1=value1/field2=value2"
	parts := strings.Split(partitionKey, "/")
	values := make([]interface{}, len(fieldNames))

	for i, part := range parts {
		if i >= len(fieldNames) {
			break
		}

		if strings.Contains(part, "=") {
			kv := strings.SplitN(part, "=", 2)
			if len(kv) == 2 {
				values[i] = parsePartitionValue(kv[1])
			}
		}
	}

	return values
}
```
```go
// parsePartitionValue converts a string partition value to the appropriate type.
func parsePartitionValue(valueStr string) interface{} {
	if valueStr == "null" || valueStr == "" {
		return nil
	}

	if i, err := strconv.ParseInt(valueStr, 10, 64); err == nil {
		return i
	}

	if f, err := strconv.ParseFloat(valueStr, 64); err == nil {
		return f
	}

	if b, err := strconv.ParseBool(valueStr); err == nil {
		return b
	}

	return valueStr
}
```
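A minimal test-style sketch of the expected round trip (hypothetical test in the same package, since these helpers are unexported; imports `reflect` and `testing`):

```go
func TestParsePartitionKey(t *testing.T) {
	got := parsePartitionKey(
		"region=us/day=20240101/active=true",
		[]string{"region", "day", "active"},
	)
	// "us" stays a string, "20240101" parses as int64, "true" as bool.
	want := []interface{}{"us", int64(20240101), true}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("parsePartitionKey() = %v, want %v", got, want)
	}
}
```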
```go
// createEqualToExpression creates an EqualTo expression with the correct type
func createEqualToExpression(term iceberg.UnboundTerm, value interface{}, typ iceberg.Type) iceberg.BooleanExpression {
	switch t := typ.(type) {
	case iceberg.PrimitiveType:
		switch t {
		case iceberg.PrimitiveTypes.Int32:
			if v, ok := value.(int32); ok {
				return iceberg.EqualTo(term, v)
			}
		case iceberg.PrimitiveTypes.Int64:
			if v, ok := value.(int64); ok {
				return iceberg.EqualTo(term, v)
			}
		case iceberg.PrimitiveTypes.Float32:
			if v, ok := value.(float32); ok {
				return iceberg.EqualTo(term, v)
			}
		case iceberg.PrimitiveTypes.Float64:
			if v, ok := value.(float64); ok {
				return iceberg.EqualTo(term, v)
			}
		case iceberg.PrimitiveTypes.String:
			if v, ok := value.(string); ok {
				return iceberg.EqualTo(term, v)
			}
		case iceberg.PrimitiveTypes.Bool:
			if v, ok := value.(bool); ok {
				return iceberg.EqualTo(term, v)
			}
		}
	}

	// Fallback to string
	if v, ok := value.(string); ok {
		return iceberg.EqualTo(term, v)
	}

	return iceberg.EqualTo(term, fmt.Sprintf("%v", value))
}
```
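One subtlety worth noting: `parsePartitionValue` only produces `int64`, `float64`, `bool`, `string`, or nil, so the `Int32`/`Float32` arms above can only match when a caller passes those types directly; otherwise the value falls through to the string fallback. A small illustration (fragment, hypothetical values):

```go
// Matches the Int64 arm and yields a typed predicate: day = 7.
p1 := createEqualToExpression(iceberg.Reference("day"), int64(7), iceberg.PrimitiveTypes.Int64)

// An int64 value against an Int32 column misses the Int32 arm (the type
// assertion to int32 fails) and hits the fmt.Sprintf fallback: day = "7".
p2 := createEqualToExpression(iceberg.Reference("day"), int64(7), iceberg.PrimitiveTypes.Int32)
```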
```go
func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) {
	updatedMeta, err := t.meta.Build()
	if err != nil {
```
**Comment** (reviewer):

> is this defined in the spec? Or is this just a NotYetImplemented thing?