SNOW-983911 Add support for querying the vector data type #997

Closed
49 changes: 45 additions & 4 deletions converter.go
@@ -91,10 +91,10 @@
}

// snowflakeTypeToGo translates Snowflake data type to Go data type.
func snowflakeTypeToGo(dbtype snowflakeType, scale int64) reflect.Type {
switch dbtype {
func snowflakeTypeToGo(rowType execResponseRowType) reflect.Type {
Collaborator: I'm not sure about this. This function was independent of the API response, and that was good: separation of concerns. I'm wondering whether some specific struct could be used here instead.

Reply: I could parse the response type in the caller of this function and pass down some kind of options struct with the vector metadata. However, since this function is currently called in only one place (and in the context of a request), it felt cleaner to have the function take execResponseRowType.

Collaborator: I think that separation of layers (API/mapping) is good :) I would prefer to keep the API parsing closer to the API functions and leave this function working with plain types, as it was before.
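To make the suggestion concrete, one hypothetical shape for such a struct is sketched below. This is an illustration only; the names typeToGoOptions, vectorElem, and snowflakeTypeToGoSketch are invented here and are not part of the PR. The caller (rows.go and the Arrow converter) would translate execResponseRowType into this options value, keeping the API parsing near the API layer.

// Illustrative sketch only: one way to keep the type mapping decoupled
// from the API response, as suggested above. All names are hypothetical.
type typeToGoOptions struct {
	scale      int64
	vectorElem snowflakeType // element type; meaningful only for vectorType
}

func snowflakeTypeToGoSketch(dbtype snowflakeType, opts typeToGoOptions) reflect.Type {
	switch dbtype {
	case fixedType:
		if opts.scale == 0 {
			return reflect.TypeOf(int64(0))
		}
		return reflect.TypeOf(float64(0))
	case vectorType:
		if opts.vectorElem == realType {
			return reflect.TypeOf([]float32{})
		}
		return reflect.TypeOf([]int32{})
	// ... remaining cases as in the existing function ...
	}
	return reflect.TypeOf("")
}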

switch getSnowflakeType(rowType.Type) {
case fixedType:
if scale == 0 {
if rowType.Scale == 0 {
return reflect.TypeOf(int64(0))
}
return reflect.TypeOf(float64(0))
@@ -108,8 +108,22 @@
return reflect.TypeOf([]byte{})
case booleanType:
return reflect.TypeOf(true)
case vectorType:
if len(rowType.Fields) != 1 {
logger.Errorf("invalid response row type for vector: %+v", rowType)
return reflect.TypeOf("")
}

switch getSnowflakeType(rowType.Fields[0].Type) {
case fixedType:
return reflect.TypeOf([]int32{})
Collaborator: Based on the documentation, in V2 there may be a different number of bits per type. Does this PR cover only V1?

Reply: Yes, we currently support only 32-bit values.

Collaborator: Can we mention V1 in the commit message (so for now, in the PR title only)?

case realType:
return reflect.TypeOf([]float32{})
default:
logger.Errorf("invalid element type for vector: %+v", rowType.Fields[0].Type)
return reflect.TypeOf("")

}
}
logger.Errorf("unsupported dbtype is specified. %v", dbtype)
logger.Errorf("unsupported dbtype is specified. %v", rowType.Type)
return reflect.TypeOf("")
}

@@ -610,6 +624,33 @@
}
}
return err
case vectorType:
vectorData := srcValue.(*array.FixedSizeList)
datatype := vectorData.DataType().(*arrow.FixedSizeListType)
Collaborator: Does this work only for Arrow? What should happen if a client uses ALTER SESSION SET GO_QUERY_RESULT_FORMAT = JSON?

dim := int(datatype.Len())
switch datatype.Elem().ID() {
case arrow.INT32:
values := vectorData.ListValues().(*array.Int32).Int32Values()
for i := 0; i < vectorData.Len(); i++ {
if vectorData.IsNull(i) {
destcol[i] = []int32(nil)
} else {
destcol[i] = values[i*dim : (i+1)*dim]
}
}
case arrow.FLOAT32:
values := vectorData.ListValues().(*array.Float32).Float32Values()
for i := 0; i < vectorData.Len(); i++ {
if vectorData.IsNull(i) {
destcol[i] = []float32(nil)
} else {
destcol[i] = values[i*dim : (i+1)*dim]
}
}
default:
return fmt.Errorf("unsupported element type %q for a vector", datatype.Elem().String())

Check warning on line 651 in converter.go

View check run for this annotation

Codecov / codecov/patch

converter.go#L650-L651

Added lines #L650 - L651 were not covered by tests
}
return err
}

return fmt.Errorf("unsupported data type")
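A side note on the indexing above: Arrow's FixedSizeList keeps every row's elements in a single flat child buffer, so row i of a dimension-dim vector column is the slice values[i*dim : (i+1)*dim]. The standalone sketch below demonstrates that layout outside the driver; the Arrow major version in the import path is an assumption and should match whatever this module pins.

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
)

func main() {
	pool := memory.NewGoAllocator()
	const dim = 3

	// Build a FixedSizeList<int32, 3> with two rows: [1 2 3] and [4 5 6].
	lb := array.NewFixedSizeListBuilder(pool, dim, &arrow.Int32Type{})
	defer lb.Release()
	vb := lb.ValueBuilder().(*array.Int32Builder)
	for _, row := range [][]int32{{1, 2, 3}, {4, 5, 6}} {
		lb.Append(true)
		vb.AppendValues(row, nil)
	}
	list := lb.NewArray().(*array.FixedSizeList)
	defer list.Release()

	// All elements live in one flat buffer; per-row views are reslices of it,
	// mirroring what the converter does for each destination column value.
	values := list.ListValues().(*array.Int32).Int32Values()
	for i := 0; i < list.Len(); i++ {
		fmt.Println(values[i*dim : (i+1)*dim]) // [1 2 3] then [4 5 6]
	}
}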
106 changes: 85 additions & 21 deletions converter_test.go
@@ -139,35 +139,41 @@ func TestGoTypeToSnowflake(t *testing.T) {
}

type tcSnowflakeTypeToGo struct {
in snowflakeType
scale int64
out reflect.Type
in execResponseRowType
out reflect.Type
}

func TestSnowflakeTypeToGo(t *testing.T) {
testcases := []tcSnowflakeTypeToGo{
{in: fixedType, scale: 0, out: reflect.TypeOf(int64(0))},
{in: fixedType, scale: 2, out: reflect.TypeOf(float64(0))},
{in: realType, scale: 0, out: reflect.TypeOf(float64(0))},
{in: textType, scale: 0, out: reflect.TypeOf("")},
{in: dateType, scale: 0, out: reflect.TypeOf(time.Now())},
{in: timeType, scale: 0, out: reflect.TypeOf(time.Now())},
{in: timestampLtzType, scale: 0, out: reflect.TypeOf(time.Now())},
{in: timestampNtzType, scale: 0, out: reflect.TypeOf(time.Now())},
{in: timestampTzType, scale: 0, out: reflect.TypeOf(time.Now())},
{in: objectType, scale: 0, out: reflect.TypeOf("")},
{in: variantType, scale: 0, out: reflect.TypeOf("")},
{in: arrayType, scale: 0, out: reflect.TypeOf("")},
{in: binaryType, scale: 0, out: reflect.TypeOf([]byte{})},
{in: booleanType, scale: 0, out: reflect.TypeOf(true)},
{in: sliceType, scale: 0, out: reflect.TypeOf("")},
{in: execResponseRowType{Type: "FIXED"}, out: reflect.TypeOf(int64(0))},
{in: execResponseRowType{Type: "FIXED", Scale: 2}, out: reflect.TypeOf(float64(0))},
{in: execResponseRowType{Type: "REAL"}, out: reflect.TypeOf(float64(0))},
{in: execResponseRowType{Type: "TEXT"}, out: reflect.TypeOf("")},
{in: execResponseRowType{Type: "TIME"}, out: reflect.TypeOf(time.Now())},
{in: execResponseRowType{Type: "TIME"}, out: reflect.TypeOf(time.Now())},
{in: execResponseRowType{Type: "TIMESTAMP_LTZ"}, out: reflect.TypeOf(time.Now())},
{in: execResponseRowType{Type: "TIMESTAMP_NTZ"}, out: reflect.TypeOf(time.Now())},
{in: execResponseRowType{Type: "TIMESTAMP_TZ"}, out: reflect.TypeOf(time.Now())},
{in: execResponseRowType{Type: "OBJECT"}, out: reflect.TypeOf("")},
{in: execResponseRowType{Type: "VARIANT"}, out: reflect.TypeOf("")},
{in: execResponseRowType{Type: "ARRAY"}, out: reflect.TypeOf("")},
{in: execResponseRowType{Type: "BINARY"}, out: reflect.TypeOf([]byte{})},
{in: execResponseRowType{Type: "BOOLEAN"}, out: reflect.TypeOf(true)},
{in: execResponseRowType{Type: "SLICE"}, out: reflect.TypeOf("")},
{
in: execResponseRowType{Type: "VECTOR", Fields: []execResponseRowFieldType{{Type: "FIXED"}}},
out: reflect.TypeOf([]int32{}),
},
{
in: execResponseRowType{Type: "VECTOR", Fields: []execResponseRowFieldType{{Type: "REAL"}}},
out: reflect.TypeOf([]float32{}),
},
}
for _, test := range testcases {
t.Run(fmt.Sprintf("%v_%v", test.in, test.out), func(t *testing.T) {
a := snowflakeTypeToGo(test.in, test.scale)
a := snowflakeTypeToGo(test.in)
if a != test.out {
t.Errorf("failed. in: %v, scale: %v, expected: %v, got: %v",
test.in, test.scale, test.out, a)
t.Errorf("failed. in: %v, expected: %v, got: %v", test.in, test.out, a)
}
})
}
@@ -849,6 +855,64 @@ func TestArrowToValue(t *testing.T) {
return -1
},
},
{
logical: "vector",
values: [][]int32{nil, {1, 2, 3}},
builder: array.NewFixedSizeListBuilder(pool, 3, &arrow.Int32Type{}),
append: func(b array.Builder, vs interface{}) {
for _, v := range vs.([][]int32) {
lb := b.(*array.FixedSizeListBuilder)
vb := lb.ValueBuilder().(*array.Int32Builder)
if len(v) == 0 {
lb.AppendNull()
vb.AppendValues([]int32{-1, -1, -1}, nil)
continue
}

lb.Append(true)
for _, e := range v {
vb.Append(e)
}
}
},
compare: func(src interface{}, dst []snowflakeValue) int {
for i, v := range src.([][]int32) {
if !reflect.DeepEqual(v, dst[i]) {
return i
}
}
return -1
},
},
{
logical: "vector",
values: [][]float32{nil, {1.1, 2.2, 3}},
builder: array.NewFixedSizeListBuilder(pool, 3, &arrow.Float32Type{}),
append: func(b array.Builder, vs interface{}) {
for _, v := range vs.([][]float32) {
lb := b.(*array.FixedSizeListBuilder)
vb := lb.ValueBuilder().(*array.Float32Builder)
if len(v) == 0 {
lb.AppendNull()
vb.AppendValues([]float32{-1, -1, -1}, nil)
continue
}

lb.Append(true)
for _, e := range v {
vb.Append(e)
}
}
},
compare: func(src interface{}, dst []snowflakeValue) int {
for i, v := range src.([][]float32) {
if !reflect.DeepEqual(v, dst[i]) {
return i
}
}
return -1
},
},
} {
testName := tc.logical
if tc.physical != "" {
38 changes: 38 additions & 0 deletions datatype.go
@@ -26,6 +26,7 @@
binaryType
timeType
booleanType
vectorType
// the following are not snowflake types per se but internal types
nullType
sliceType
@@ -47,6 +48,7 @@
"BINARY": binaryType,
"TIME": timeType,
"BOOLEAN": booleanType,
"VECTOR": vectorType,
"NULL": nullType,
"SLICE": sliceType,
"CHANGE_TYPE": changeType,
@@ -104,6 +106,8 @@
DataTypeTime = []byte{timeType.Byte()}
// DataTypeBoolean is a BOOLEAN datatype.
DataTypeBoolean = []byte{booleanType.Byte()}
// DataTypeVector is a VECTOR datatype.
DataTypeVector = []byte{vectorType.Byte()}
)

// dataTypeMode returns the subsequent data type in a string representation.
@@ -131,6 +135,40 @@
return tsmode, nil
}

type vectorElements interface {
Collaborator: I'm wondering about one thing. In the future we are expected to implement structured types, which contain arrays. An array is a very similar structure to a vector, just more generic, I'd say. Maybe we can prepare the code here to use these arrays instead of the native backend vector type?

~int32 | ~float32
}

// SQLVector is a wrapper type used to support deserializing SQL values into slices
// in database/sql scans. Cast slice pointers as *SQLVector[T] when passing them to
// a database/sql Scan method. The slice will be populated with the corresponding
// column value when the scan completes.
//
// Here is an example:
//
// var v []int32
// err := rows.Scan((*SQLVector[int32])(&v))
type SQLVector[T vectorElements] []T

// Vector is syntactic sugar for wrapping slices in SQLVector[t] so that they
// can be deserialized in database/sql scans.
//
// Here is an example:
//
// var v []int32
// err := rows.Scan(Vector(&v))
func Vector[T vectorElements](value *[]T) *SQLVector[T] {
return (*SQLVector[T])(value)
}

func (v *SQLVector[T]) Scan(src any) error {
if vec, ok := src.([]T); ok {
*v = vec
return nil
}
return fmt.Errorf("cannot convert %T to a vector of type %T", src, *v)

Check warning on line 169 in datatype.go

View check run for this annotation

Codecov / codecov/patch

datatype.go#L169

Added line #L169 was not covered by tests
}

// SnowflakeParameter includes the columns output from SHOW PARAMETER command.
type SnowflakeParameter struct {
Key string
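For readers who only skim the diff, a rough end-to-end sketch of the new public helper from the application side follows. The DSN and the use of database/sql are placeholders for whatever the application already does; only Vector and SQLVector come from this PR.

package main

import (
	"database/sql"
	"log"

	sf "github.com/snowflakedb/gosnowflake"
)

func main() {
	// Placeholder DSN; construct it with your existing configuration.
	db, err := sql.Open("snowflake", "user:password@account/db/schema")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	rows, err := db.Query(`select [1,2,3]::vector(int, 3)`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	var v []int32
	for rows.Next() {
		// sf.Vector wraps the slice so the driver can scan the column into it.
		if err := rows.Scan(sf.Vector(&v)); err != nil {
			log.Fatal(err)
		}
		log.Println(v) // [1 2 3]
	}
}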
43 changes: 43 additions & 0 deletions driver_test.go
@@ -13,6 +13,7 @@ import (
"net/url"
"os"
"os/signal"
"reflect"
"strings"
"syscall"
"testing"
@@ -1263,6 +1264,48 @@ func testArray(t *testing.T, json bool) {
})
}

func TestVector(t *testing.T) {
testVector(t, false)
Collaborator: We are missing a test for JSON, right?

Reply: Added tests for JSON to old_driver_test.go.

Collaborator: I believe that adding it close to the Arrow tests is better for now.

}

func testVector(t *testing.T, json bool) {
runDBTest(t, func(dbt *DBTest) {
if json {
dbt.mustExec(forceJSON)
}
// dbt.mustExec(`create temporary table test_vector (i vector(int,3), f vector(float,3))`)
rowsInt := dbt.mustQuery(`select [1,2,3]::vector(int, 3)`)
defer rowsInt.Close()
if ok := rowsInt.Next(); !ok {
t.Error("no rows")
}

var gotInt []int32
if err := rowsInt.Scan(Vector(&gotInt)); err != nil {
t.Error(err)
}
wantInt := []int32{1, 2, 3}
if !reflect.DeepEqual(gotInt, wantInt) {
t.Errorf("incorrect vector deserialized: got %v, want %v", gotInt, wantInt)
}

rowsFloat := dbt.mustQuery(`select [1.1,2.2,3,4,5]::vector(float, 5)`)
defer rowsFloat.Close()
if ok := rowsFloat.Next(); !ok {
t.Error("no rows")
}

var gotFloat []float32
if err := rowsFloat.Scan(Vector(&gotFloat)); err != nil {
t.Error(err)
}
wantFloat := []float32{1.1, 2.2, 3, 4, 5}
if !reflect.DeepEqual(gotFloat, wantFloat) {
t.Errorf("incorrect vector deserialized: got %v, want %v", gotFloat, wantFloat)
}
})
}
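Regarding the result-format question raised earlier: a hypothetical JSON-path variant of this test could look like the sketch below. It is not part of this PR (the reply above says the JSON tests live in old_driver_test.go), and it assumes the existing forceJSON helper used by other tests in this file; whether the JSON path yields the same Go types is exactly the open question from the review.

func testVectorJSONSketch(t *testing.T) {
	runDBTest(t, func(dbt *DBTest) {
		// forceJSON is the existing helper that switches the session's
		// query result format to JSON.
		dbt.mustExec(forceJSON)

		rows := dbt.mustQuery(`select [1,2,3]::vector(int, 3)`)
		defer rows.Close()
		if !rows.Next() {
			t.Fatal("no rows")
		}

		var got []int32
		if err := rows.Scan(Vector(&got)); err != nil {
			t.Error(err)
		}
		if want := []int32{1, 2, 3}; !reflect.DeepEqual(got, want) {
			t.Errorf("incorrect vector deserialized: got %v, want %v", got, want)
		}
	})
}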

func TestLargeSetResult(t *testing.T) {
CustomJSONDecoderEnabled = false
testLargeSetResult(t, 100000, false)
26 changes: 19 additions & 7 deletions query.go
@@ -46,14 +46,26 @@ type contextData struct {
Base64Data string `json:"base64Data,omitempty"`
}

type execResponseRowFieldType struct {
ByteLength int64 `json:"byteLength"`
Length int64 `json:"length"`
Type string `json:"type"`
Precision int64 `json:"precision"`
Scale int64 `json:"scale"`
Nullable bool `json:"nullable"`
Fields []execResponseRowFieldType `json:"fields"`
}

type execResponseRowType struct {
Name string `json:"name"`
ByteLength int64 `json:"byteLength"`
Length int64 `json:"length"`
Type string `json:"type"`
Precision int64 `json:"precision"`
Scale int64 `json:"scale"`
Nullable bool `json:"nullable"`
Name string `json:"name"`
ByteLength int64 `json:"byteLength"`
Length int64 `json:"length"`
Type string `json:"type"`
Precision int64 `json:"precision"`
Scale int64 `json:"scale"`
Nullable bool `json:"nullable"`
VectorDimension int `json:"vectorDimension"`
Fields []execResponseRowFieldType `json:"fields"`
}
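To visualize what the new Fields metadata is meant to carry, here is a small decoding sketch assumed to live alongside these types in the gosnowflake package (it needs encoding/json and fmt). The JSON payload is a guess at the server's rowtype metadata for a VECTOR(INT, 3) column; the exact keys, casing, and values are assumptions, not taken from this PR.

func exampleDecodeVectorRowType() {
	// Hypothetical server payload; field values and casing are assumptions.
	raw := []byte(`{
		"name": "V",
		"type": "VECTOR",
		"vectorDimension": 3,
		"fields": [{"type": "FIXED", "precision": 38, "scale": 0, "nullable": false}]
	}`)
	var rt execResponseRowType
	if err := json.Unmarshal(raw, &rt); err != nil {
		panic(err)
	}
	// rt.Fields[0].Type is what snowflakeTypeToGo inspects to pick the Go
	// element type: FIXED maps to []int32 and REAL maps to []float32.
	fmt.Printf("%s dim=%d elem=%s\n", rt.Type, rt.VectorDimension, rt.Fields[0].Type)
}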

type execResponseChunk struct {
4 changes: 1 addition & 3 deletions rows.go
@@ -147,9 +147,7 @@ func (rows *snowflakeRows) ColumnTypeScanType(index int) reflect.Type {
if err := rows.waitForAsyncQueryStatus(); err != nil {
return nil
}
return snowflakeTypeToGo(
getSnowflakeType(rows.ChunkDownloader.getRowType()[index].Type),
rows.ChunkDownloader.getRowType()[index].Scale)
return snowflakeTypeToGo(rows.ChunkDownloader.getRowType()[index])
}

func (rows *snowflakeRows) GetQueryID() string {
Expand Down
Loading