220 changes: 219 additions & 1 deletion table/arrow_utils.go
@@ -1310,5 +1310,223 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
return writeFiles(ctx, rootLocation, args.fs, meta, tasks)
}

panic(fmt.Errorf("%w: write stream with partitions", iceberg.ErrNotImplemented))
// For partitioned tables, group the records by partition and write each group separately.
// This is a basic implementation that supports identity transforms only (the case required by dynamic partition overwrite).
return writePartitionedFiles(ctx, rootLocation, args.fs, meta, args, nextCount, stopCount, taskSchema, targetFileSize)
}

// writePartitionedFiles groups records by partition and writes each group as a separate file.
// This implementation supports identity transforms only (required by dynamic partition overwrite).
func writePartitionedFiles(
ctx context.Context,
rootLocation string,
fs iceio.WriteFileIO,
meta *MetadataBuilder,
args recordWritingArgs,
nextCount func() (int, bool),
stopCount func(),
taskSchema *iceberg.Schema,
targetFileSize int64,
) iter.Seq2[iceberg.DataFile, error] {
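// Flow: materialize the incoming records, group them by identity partition values taken
// from the first row of each record, then bin-pack and write each group separately.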
partitionSpec := meta.CurrentSpec()
schema := meta.CurrentSchema()

// Collect all records first so they can be grouped by partition.
var allRecords []arrow.Record
releaseAll := func() {
for _, rec := range allRecords {
rec.Release()
}
}

for rec, err := range args.itr {
if err != nil {
// Release whatever was already retained before surfacing the error.
releaseAll()
stopCount()
return func(yield func(iceberg.DataFile, error) bool) {
yield(nil, err)
}
}
rec.Retain()
allRecords = append(allRecords, rec)
}

if len(allRecords) == 0 {
stopCount()
return func(yield func(iceberg.DataFile, error) bool) {}
}

// Get partition field information
var partitionFields []iceberg.PartitionField
var partitionSourceNames []string
for field := range partitionSpec.Fields() {
partitionFields = append(partitionFields, field)
if sourceField, ok := schema.FindFieldByID(field.SourceID); ok {
partitionSourceNames = append(partitionSourceNames, sourceField.Name)
} else {
// Keep the two slices aligned even if the source field is missing; the empty
// name resolves to a null partition value when building the key.
partitionSourceNames = append(partitionSourceNames, "")
}
}

// For identity transforms, we can extract partition values directly from the records
// Group records by partition
partitionGroups := make(map[string][]arrow.Record)

for _, rec := range allRecords {
// Extract the partition key from the first row; for identity transforms every row
// in a record is expected to share the same partition values.
partitionKey := extractPartitionKey(rec, partitionFields, partitionSourceNames, schema)
partitionGroups[partitionKey] = append(partitionGroups[partitionKey], rec)
}

// Write each partition group
return func(yield func(iceberg.DataFile, error) bool) {
defer stopCount()
defer releaseAll()

for _, records := range partitionGroups {

for batch := range binPackPartitionRecords(records, 20, targetFileSize) {
cnt, _ := nextCount()
t := WriteTask{
Uuid: *args.writeUUID,
ID: cnt,
Schema: taskSchema,
Batches: batch,
}

df, err := writePartitionedBatch(ctx, rootLocation, fs, meta, t, partitionSpec)
if err != nil {
if !yield(nil, err) {
return
}
continue
}

if !yield(df, nil) {
return
}
}
}
}
}

// extractPartitionKey extracts a partition key string from the first row of a record.
// This works for identity transforms only.
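// The returned key is the partition field IDs and first-row values joined as "id=value" segments with '/'.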
func extractPartitionKey(rec arrow.Record, partitionFields []iceberg.PartitionField, partitionSourceNames []string, schema *iceberg.Schema) string {
if rec.NumRows() == 0 {
return ""
}

var keyParts []string
for i, field := range partitionFields {
if i >= len(partitionSourceNames) {
continue
}

colName := partitionSourceNames[i]
colIdx := rec.Schema().FieldIndices(colName)
if len(colIdx) == 0 {
keyParts = append(keyParts, fmt.Sprintf("%d=null", field.FieldID))
continue
}

arr := rec.Column(colIdx[0])
if arr.Len() == 0 {
keyParts = append(keyParts, fmt.Sprintf("%d=null", field.FieldID))
continue
}

// Get value from first row
val := getValueFromArray(arr, 0)
keyParts = append(keyParts, fmt.Sprintf("%d=%v", field.FieldID, val))
}

return strings.Join(keyParts, "/")
}

// getValueFromArray extracts the Go value at the given index from an Arrow array,
// returning nil for out-of-range indices and null entries.
func getValueFromArray(arr arrow.Array, idx int) interface{} {
if arr.Len() <= idx {
return nil
}

if arr.IsNull(idx) {
return nil
}

// Handle different array types
switch a := arr.(type) {
case *array.Int32:
return a.Value(idx)
case *array.Int64:
return a.Value(idx)
case *array.Float32:
return a.Value(idx)
case *array.Float64:
return a.Value(idx)
case *array.String:
return a.Value(idx)
case *array.Boolean:
return a.Value(idx)
case *array.Date32:
return a.Value(idx)
case *array.Timestamp:
return a.Value(idx)
default:
// For other types, use string representation
return a.ValueStr(idx)
}
}

// binPackPartitionRecords bins records into batches similar to binPackRecords.
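// Records are retained as they are handed to the packing iterator; the resulting batches become write tasks.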
func binPackPartitionRecords(records []arrow.Record, recordLookback int, targetFileSize int64) iter.Seq[[]arrow.Record] {
return internal.PackingIterator(func(yield func(arrow.Record) bool) {
for _, rec := range records {
rec.Retain()
if !yield(rec) {
return
}
}
}, targetFileSize, recordLookback, recordNBytes, false)
}

// writePartitionedBatch writes a batch of records for a partitioned table.
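// It resolves the data file location, casts each batch to the (possibly sanitized) current schema,
// and writes a single Parquet data file with the partition spec and column stats attached.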
func writePartitionedBatch(ctx context.Context, rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, task WriteTask, spec iceberg.PartitionSpec) (iceberg.DataFile, error) {
locProvider, err := LoadLocationProvider(rootLocation, meta.props)
if err != nil {
return nil, err
}

format := tblutils.GetFileFormat(iceberg.ParquetFile)
fileSchema := meta.CurrentSchema()
sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
if err != nil {
return nil, err
}

if !sanitized.Equals(fileSchema) {
fileSchema = sanitized
}

batches := make([]arrow.Record, len(task.Batches))
for i, b := range task.Batches {
rec, err := ToRequestedSchema(ctx, fileSchema, task.Schema, b, false, true, false)
if err != nil {
return nil, err
}
batches[i] = rec
}

statsCols, err := computeStatsPlan(fileSchema, meta.props)
if err != nil {
return nil, err
}

filePath := locProvider.NewDataLocation(task.GenerateDataFileName("parquet"))

return format.WriteDataFile(ctx, fs, tblutils.WriteFileInfo{
FileSchema: fileSchema,
Spec: spec,
FileName: filePath,
StatsCols: statsCols,
WriteProps: format.GetWriteProperties(meta.props),
}, batches)
}