diff --git a/common/parser/block.go b/common/parser/block.go index c0374d9..705ab7d 100644 --- a/common/parser/block.go +++ b/common/parser/block.go @@ -3,6 +3,7 @@ package parser import ( "database/sql/driver" "math" + "unicode/utf8" "unsafe" "github.com/taosdata/driver-go/v3/common" @@ -108,28 +109,9 @@ type rawConvertFunc func(pStart unsafe.Pointer, row int, arg ...interface{}) dri type rawConvertVarDataFunc func(pHeader, pStart unsafe.Pointer, row int) driver.Value -var rawConvertFuncMap = map[uint8]rawConvertFunc{ - uint8(common.TSDB_DATA_TYPE_BOOL): rawConvertBool, - uint8(common.TSDB_DATA_TYPE_TINYINT): rawConvertTinyint, - uint8(common.TSDB_DATA_TYPE_SMALLINT): rawConvertSmallint, - uint8(common.TSDB_DATA_TYPE_INT): rawConvertInt, - uint8(common.TSDB_DATA_TYPE_BIGINT): rawConvertBigint, - uint8(common.TSDB_DATA_TYPE_UTINYINT): rawConvertUTinyint, - uint8(common.TSDB_DATA_TYPE_USMALLINT): rawConvertUSmallint, - uint8(common.TSDB_DATA_TYPE_UINT): rawConvertUInt, - uint8(common.TSDB_DATA_TYPE_UBIGINT): rawConvertUBigint, - uint8(common.TSDB_DATA_TYPE_FLOAT): rawConvertFloat, - uint8(common.TSDB_DATA_TYPE_DOUBLE): rawConvertDouble, - uint8(common.TSDB_DATA_TYPE_TIMESTAMP): rawConvertTime, -} - -var rawConvertVarDataMap = map[uint8]rawConvertVarDataFunc{ - uint8(common.TSDB_DATA_TYPE_BINARY): rawConvertBinary, - uint8(common.TSDB_DATA_TYPE_NCHAR): rawConvertNchar, - uint8(common.TSDB_DATA_TYPE_JSON): rawConvertJson, - uint8(common.TSDB_DATA_TYPE_VARBINARY): rawConvertVarBinary, - uint8(common.TSDB_DATA_TYPE_GEOMETRY): rawConvertGeometry, -} +var rawConvertFuncSlice = [15]rawConvertFunc{} + +var rawConvertVarDataSlice = [21]rawConvertVarDataFunc{} func ItemIsNull(pHeader unsafe.Pointer, row int) bool { offset := CharOffset(row) @@ -196,20 +178,23 @@ func rawConvertTime(pStart unsafe.Pointer, row int, arg ...interface{}) driver.V } func rawConvertVarBinary(pHeader, pStart unsafe.Pointer, row int) driver.Value { + return rawGetBytes(pHeader, pStart, row) +} + +func rawGetBytes(pHeader, pStart unsafe.Pointer, row int) []byte { offset := *((*int32)(pointer.AddUintptr(pHeader, uintptr(row*4)))) if offset == -1 { return nil } currentRow := pointer.AddUintptr(pStart, uintptr(offset)) clen := *((*uint16)(currentRow)) - currentRow = unsafe.Pointer(uintptr(currentRow) + 2) - - binaryVal := make([]byte, clen) - - for index := uint16(0); index < clen; index++ { - binaryVal[index] = *((*byte)(unsafe.Pointer(uintptr(currentRow) + uintptr(index)))) + if clen == 0 { + return make([]byte, 0) } - return binaryVal[:] + currentRow = pointer.AddUintptr(currentRow, 2) + result := make([]byte, clen) + Copy(currentRow, result, 0, int(clen)) + return result } func rawConvertGeometry(pHeader, pStart unsafe.Pointer, row int) driver.Value { @@ -217,20 +202,11 @@ func rawConvertGeometry(pHeader, pStart unsafe.Pointer, row int) driver.Value { } func rawConvertBinary(pHeader, pStart unsafe.Pointer, row int) driver.Value { - offset := *((*int32)(pointer.AddUintptr(pHeader, uintptr(row*4)))) - if offset == -1 { + result := rawGetBytes(pHeader, pStart, row) + if result == nil { return nil } - currentRow := pointer.AddUintptr(pStart, uintptr(offset)) - clen := *((*uint16)(currentRow)) - currentRow = unsafe.Pointer(uintptr(currentRow) + 2) - - binaryVal := make([]byte, clen) - - for index := uint16(0); index < clen; index++ { - binaryVal[index] = *((*byte)(unsafe.Pointer(uintptr(currentRow) + uintptr(index)))) - } - return string(binaryVal[:]) + return *(*string)(unsafe.Pointer(&result)) } func rawConvertNchar(pHeader, pStart unsafe.Pointer, row int) driver.Value { @@ -240,31 +216,22 @@ func rawConvertNchar(pHeader, pStart unsafe.Pointer, row int) driver.Value { } currentRow := pointer.AddUintptr(pStart, uintptr(offset)) clen := *((*uint16)(currentRow)) / 4 + if clen == 0 { + return "" + } currentRow = unsafe.Pointer(uintptr(currentRow) + 2) - - binaryVal := make([]rune, clen) - - for index := uint16(0); index < clen; index++ { - binaryVal[index] = *((*rune)(unsafe.Pointer(uintptr(currentRow) + uintptr(index*4)))) + utf8Bytes := make([]byte, clen*utf8.UTFMax) + index := 0 + utf32Slice := (*[1 << 30]rune)(currentRow)[:clen:clen] + for _, runeValue := range utf32Slice { + index += utf8.EncodeRune(utf8Bytes[index:], runeValue) } - return string(binaryVal) + utf8Bytes = utf8Bytes[:index] + return *(*string)(unsafe.Pointer(&utf8Bytes)) } func rawConvertJson(pHeader, pStart unsafe.Pointer, row int) driver.Value { - offset := *((*int32)(pointer.AddUintptr(pHeader, uintptr(row*4)))) - if offset == -1 { - return nil - } - currentRow := pointer.AddUintptr(pStart, uintptr(offset)) - clen := *((*uint16)(currentRow)) - currentRow = pointer.AddUintptr(currentRow, 2) - - binaryVal := make([]byte, clen) - - for index := uint16(0); index < clen; index++ { - binaryVal[index] = *((*byte)(pointer.AddUintptr(currentRow, uintptr(index)))) - } - return binaryVal[:] + return rawConvertVarBinary(pHeader, pStart, row) } func ReadBlockSimple(block unsafe.Pointer, precision int) [][]driver.Value { @@ -290,7 +257,7 @@ func ReadBlock(block unsafe.Pointer, blockSize int, colTypes []uint8, precision for column := 0; column < colCount; column++ { colLength := *((*int32)(pointer.AddUintptr(block, lengthOffset+uintptr(column)*Int32Size))) if IsVarDataType(colTypes[column]) { - convertF := rawConvertVarDataMap[colTypes[column]] + convertF := rawConvertVarDataSlice[colTypes[column]] pStart = pointer.AddUintptr(pHeader, Int32Size*uintptr(blockSize)) for row := 0; row < blockSize; row++ { if column == 0 { @@ -299,7 +266,7 @@ func ReadBlock(block unsafe.Pointer, blockSize int, colTypes []uint8, precision r[row][column] = convertF(pHeader, pStart, row) } } else { - convertF := rawConvertFuncMap[colTypes[column]] + convertF := rawConvertFuncSlice[colTypes[column]] pStart = pointer.AddUintptr(pHeader, nullBitMapOffset) for row := 0; row < blockSize; row++ { if column == 0 { @@ -326,11 +293,11 @@ func ReadRow(dest []driver.Value, block unsafe.Pointer, blockSize int, row int, for column := 0; column < colCount; column++ { colLength := *((*int32)(pointer.AddUintptr(block, lengthOffset+uintptr(column)*Int32Size))) if IsVarDataType(colTypes[column]) { - convertF := rawConvertVarDataMap[colTypes[column]] + convertF := rawConvertVarDataSlice[colTypes[column]] pStart = pointer.AddUintptr(pHeader, Int32Size*uintptr(blockSize)) dest[column] = convertF(pHeader, pStart, row) } else { - convertF := rawConvertFuncMap[colTypes[column]] + convertF := rawConvertFuncSlice[colTypes[column]] pStart = pointer.AddUintptr(pHeader, nullBitMapOffset) if ItemIsNull(pHeader, row) { dest[column] = nil @@ -352,7 +319,7 @@ func ReadBlockWithTimeFormat(block unsafe.Pointer, blockSize int, colTypes []uin for column := 0; column < colCount; column++ { colLength := *((*int32)(pointer.AddUintptr(block, lengthOffset+uintptr(column)*Int32Size))) if IsVarDataType(colTypes[column]) { - convertF := rawConvertVarDataMap[colTypes[column]] + convertF := rawConvertVarDataSlice[colTypes[column]] pStart = pointer.AddUintptr(pHeader, uintptr(4*blockSize)) for row := 0; row < blockSize; row++ { if column == 0 { @@ -361,7 +328,7 @@ func ReadBlockWithTimeFormat(block unsafe.Pointer, blockSize int, colTypes []uin r[row][column] = convertF(pHeader, pStart, row) } } else { - convertF := rawConvertFuncMap[colTypes[column]] + convertF := rawConvertFuncSlice[colTypes[column]] pStart = pointer.AddUintptr(pHeader, nullBitMapOffset) for row := 0; row < blockSize; row++ { if column == 0 { @@ -381,12 +348,33 @@ func ReadBlockWithTimeFormat(block unsafe.Pointer, blockSize int, colTypes []uin func ItemRawBlock(colType uint8, pHeader, pStart unsafe.Pointer, row int, precision int, timeFormat FormatTimeFunc) driver.Value { if IsVarDataType(colType) { - return rawConvertVarDataMap[colType](pHeader, pStart, row) + return rawConvertVarDataSlice[colType](pHeader, pStart, row) } else { if ItemIsNull(pHeader, row) { return nil } else { - return rawConvertFuncMap[colType](pStart, row, precision, timeFormat) + return rawConvertFuncSlice[colType](pStart, row, precision, timeFormat) } } } + +func init() { + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_BOOL)] = rawConvertBool + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_TINYINT)] = rawConvertTinyint + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_SMALLINT)] = rawConvertSmallint + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_INT)] = rawConvertInt + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_BIGINT)] = rawConvertBigint + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_UTINYINT)] = rawConvertUTinyint + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_USMALLINT)] = rawConvertUSmallint + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_UINT)] = rawConvertUInt + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_UBIGINT)] = rawConvertUBigint + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_FLOAT)] = rawConvertFloat + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_DOUBLE)] = rawConvertDouble + rawConvertFuncSlice[uint8(common.TSDB_DATA_TYPE_TIMESTAMP)] = rawConvertTime + + rawConvertVarDataSlice[uint8(common.TSDB_DATA_TYPE_BINARY)] = rawConvertBinary + rawConvertVarDataSlice[uint8(common.TSDB_DATA_TYPE_NCHAR)] = rawConvertNchar + rawConvertVarDataSlice[uint8(common.TSDB_DATA_TYPE_JSON)] = rawConvertJson + rawConvertVarDataSlice[uint8(common.TSDB_DATA_TYPE_VARBINARY)] = rawConvertVarBinary + rawConvertVarDataSlice[uint8(common.TSDB_DATA_TYPE_GEOMETRY)] = rawConvertGeometry +} diff --git a/common/parser/mem.go b/common/parser/mem.go new file mode 100644 index 0000000..f0d4b00 --- /dev/null +++ b/common/parser/mem.go @@ -0,0 +1,12 @@ +package parser + +import "unsafe" + +//go:noescape +func memmove(to, from unsafe.Pointer, n uintptr) + +//go:linkname memmove runtime.memmove + +func Copy(source unsafe.Pointer, data []byte, index int, length int) { + memmove(unsafe.Pointer(&data[index]), source, uintptr(length)) +} diff --git a/common/parser/mem.s b/common/parser/mem.s new file mode 100644 index 0000000..e69de29 diff --git a/common/parser/mem_test.go b/common/parser/mem_test.go new file mode 100644 index 0000000..d3e244b --- /dev/null +++ b/common/parser/mem_test.go @@ -0,0 +1,20 @@ +package parser + +import ( + "testing" + "unsafe" + + "github.com/stretchr/testify/assert" +) + +func TestCopy(t *testing.T) { + data := []byte("World") + data1 := make([]byte, 10) + data1[0] = 'H' + data1[1] = 'e' + data1[2] = 'l' + data1[3] = 'l' + data1[4] = 'o' + Copy(unsafe.Pointer(&data[0]), data1, 5, 5) + assert.Equal(t, "HelloWorld", string(data1)) +} diff --git a/taosSql/rows.go b/taosSql/rows.go index aaf684d..54b61a8 100644 --- a/taosSql/rows.go +++ b/taosSql/rows.go @@ -19,7 +19,6 @@ type rows struct { block unsafe.Pointer blockOffset int blockSize int - lengthList []int result unsafe.Pointer precision int isStmt bool @@ -107,7 +106,6 @@ func (rs *rows) taosFetchBlock() error { } rs.blockSize = result.N rs.block = wrapper.TaosGetRawBlock(result.Res) - rs.lengthList = wrapper.FetchLengths(rs.result, len(rs.rowsHeader.ColLength)) rs.blockOffset = 0 return nil } diff --git a/taosWS/connection.go b/taosWS/connection.go index f959c0a..c465815 100644 --- a/taosWS/connection.go +++ b/taosWS/connection.go @@ -38,6 +38,11 @@ const ( STMTUseResult = "use_result" ) +const ( + BinaryQueryMessage uint64 = 6 + FetchRawBlockMessage uint64 = 7 +) + var ( NotQueryError = errors.New("sql is an update statement not a query statement") ReadTimeoutError = errors.New("read timeout") @@ -347,6 +352,18 @@ func WriteUint64(buffer *bytes.Buffer, v uint64) { buffer.WriteByte(byte(v >> 56)) } +func WriteUint32(buffer *bytes.Buffer, v uint32) { + buffer.WriteByte(byte(v)) + buffer.WriteByte(byte(v >> 8)) + buffer.WriteByte(byte(v >> 16)) + buffer.WriteByte(byte(v >> 24)) +} + +func WriteUint16(buffer *bytes.Buffer, v uint16) { + buffer.WriteByte(byte(v)) + buffer.WriteByte(byte(v >> 8)) +} + func (tc *taosConn) stmtAddBatch(stmtID uint64) error { reqID := tc.generateReqID() req := &StmtAddBatchRequest{ @@ -467,45 +484,8 @@ func (tc *taosConn) ExecContext(ctx context.Context, query string, args []driver return tc.execCtx(ctx, query, args) } -func (tc *taosConn) execCtx(_ context.Context, query string, args []driver.NamedValue) (driver.Result, error) { - if tc.isClosed() { - return nil, driver.ErrBadConn - } - if len(args) != 0 { - if !tc.cfg.interpolateParams { - return nil, driver.ErrSkip - } - // try to interpolate the parameters to save extra round trips for preparing and closing a statement - prepared, err := common.InterpolateParams(query, args) - if err != nil { - return nil, err - } - query = prepared - } - reqID := tc.generateReqID() - req := &WSQueryReq{ - ReqID: reqID, - SQL: query, - } - reqArgs, err := json.Marshal(req) - if err != nil { - return nil, err - } - action := &WSAction{ - Action: WSQuery, - Args: reqArgs, - } - tc.buf.Reset() - err = jsonI.NewEncoder(tc.buf).Encode(action) - if err != nil { - return nil, err - } - err = tc.writeText(tc.buf.Bytes()) - if err != nil { - return nil, err - } - var resp WSQueryResp - err = tc.readTo(&resp) +func (tc *taosConn) execCtx(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { + resp, err := tc.doQuery(ctx, query, args) if err != nil { return nil, err } @@ -523,7 +503,31 @@ func (tc *taosConn) QueryContext(ctx context.Context, query string, args []drive return tc.queryCtx(ctx, query, args) } -func (tc *taosConn) queryCtx(_ context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { +func (tc *taosConn) queryCtx(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { + resp, err := tc.doQuery(ctx, query, args) + if err != nil { + return nil, err + } + if resp.Code != 0 { + return nil, taosErrors.NewError(resp.Code, resp.Message) + } + if resp.IsUpdate { + return nil, NotQueryError + } + rs := &rows{ + buf: &bytes.Buffer{}, + conn: tc, + resultID: resp.ID, + fieldsCount: resp.FieldsCount, + fieldsNames: resp.FieldsNames, + fieldsTypes: resp.FieldsTypes, + fieldsLengths: resp.FieldsLengths, + precision: resp.Precision, + } + return rs, err +} + +func (tc *taosConn) doQuery(_ context.Context, query string, args []driver.NamedValue) (*WSQueryResp, error) { if tc.isClosed() { return nil, driver.ErrBadConn } @@ -539,24 +543,15 @@ func (tc *taosConn) queryCtx(_ context.Context, query string, args []driver.Name query = prepared } reqID := tc.generateReqID() - req := &WSQueryReq{ - ReqID: reqID, - SQL: query, - } - reqArgs, err := json.Marshal(req) - if err != nil { - return nil, err - } - action := &WSAction{ - Action: WSQuery, - Args: reqArgs, - } tc.buf.Reset() - err = jsonI.NewEncoder(tc.buf).Encode(action) - if err != nil { - return nil, err - } - err = tc.writeText(tc.buf.Bytes()) + + WriteUint64(tc.buf, reqID) // req id + WriteUint64(tc.buf, 0) // message id + WriteUint64(tc.buf, BinaryQueryMessage) + WriteUint16(tc.buf, 1) // version + WriteUint32(tc.buf, uint32(len(query))) // sql length + tc.buf.WriteString(query) + err := tc.writeBinary(tc.buf.Bytes()) if err != nil { return nil, err } @@ -565,23 +560,7 @@ func (tc *taosConn) queryCtx(_ context.Context, query string, args []driver.Name if err != nil { return nil, err } - if resp.Code != 0 { - return nil, taosErrors.NewError(resp.Code, resp.Message) - } - if resp.IsUpdate { - return nil, NotQueryError - } - rs := &rows{ - buf: &bytes.Buffer{}, - conn: tc, - resultID: resp.ID, - fieldsCount: resp.FieldsCount, - fieldsNames: resp.FieldsNames, - fieldsTypes: resp.FieldsTypes, - fieldsLengths: resp.FieldsLengths, - precision: resp.Precision, - } - return rs, err + return &resp, nil } func (tc *taosConn) Ping(ctx context.Context) (err error) { diff --git a/taosWS/connection_test.go b/taosWS/connection_test.go index 6aee030..e93d710 100644 --- a/taosWS/connection_test.go +++ b/taosWS/connection_test.go @@ -46,3 +46,29 @@ func Test_formatBytes(t *testing.T) { }) } } + +func TestBadConnection(t *testing.T) { + defer func() { + if r := recover(); r != nil { + // bad connection should not panic + t.Fatalf("panic: %v", r) + } + }() + + cfg, err := parseDSN(dataSourceName) + if err != nil { + t.Fatalf("parseDSN error: %v", err) + } + conn, err := newTaosConn(cfg) + if err != nil { + t.Fatalf("newTaosConn error: %v", err) + } + + // to test bad connection, we manually close the connection + conn.Close() + + _, err = conn.Query("select 1", nil) + if err == nil { + t.Fatalf("query should fail") + } +} diff --git a/taosWS/rows.go b/taosWS/rows.go index 7677ab7..636f54c 100644 --- a/taosWS/rows.go +++ b/taosWS/rows.go @@ -3,14 +3,15 @@ package taosWS import ( "bytes" "database/sql/driver" + "encoding/binary" "encoding/json" + "fmt" "io" "reflect" "unsafe" "github.com/taosdata/driver-go/v3/common" "github.com/taosdata/driver-go/v3/common/parser" - "github.com/taosdata/driver-go/v3/common/pointer" taosErrors "github.com/taosdata/driver-go/v3/errors" ) @@ -86,75 +87,53 @@ func (rs *rows) Next(dest []driver.Value) error { func (rs *rows) taosFetchBlock() error { reqID := rs.conn.generateReqID() - req := &WSFetchReq{ - ReqID: reqID, - ID: rs.resultID, - } - args, err := json.Marshal(req) - if err != nil { - return err - } - action := &WSAction{ - Action: WSFetch, - Args: args, - } rs.buf.Reset() - - err = jsonI.NewEncoder(rs.buf).Encode(action) + WriteUint64(rs.buf, reqID) // req id + WriteUint64(rs.buf, rs.resultID) // message id + WriteUint64(rs.buf, FetchRawBlockMessage) + WriteUint16(rs.buf, 1) // version + err := rs.conn.writeBinary(rs.buf.Bytes()) if err != nil { return err } - err = rs.conn.writeText(rs.buf.Bytes()) + respBytes, err := rs.conn.readBytes() if err != nil { return err } - var resp WSFetchResp - err = rs.conn.readTo(&resp) - if err != nil { - return err + if len(respBytes) < 51 { + return taosErrors.NewError(0xffff, "invalid fetch raw block response") + } + version := binary.LittleEndian.Uint16(respBytes[16:]) + if version != 1 { + return taosErrors.NewError(0xffff, fmt.Sprintf("unsupported fetch raw block version: %d", version)) + } + code := binary.LittleEndian.Uint32(respBytes[34:]) + msgLen := int(binary.LittleEndian.Uint32(respBytes[38:])) + if len(respBytes) < 51+msgLen { + return taosErrors.NewError(0xffff, "invalid fetch raw block response") } - if resp.Code != 0 { - return taosErrors.NewError(resp.Code, resp.Message) + errMsg := string(respBytes[42 : 42+msgLen]) + if code != 0 { + return taosErrors.NewError(int(code), errMsg) } - if resp.Completed { + completed := respBytes[50+msgLen] == 1 + if completed { rs.blockSize = 0 return nil } else { - rs.blockSize = resp.Rows - return rs.fetchBlock() - } -} - -func (rs *rows) fetchBlock() error { - reqID := rs.conn.generateReqID() - req := &WSFetchBlockReq{ - ReqID: reqID, - ID: rs.resultID, - } - args, err := json.Marshal(req) - if err != nil { - return err - } - action := &WSAction{ - Action: WSFetchBlock, - Args: args, - } - rs.buf.Reset() - err = jsonI.NewEncoder(rs.buf).Encode(action) - if err != nil { - return err - } - err = rs.conn.writeText(rs.buf.Bytes()) - if err != nil { - return err - } - respBytes, err := rs.conn.readBytes() - if err != nil { - return err + if len(respBytes) < 55+msgLen { + return taosErrors.NewError(0xffff, "invalid fetch raw block response") + } + blockLength := binary.LittleEndian.Uint32(respBytes[51+msgLen:]) + if len(respBytes) < 55+msgLen+int(blockLength) { + return taosErrors.NewError(0xffff, "invalid fetch raw block response") + } + rawBlock := respBytes[55+msgLen : 55+msgLen+int(blockLength)] + rs.block = rawBlock + rs.blockPtr = unsafe.Pointer(&rs.block[0]) + rs.blockSize = int(parser.RawBlockGetNumOfRows(rs.blockPtr)) + rs.blockOffset = 0 } - rs.block = respBytes - rs.blockPtr = pointer.AddUintptr(unsafe.Pointer(&rs.block[0]), 16) - rs.blockOffset = 0 return nil } diff --git a/ws/stmt/connector.go b/ws/stmt/connector.go index c33e57b..0cb2d39 100644 --- a/ws/stmt/connector.go +++ b/ws/stmt/connector.go @@ -3,6 +3,7 @@ package stmt import ( "container/list" "context" + "encoding/binary" "errors" "fmt" "net/url" @@ -130,6 +131,7 @@ func NewConnector(config *Config) (*Connector, error) { } wsClient.TextMessageHandler = connector.handleTextMessage + wsClient.BinaryMessageHandler = connector.handleBinaryMessage wsClient.ErrorHandler = connector.handleError go wsClient.WritePump() go wsClient.ReadPump() @@ -159,6 +161,17 @@ func (c *Connector) handleTextMessage(message []byte) { c.listLock.Unlock() } +func (c *Connector) handleBinaryMessage(message []byte) { + reqID := binary.LittleEndian.Uint64(message[8:16]) + c.listLock.Lock() + element := c.findOutChanByID(reqID) + if element != nil { + element.Value.(*IndexedChan).channel <- message + c.sendChanList.Remove(element) + } + c.listLock.Unlock() +} + type IndexedChan struct { index uint64 channel chan []byte diff --git a/ws/stmt/proto.go b/ws/stmt/proto.go index 2fed0ab..b5dc92d 100644 --- a/ws/stmt/proto.go +++ b/ws/stmt/proto.go @@ -15,6 +15,10 @@ const ( STMTAddBatch = "add_batch" STMTExec = "exec" STMTClose = "close" + STMTUseResult = "use_result" + WSFetch = "fetch" + WSFetchBlock = "fetch_block" + WSFreeResult = "free_result" ) type ConnectReq struct { @@ -134,3 +138,50 @@ type CloseReq struct { ReqID uint64 `json:"req_id"` StmtID uint64 `json:"stmt_id"` } + +type UseResultReq struct { + ReqID uint64 `json:"req_id"` + StmtID uint64 `json:"stmt_id"` +} + +type UseResultResp struct { + Code int `json:"code"` + Message string `json:"message"` + Action string `json:"action"` + ReqID uint64 `json:"req_id"` + Timing int64 `json:"timing"` + StmtID uint64 `json:"stmt_id"` + ResultID uint64 `json:"result_id"` + FieldsCount int `json:"fields_count"` + FieldsNames []string `json:"fields_names"` + FieldsTypes []uint8 `json:"fields_types"` + FieldsLengths []int64 `json:"fields_lengths"` + Precision int `json:"precision"` +} + +type WSFetchReq struct { + ReqID uint64 `json:"req_id"` + ID uint64 `json:"id"` +} + +type WSFetchResp struct { + Code int `json:"code"` + Message string `json:"message"` + Action string `json:"action"` + ReqID uint64 `json:"req_id"` + Timing int64 `json:"timing"` + ID uint64 `json:"id"` + Completed bool `json:"completed"` + Lengths []int `json:"lengths"` + Rows int `json:"rows"` +} + +type WSFetchBlockReq struct { + ReqID uint64 `json:"req_id"` + ID uint64 `json:"id"` +} + +type WSFreeResultRequest struct { + ReqID uint64 `json:"req_id"` + ID uint64 `json:"id"` +} diff --git a/ws/stmt/rows.go b/ws/stmt/rows.go new file mode 100644 index 0000000..78f6c75 --- /dev/null +++ b/ws/stmt/rows.go @@ -0,0 +1,171 @@ +package stmt + +import ( + "bytes" + "database/sql/driver" + "encoding/json" + "io" + "reflect" + "unsafe" + + "github.com/taosdata/driver-go/v3/common" + "github.com/taosdata/driver-go/v3/common/parser" + "github.com/taosdata/driver-go/v3/common/pointer" + taosErrors "github.com/taosdata/driver-go/v3/errors" + "github.com/taosdata/driver-go/v3/ws/client" +) + +type Rows struct { + buf *bytes.Buffer + blockPtr unsafe.Pointer + blockOffset int + blockSize int + resultID uint64 + block []byte + conn *Connector + fieldsCount int + fieldsNames []string + fieldsTypes []uint8 + fieldsLengths []int64 + precision int +} + +func (rs *Rows) Columns() []string { + return rs.fieldsNames +} + +func (rs *Rows) ColumnTypeDatabaseTypeName(i int) string { + return common.TypeNameMap[int(rs.fieldsTypes[i])] +} + +func (rs *Rows) ColumnTypeLength(i int) (length int64, ok bool) { + return rs.fieldsLengths[i], ok +} + +func (rs *Rows) ColumnTypeScanType(i int) reflect.Type { + t, exist := common.ColumnTypeMap[int(rs.fieldsTypes[i])] + if !exist { + return common.UnknownType + } + return t +} + +func (rs *Rows) Close() error { + rs.blockPtr = nil + rs.block = nil + return rs.freeResult() +} + +func (rs *Rows) Next(dest []driver.Value) error { + if rs.blockPtr == nil || rs.blockOffset >= rs.blockSize { + err := rs.taosFetchBlock() + if err != nil { + return err + } + } + if rs.blockSize == 0 { + rs.blockPtr = nil + rs.block = nil + return io.EOF + } + parser.ReadRow(dest, rs.blockPtr, rs.blockSize, rs.blockOffset, rs.fieldsTypes, rs.precision) + rs.blockOffset += 1 + return nil +} + +func (rs *Rows) taosFetchBlock() error { + reqID := rs.conn.generateReqID() + req := &WSFetchReq{ + ReqID: reqID, + ID: rs.resultID, + } + args, err := json.Marshal(req) + if err != nil { + return err + } + action := &client.WSAction{ + Action: WSFetch, + Args: args, + } + rs.buf.Reset() + envelope := rs.conn.client.GetEnvelope() + err = client.JsonI.NewEncoder(envelope.Msg).Encode(action) + if err != nil { + rs.conn.client.PutEnvelope(envelope) + return err + } + respBytes, err := rs.conn.sendText(reqID, envelope) + if err != nil { + return err + } + var resp WSFetchResp + err = client.JsonI.Unmarshal(respBytes, &resp) + if err != nil { + return err + } + if resp.Code != 0 { + return taosErrors.NewError(resp.Code, resp.Message) + } + if resp.Completed { + rs.blockSize = 0 + return nil + } else { + rs.blockSize = resp.Rows + return rs.fetchBlock() + } +} + +func (rs *Rows) fetchBlock() error { + req := &WSFetchBlockReq{ + ReqID: rs.resultID, + ID: rs.resultID, + } + args, err := client.JsonI.Marshal(req) + if err != nil { + return err + } + action := &client.WSAction{ + Action: WSFetchBlock, + Args: args, + } + rs.buf.Reset() + envelope := rs.conn.client.GetEnvelope() + err = client.JsonI.NewEncoder(envelope.Msg).Encode(action) + if err != nil { + rs.conn.client.PutEnvelope(envelope) + return err + } + respBytes, err := rs.conn.sendText(rs.resultID, envelope) + if err != nil { + return err + } + rs.block = respBytes + rs.blockPtr = pointer.AddUintptr(unsafe.Pointer(&rs.block[0]), 16) + rs.blockOffset = 0 + return nil +} + +func (rs *Rows) freeResult() error { + reqID := rs.conn.generateReqID() + req := &WSFreeResultRequest{ + ReqID: reqID, + ID: rs.resultID, + } + args, err := client.JsonI.Marshal(req) + if err != nil { + return err + } + action := &client.WSAction{ + Action: WSFreeResult, + Args: args, + } + rs.buf.Reset() + envelope := rs.conn.client.GetEnvelope() + err = client.JsonI.NewEncoder(envelope.Msg).Encode(action) + if err != nil { + rs.conn.client.PutEnvelope(envelope) + return err + } + rs.conn.sendTextWithoutResp(envelope) + return nil +} diff --git a/ws/stmt/stmt.go b/ws/stmt/stmt.go index e3c4c74..9833647 100644 --- a/ws/stmt/stmt.go +++ b/ws/stmt/stmt.go @@ -1,6 +1,7 @@ package stmt import ( + "bytes" "encoding/binary" "github.com/taosdata/driver-go/v3/common/param" @@ -230,6 +231,50 @@ func (s *Stmt) GetAffectedRows() int { return s.lastAffected } +func (s *Stmt) UseResult() (*Rows, error) { + reqID := s.connector.generateReqID() + req := &UseResultReq{ + ReqID: reqID, + StmtID: s.id, + } + args, err := client.JsonI.Marshal(req) + if err != nil { + return nil, err + } + action := &client.WSAction{ + Action: STMTUseResult, + Args: args, + } + envelope := s.connector.client.GetEnvelope() + err = client.JsonI.NewEncoder(envelope.Msg).Encode(action) + if err != nil { + s.connector.client.PutEnvelope(envelope) + return nil, err + } + respBytes, err := s.connector.sendText(reqID, envelope) + if err != nil { + return nil, err + } + var resp UseResultResp + err = client.JsonI.Unmarshal(respBytes, &resp) + if err != nil { + return nil, err + } + if resp.Code != 0 { + return nil, taosErrors.NewError(resp.Code, resp.Message) + } + return &Rows{ + buf: &bytes.Buffer{}, + conn: s.connector, + resultID: resp.ResultID, + fieldsCount: resp.FieldsCount, + fieldsNames: resp.FieldsNames, + fieldsTypes: resp.FieldsTypes, + fieldsLengths: resp.FieldsLengths, + precision: resp.Precision, + }, nil +} + func (s *Stmt) Close() error { reqID := s.connector.generateReqID() req := &CloseReq{ diff --git a/ws/stmt/stmt_test.go b/ws/stmt/stmt_test.go index d0ffc69..7cd9496 100644 --- a/ws/stmt/stmt_test.go +++ b/ws/stmt/stmt_test.go @@ -18,12 +18,12 @@ import ( "github.com/taosdata/driver-go/v3/ws/client" ) -func prepareEnv() error { +func prepareEnv(db string) error { var err error steps := []string{ - "drop database if exists test_ws_stmt", - "create database test_ws_stmt", - "create table test_ws_stmt.all_json(ts timestamp," + + "drop database if exists " + db, + "create database " + db, + "create table " + db + ".all_json(ts timestamp," + "c1 bool," + "c2 tinyint," + "c3 smallint," + @@ -39,7 +39,7 @@ func prepareEnv() error { "c13 nchar(20)" + ")" + "tags(t json)", - "create table test_ws_stmt.all_all(" + + "create table " + db + ".all_all(" + "ts timestamp," + "c1 bool," + "c2 tinyint," + @@ -80,11 +80,11 @@ func prepareEnv() error { return nil } -func cleanEnv() error { +func cleanEnv(db string) error { var err error time.Sleep(2 * time.Second) steps := []string{ - "drop database if exists test_ws_stmt", + "drop database if exists " + db, } for _, step := range steps { err = doRequest(step) @@ -151,12 +151,12 @@ func query(payload string) (*common.TDEngineRestfulResp, error) { // @date: 2023/10/13 11:35 // @description: test stmt over websocket func TestStmt(t *testing.T) { - err := prepareEnv() + err := prepareEnv("test_ws_stmt") if err != nil { t.Error(err) return } - defer cleanEnv() + defer cleanEnv("test_ws_stmt") now := time.Now() config := NewConfig("ws://127.0.0.1:6041", 0) config.SetConnectUser("root") @@ -615,3 +615,405 @@ func marshalBody(body io.Reader, bufferSize int) (*common.TDEngineRestfulResp, e } return &result, nil } + +func TestSTMTQuery(t *testing.T) { + err := prepareEnv("test_ws_stmt_query") + if err != nil { + t.Error(err) + return + } + defer cleanEnv("test_ws_stmt_query") + now := time.Now() + config := NewConfig("ws://127.0.0.1:6041", 0) + config.SetConnectUser("root") + config.SetConnectPass("taosdata") + config.SetConnectDB("test_ws_stmt_query") + config.SetMessageTimeout(common.DefaultMessageTimeout) + config.SetWriteWait(common.DefaultWriteWait) + config.SetEnableCompression(true) + config.SetErrorHandler(func(connector *Connector, err error) { + t.Log(err) + }) + config.SetCloseHandler(func() { + t.Log("stmt websocket closed") + }) + connector, err := NewConnector(config) + if err != nil { + t.Error(err) + return + } + defer connector.Close() + { + stmt, err := connector.Init() + if err != nil { + t.Error(err) + return + } + defer stmt.Close() + err = stmt.Prepare("insert into ? using all_json tags(?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + if err != nil { + t.Error(err) + return + } + err = stmt.SetTableName("tb1") + if err != nil { + t.Error(err) + return + } + err = stmt.SetTags(param.NewParam(1).AddJson([]byte(`{"tb":1}`)), param.NewColumnType(1).AddJson(0)) + if err != nil { + t.Error(err) + return + } + params := []*param.Param{ + param.NewParam(3).AddTimestamp(now, 0).AddTimestamp(now.Add(time.Second), 0).AddTimestamp(now.Add(time.Second*2), 0), + param.NewParam(3).AddBool(true).AddNull().AddBool(true), + param.NewParam(3).AddTinyint(1).AddNull().AddTinyint(1), + param.NewParam(3).AddSmallint(1).AddNull().AddSmallint(1), + param.NewParam(3).AddInt(1).AddNull().AddInt(1), + param.NewParam(3).AddBigint(1).AddNull().AddBigint(1), + param.NewParam(3).AddUTinyint(1).AddNull().AddUTinyint(1), + param.NewParam(3).AddUSmallint(1).AddNull().AddUSmallint(1), + param.NewParam(3).AddUInt(1).AddNull().AddUInt(1), + param.NewParam(3).AddUBigint(1).AddNull().AddUBigint(1), + param.NewParam(3).AddFloat(1).AddNull().AddFloat(1), + param.NewParam(3).AddDouble(1).AddNull().AddDouble(1), + param.NewParam(3).AddBinary([]byte("test_binary")).AddNull().AddBinary([]byte("test_binary")), + param.NewParam(3).AddNchar("test_nchar").AddNull().AddNchar("test_nchar"), + } + paramTypes := param.NewColumnType(14). + AddTimestamp(). + AddBool(). + AddTinyint(). + AddSmallint(). + AddInt(). + AddBigint(). + AddUTinyint(). + AddUSmallint(). + AddUInt(). + AddUBigint(). + AddFloat(). + AddDouble(). + AddBinary(0). + AddNchar(0) + err = stmt.BindParam(params, paramTypes) + if err != nil { + t.Error(err) + return + } + err = stmt.AddBatch() + if err != nil { + t.Error(err) + return + } + err = stmt.Exec() + if err != nil { + t.Error(err) + return + } + affected := stmt.GetAffectedRows() + if !assert.Equal(t, 3, affected) { + return + } + err = stmt.Prepare("select * from all_json where ts >=? order by ts") + assert.NoError(t, err) + queryTime := now.Format(time.RFC3339Nano) + params = []*param.Param{param.NewParam(1).AddBinary([]byte(queryTime))} + paramTypes = param.NewColumnType(1).AddBinary(len(queryTime)) + err = stmt.BindParam(params, paramTypes) + assert.NoError(t, err) + err = stmt.AddBatch() + assert.NoError(t, err) + err = stmt.Exec() + assert.NoError(t, err) + rows, err := stmt.UseResult() + assert.NoError(t, err) + columns := rows.Columns() + assert.Equal(t, 15, len(columns)) + expectColumns := []string{ + "ts", + "c1", + "c2", + "c3", + "c4", + "c5", + "c6", + "c7", + "c8", + "c9", + "c10", + "c11", + "c12", + "c13", + "t", + } + for i := 0; i < 14; i++ { + assert.Equal(t, columns[i], expectColumns[i]) + rows.ColumnTypeDatabaseTypeName(i) + rows.ColumnTypeLength(i) + rows.ColumnTypeScanType(i) + } + var result [][]driver.Value + for { + values := make([]driver.Value, 15) + err = rows.Next(values) + if err != nil { + if err == io.EOF { + break + } + assert.NoError(t, err) + } + result = append(result, values) + } + assert.Equal(t, 3, len(result)) + row1 := result[0] + assert.Equal(t, now.UnixNano()/1e6, row1[0].(time.Time).UnixNano()/1e6) + assert.Equal(t, true, row1[1]) + assert.Equal(t, int8(1), row1[2]) + assert.Equal(t, int16(1), row1[3]) + assert.Equal(t, int32(1), row1[4]) + assert.Equal(t, int64(1), row1[5]) + assert.Equal(t, uint8(1), row1[6]) + assert.Equal(t, uint16(1), row1[7]) + assert.Equal(t, uint32(1), row1[8]) + assert.Equal(t, uint64(1), row1[9]) + assert.Equal(t, float32(1), row1[10]) + assert.Equal(t, float64(1), row1[11]) + assert.Equal(t, "test_binary", row1[12]) + assert.Equal(t, "test_nchar", row1[13]) + assert.Equal(t, []byte(`{"tb":1}`), row1[14]) + row2 := result[1] + assert.Equal(t, now.Add(time.Second).UnixNano()/1e6, row2[0].(time.Time).UnixNano()/1e6) + for i := 1; i < 14; i++ { + assert.Nil(t, row2[i]) + } + assert.Equal(t, []byte(`{"tb":1}`), row2[14]) + row3 := result[2] + assert.Equal(t, now.Add(time.Second*2).UnixNano()/1e6, row3[0].(time.Time).UnixNano()/1e6) + assert.Equal(t, true, row3[1]) + assert.Equal(t, int8(1), row3[2]) + assert.Equal(t, int16(1), row3[3]) + assert.Equal(t, int32(1), row3[4]) + assert.Equal(t, int64(1), row3[5]) + assert.Equal(t, uint8(1), row3[6]) + assert.Equal(t, uint16(1), row3[7]) + assert.Equal(t, uint32(1), row3[8]) + assert.Equal(t, uint64(1), row3[9]) + assert.Equal(t, float32(1), row3[10]) + assert.Equal(t, float64(1), row3[11]) + assert.Equal(t, "test_binary", row3[12]) + assert.Equal(t, "test_nchar", row3[13]) + assert.Equal(t, []byte(`{"tb":1}`), row3[14]) + } + { + stmt, err := connector.Init() + if err != nil { + t.Error(err) + return + } + defer stmt.Close() + err = stmt.Prepare("insert into ? using all_all tags(?,?,?,?,?,?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + err = stmt.SetTableName("tb1") + if err != nil { + t.Error(err) + return + } + + err = stmt.SetTableName("tb2") + if err != nil { + t.Error(err) + return + } + err = stmt.SetTags( + param.NewParam(14). + AddTimestamp(now, 0). + AddBool(true). + AddTinyint(2). + AddSmallint(2). + AddInt(2). + AddBigint(2). + AddUTinyint(2). + AddUSmallint(2). + AddUInt(2). + AddUBigint(2). + AddFloat(2). + AddDouble(2). + AddBinary([]byte("tb2")). + AddNchar("tb2"), + param.NewColumnType(14). + AddTimestamp(). + AddBool(). + AddTinyint(). + AddSmallint(). + AddInt(). + AddBigint(). + AddUTinyint(). + AddUSmallint(). + AddUInt(). + AddUBigint(). + AddFloat(). + AddDouble(). + AddBinary(0). + AddNchar(0), + ) + if err != nil { + t.Error(err) + return + } + params := []*param.Param{ + param.NewParam(3).AddTimestamp(now, 0).AddTimestamp(now.Add(time.Second), 0).AddTimestamp(now.Add(time.Second*2), 0), + param.NewParam(3).AddBool(true).AddNull().AddBool(true), + param.NewParam(3).AddTinyint(1).AddNull().AddTinyint(1), + param.NewParam(3).AddSmallint(1).AddNull().AddSmallint(1), + param.NewParam(3).AddInt(1).AddNull().AddInt(1), + param.NewParam(3).AddBigint(1).AddNull().AddBigint(1), + param.NewParam(3).AddUTinyint(1).AddNull().AddUTinyint(1), + param.NewParam(3).AddUSmallint(1).AddNull().AddUSmallint(1), + param.NewParam(3).AddUInt(1).AddNull().AddUInt(1), + param.NewParam(3).AddUBigint(1).AddNull().AddUBigint(1), + param.NewParam(3).AddFloat(1).AddNull().AddFloat(1), + param.NewParam(3).AddDouble(1).AddNull().AddDouble(1), + param.NewParam(3).AddBinary([]byte("test_binary")).AddNull().AddBinary([]byte("test_binary")), + param.NewParam(3).AddNchar("test_nchar").AddNull().AddNchar("test_nchar"), + } + paramTypes := param.NewColumnType(14). + AddTimestamp(). + AddBool(). + AddTinyint(). + AddSmallint(). + AddInt(). + AddBigint(). + AddUTinyint(). + AddUSmallint(). + AddUInt(). + AddUBigint(). + AddFloat(). + AddDouble(). + AddBinary(0). + AddNchar(0) + err = stmt.BindParam(params, paramTypes) + if err != nil { + t.Error(err) + return + } + err = stmt.AddBatch() + if err != nil { + t.Error(err) + return + } + err = stmt.Exec() + if err != nil { + t.Error(err) + return + } + affected := stmt.GetAffectedRows() + if !assert.Equal(t, 3, affected) { + return + } + err = stmt.Prepare("select * from all_all where ts >=? order by ts") + assert.NoError(t, err) + queryTime := now.Format(time.RFC3339Nano) + params = []*param.Param{param.NewParam(1).AddBinary([]byte(queryTime))} + paramTypes = param.NewColumnType(1).AddBinary(len(queryTime)) + err = stmt.BindParam(params, paramTypes) + assert.NoError(t, err) + err = stmt.AddBatch() + assert.NoError(t, err) + err = stmt.Exec() + assert.NoError(t, err) + rows, err := stmt.UseResult() + assert.NoError(t, err) + columns := rows.Columns() + assert.Equal(t, 28, len(columns)) + var result [][]driver.Value + for { + values := make([]driver.Value, 28) + err = rows.Next(values) + if err != nil { + if err == io.EOF { + break + } + assert.NoError(t, err) + } + result = append(result, values) + } + assert.Equal(t, 3, len(result)) + row1 := result[0] + assert.Equal(t, now.UnixNano()/1e6, row1[0].(time.Time).UnixNano()/1e6) + assert.Equal(t, true, row1[1]) + assert.Equal(t, int8(1), row1[2]) + assert.Equal(t, int16(1), row1[3]) + assert.Equal(t, int32(1), row1[4]) + assert.Equal(t, int64(1), row1[5]) + assert.Equal(t, uint8(1), row1[6]) + assert.Equal(t, uint16(1), row1[7]) + assert.Equal(t, uint32(1), row1[8]) + assert.Equal(t, uint64(1), row1[9]) + assert.Equal(t, float32(1), row1[10]) + assert.Equal(t, float64(1), row1[11]) + assert.Equal(t, "test_binary", row1[12]) + assert.Equal(t, "test_nchar", row1[13]) + assert.Equal(t, now.UnixNano()/1e6, row1[14].(time.Time).UnixNano()/1e6) + assert.Equal(t, true, row1[15]) + assert.Equal(t, int8(2), row1[16]) + assert.Equal(t, int16(2), row1[17]) + assert.Equal(t, int32(2), row1[18]) + assert.Equal(t, int64(2), row1[19]) + assert.Equal(t, uint8(2), row1[20]) + assert.Equal(t, uint16(2), row1[21]) + assert.Equal(t, uint32(2), row1[22]) + assert.Equal(t, uint64(2), row1[23]) + assert.Equal(t, float32(2), row1[24]) + assert.Equal(t, float64(2), row1[25]) + assert.Equal(t, "tb2", row1[26]) + assert.Equal(t, "tb2", row1[27]) + row2 := result[1] + assert.Equal(t, now.Add(time.Second).UnixNano()/1e6, row2[0].(time.Time).UnixNano()/1e6) + for i := 1; i < 14; i++ { + assert.Nil(t, row2[i]) + } + assert.Equal(t, now.UnixNano()/1e6, row1[14].(time.Time).UnixNano()/1e6) + assert.Equal(t, true, row1[15]) + assert.Equal(t, int8(2), row1[16]) + assert.Equal(t, int16(2), row1[17]) + assert.Equal(t, int32(2), row1[18]) + assert.Equal(t, int64(2), row1[19]) + assert.Equal(t, uint8(2), row1[20]) + assert.Equal(t, uint16(2), row1[21]) + assert.Equal(t, uint32(2), row1[22]) + assert.Equal(t, uint64(2), row1[23]) + assert.Equal(t, float32(2), row1[24]) + assert.Equal(t, float64(2), row1[25]) + assert.Equal(t, "tb2", row1[26]) + assert.Equal(t, "tb2", row1[27]) + row3 := result[2] + assert.Equal(t, now.Add(time.Second*2).UnixNano()/1e6, row3[0].(time.Time).UnixNano()/1e6) + assert.Equal(t, true, row3[1]) + assert.Equal(t, int8(1), row3[2]) + assert.Equal(t, int16(1), row3[3]) + assert.Equal(t, int32(1), row3[4]) + assert.Equal(t, int64(1), row3[5]) + assert.Equal(t, uint8(1), row3[6]) + assert.Equal(t, uint16(1), row3[7]) + assert.Equal(t, uint32(1), row3[8]) + assert.Equal(t, uint64(1), row3[9]) + assert.Equal(t, float32(1), row3[10]) + assert.Equal(t, float64(1), row3[11]) + assert.Equal(t, "test_binary", row3[12]) + assert.Equal(t, "test_nchar", row3[13]) + assert.Equal(t, now.UnixNano()/1e6, row3[14].(time.Time).UnixNano()/1e6) + assert.Equal(t, true, row3[15]) + assert.Equal(t, int8(2), row3[16]) + assert.Equal(t, int16(2), row3[17]) + assert.Equal(t, int32(2), row3[18]) + assert.Equal(t, int64(2), row3[19]) + assert.Equal(t, uint8(2), row3[20]) + assert.Equal(t, uint16(2), row3[21]) + assert.Equal(t, uint32(2), row3[22]) + assert.Equal(t, uint64(2), row3[23]) + assert.Equal(t, float32(2), row3[24]) + assert.Equal(t, float64(2), row3[25]) + assert.Equal(t, "tb2", row3[26]) + assert.Equal(t, "tb2", row3[27]) + } +}