Skip to content

Cannot append to a partitioned table with a map(string, string) column #669

@ericseguin-northstar

Description

@ericseguin-northstar

Apache Iceberg version

iceberg-go version: v0.4.0
arrow-go version: v18.4.1

Please describe the bug 🐞

When I call tbl.Append() on a partitioned table with a map(string, string) column where the value is nullable, I get the following error:

not implemented: function 'array_take' has no kernel matching input types (map<utf8, utf8, items_nullable>, int64)

If the table is unpartitioned or if the map column is removed the operation works without issue.

To reproduce

Here's a very minimal example that reproduces the issue on my machine:

package reproduce

import (
	"database/sql"
	"testing"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/catalog/rest"
	"github.com/apache/iceberg-go/table"
	"github.com/stretchr/testify/require"
	"github.com/trinodb/trino-go-client/trino"
)

func TestReproduce(t *testing.T) {
	ctx := t.Context()

	// create a table with a MAP column and an arbitrary TIMESTAMP column for partitioning
	// use Trino so we can enable partitioning
	trinoCfg := trino.Config{
		Catalog:   "<catalog name in Trino>",
		ServerURI: "<trino endpoint>",
		Schema:    "public",
	}
	trinoDSN, err := trinoCfg.FormatDSN()
	require.NoError(t, err)

	db, err := sql.Open("trino", trinoDSN)
	require.NoError(t, err)

	_, err = db.ExecContext(ctx, `
		CREATE TABLE my_table (
			date TIMESTAMP,
			my_map MAP(VARCHAR, VARCHAR)
		) WITH (
			partitioning = ARRAY['day(date)']
		)`)
	require.NoError(t, err)
	defer db.ExecContext(ctx, "DROP TABLE my_table")

	// get the Iceberg table
	cat, err := rest.NewCatalog(ctx, "rest", "<catalog REST endpoint>") // I'm using nessie
	require.NoError(t, err)

	tbl, err := cat.LoadTable(ctx, []string{"public", "my_table"})
	require.NoError(t, err)

	// print obtained schema in case it's helpful for investigation
	arrowSchema, err := table.SchemaToArrowSchema(tbl.Schema(), nil, true, false)
	require.NoError(t, err)

	t.Logf("Iceberg schema:\n%s", tbl.Schema().String())
	t.Logf("Arrow schema:\n%s", arrowSchema.String())

	// append a dummy record
	rb := array.NewRecordBuilder(memory.NewGoAllocator(), arrowSchema)
	defer rb.Release()

	rb.Field(0).(*array.TimestampBuilder).Append(arrow.Timestamp(time.Now().UnixMicro()))
	mb := rb.Field(1).(*array.MapBuilder)
	mb.Append(true)
	mb.KeyBuilder().(*array.StringBuilder).Append("key")
	mb.ItemBuilder().(*array.StringBuilder).Append("val")

	rec := rb.NewRecordBatch()
	defer rec.Release()

	rr, err := array.NewRecordReader(arrowSchema, []arrow.RecordBatch{rec})
	require.NoError(t, err)
	defer rr.Release()

	_, err = tbl.Append(ctx, rr, iceberg.Properties{})
	require.NoError(t, err) // fails here
}

This fails at the tbl.Append() call as marked:

--- FAIL: TestReproduce (0.31s)
    <...>/reproduce_test.go:74: Iceberg schema:
        table {
        	1: date: required timestamp
        	2: my_map: required map<string, string>
        }
    <...>/reproduce_test.go:75: Arrow schema:
        schema:
          fields: 2
            - date: type=timestamp[us]
              metadata: ["PARQUET:field_id": "1"]
            - my_map: type=map<utf8, utf8, items_nullable>
                metadata: ["PARQUET:field_id": "2"]
    <...>/reproduce_test.go:94: 
        	Error Trace:	<...>/reproduce_test.go:94
        	Error:      	Received unexpected error:
        	            	not implemented: function 'array_take' has no kernel matching input types (map<utf8, utf8, items_nullable>, int64)
        	Test:       	TestReproduce

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions