diff --git a/common/parser/raw.go b/common/parser/raw.go index 21ede0c..61ad125 100644 --- a/common/parser/raw.go +++ b/common/parser/raw.go @@ -44,19 +44,25 @@ func (p *TMQRawDataParser) getTypeSkip(t int8) (int, error) { } func (p *TMQRawDataParser) skipHead() error { - t := p.parseInt8() - skip, err := p.getTypeSkip(t) - if err != nil { - return err - } - p.skip(skip) - t = p.parseInt8() - skip, err = p.getTypeSkip(t) - if err != nil { - return err + v := p.parseInt8() + if v >= 100 { + skip := p.parseInt32() + p.skip(int(skip)) + return nil + } else { + skip, err := p.getTypeSkip(v) + if err != nil { + return err + } + p.skip(skip) + v = p.parseInt8() + skip, err = p.getTypeSkip(v) + if err != nil { + return err + } + p.skip(skip) + return nil } - p.skip(skip) - return nil } func (p *TMQRawDataParser) skip(count int) { diff --git a/common/parser/raw_test.go b/common/parser/raw_test.go index ef6f56c..521a626 100644 --- a/common/parser/raw_test.go +++ b/common/parser/raw_test.go @@ -795,3 +795,255 @@ func TestParseTenBlock(t *testing.T) { assert.Equal(t, [][]driver.Value{{ts, int32(i + 1)}}, value) } } + +func TestVersion100Block(t *testing.T) { + data := []byte{ + 0x64, //version + 0x12, 0x00, 0x00, 0x00, // skip 18 bytes + 0x11, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, //block count 1 + + 0x01, // with table name + 0x01, // with schema + + 0x92, 0x02, // block length 274 + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, + + 0x02, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x00, 0x00, // 256 + 0x01, 0x00, 0x00, 0x00, // rows + 0x0e, 0x00, 0x00, 0x00, // cols + 0x00, 0x00, 0x00, 0x80, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + + 0x09, 0x08, 0x00, 0x00, 0x00, + 0x01, 0x01, 0x00, 0x00, 0x00, + 0x02, 0x01, 0x00, 0x00, 0x00, + 0x03, 0x02, 0x00, 0x00, 0x00, + 0x04, 0x04, 0x00, 0x00, 0x00, + 0x05, 0x08, 0x00, 0x00, 0x00, + 0x0b, 0x01, 0x00, 0x00, 0x00, + 0x0c, 0x02, 0x00, 0x00, 0x00, + 0x0d, 0x04, 0x00, 0x00, 0x00, + 0x0e, 0x08, 0x00, 0x00, 0x00, + 0x06, 0x04, 0x00, 0x00, 0x00, + 0x07, 0x08, 0x00, 0x00, 0x00, + 0x08, 0x16, 0x00, 0x00, 0x00, + 0x0a, 0x52, 0x00, 0x00, 0x00, + + 0x08, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0x00, + 0x04, 0x00, 0x00, 0x00, + 0x08, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0x00, + 0x04, 0x00, 0x00, 0x00, + 0x08, 0x00, 0x00, 0x00, + 0x04, 0x00, 0x00, 0x00, + 0x08, 0x00, 0x00, 0x00, + 0x08, 0x00, 0x00, 0x00, + 0x16, 0x00, 0x00, 0x00, + + 0x00, + 0x9e, 0x37, 0x6a, 0x04, 0x8f, 0x01, 0x00, 0x00, + + 0x00, + 0x01, + + 0x00, + 0x02, + + 0x00, + 0x03, 0x00, + + 0x00, + 0x04, 0x00, 0x00, 0x00, + + 0x00, + 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + + 0x00, + 0x06, + + 0x00, + 0x07, 0x00, + + 0x00, + 0x08, 0x00, 0x00, 0x00, + + 0x00, + 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + + 0x00, + 0xcf, 0xf7, 0x21, 0x41, + + 0x00, + 0xe5, 0xd0, 0x22, 0xdb, 0xf9, 0x3e, 0x26, 0x40, + + 0x00, 0x00, 0x00, 0x00, + 0x06, 0x00, + 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, + + 0x00, 0x00, 0x00, 0x00, + 0x14, 0x00, + 0x6e, 0x00, 0x00, 0x00, + 0x63, 0x00, 0x00, 0x00, + 0x68, 0x00, 0x00, 0x00, + 0x61, 0x00, 0x00, 0x00, + 0x72, 0x00, 0x00, 0x00, + + 0x00, // + + 0x1c, // cols 14 + 0x00, // version + + // col meta + 0x09, 0x01, 0x10, 0x02, 0x03, 0x74, 0x73, 0x00, + 0x01, 0x01, 0x02, 0x04, 0x03, 0x63, 0x31, 0x00, + 0x02, 0x01, 0x02, 0x06, 0x03, 0x63, 0x32, 0x00, + 0x03, 0x01, 0x04, 0x08, 0x03, 0x63, 0x33, 0x00, + 0x04, 0x01, 0x08, 0x0a, 0x03, 0x63, 0x34, 0x00, + 0x05, 0x01, 0x10, 0x0c, 0x03, 0x63, 0x35, 0x00, + 0x0b, 0x01, 0x02, 0x0e, 0x03, 0x63, 0x36, 0x00, + 0x0c, 0x01, 0x04, 0x10, 0x03, 0x63, 0x37, 0x00, + 0x0d, 0x01, 0x08, 0x12, 0x03, 0x63, 0x38, 0x00, + 0x0e, 0x01, 0x10, 0x14, 0x03, 0x63, 0x39, 0x00, + 0x06, 0x01, 0x08, 0x16, 0x04, 0x63, 0x31, 0x30, 0x00, + 0x07, 0x01, 0x10, 0x18, 0x04, 0x63, 0x31, 0x31, 0x00, + 0x08, 0x01, 0x2c, 0x1a, 0x04, 0x63, 0x31, 0x32, 0x00, + 0x0a, 0x01, 0xa4, 0x01, 0x1c, 0x04, 0x63, 0x31, 0x33, 0x00, + + 0x06, // table name + 0x74, 0x5f, 0x61, 0x6c, 0x6c, 0x00, + // sleep time + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + } + parser := NewTMQRawDataParser() + blockInfos, err := parser.Parse(unsafe.Pointer(&data[0])) + assert.NoError(t, err) + assert.Equal(t, 1, len(blockInfos)) + assert.Equal(t, 0, blockInfos[0].Precision) + assert.Equal(t, 14, len(blockInfos[0].Schema)) + assert.Equal(t, []*TMQRawDataSchema{ + { + ColType: 9, + Flag: 1, + Bytes: 8, + ColID: 1, + Name: "ts", + }, + { + ColType: 1, + Flag: 1, + Bytes: 1, + ColID: 2, + Name: "c1", + }, + { + ColType: 2, + Flag: 1, + Bytes: 1, + ColID: 3, + Name: "c2", + }, + { + ColType: 3, + Flag: 1, + Bytes: 2, + ColID: 4, + Name: "c3", + }, + { + ColType: 4, + Flag: 1, + Bytes: 4, + ColID: 5, + Name: "c4", + }, + { + ColType: 5, + Flag: 1, + Bytes: 8, + ColID: 6, + Name: "c5", + }, + { + ColType: 11, + Flag: 1, + Bytes: 1, + ColID: 7, + Name: "c6", + }, + { + ColType: 12, + Flag: 1, + Bytes: 2, + ColID: 8, + Name: "c7", + }, + { + ColType: 13, + Flag: 1, + Bytes: 4, + ColID: 9, + Name: "c8", + }, + { + ColType: 14, + Flag: 1, + Bytes: 8, + ColID: 10, + Name: "c9", + }, + { + ColType: 6, + Flag: 1, + Bytes: 4, + ColID: 11, + Name: "c10", + }, + { + ColType: 7, + Flag: 1, + Bytes: 8, + ColID: 12, + Name: "c11", + }, + { + ColType: 8, + Flag: 1, + Bytes: 22, + ColID: 13, + Name: "c12", + }, + { + ColType: 10, + Flag: 1, + Bytes: 82, + ColID: 14, + Name: "c13", + }, + }, blockInfos[0].Schema) + assert.Equal(t, "t_all", blockInfos[0].TableName) + value := ReadBlockSimple(blockInfos[0].RawBlock, blockInfos[0].Precision) + expect := []driver.Value{ + time.Unix(0, 1713766021022000000).Local(), + true, + int8(2), + int16(3), + int32(4), + int64(5), + uint8(6), + uint16(7), + uint32(8), + uint64(9), + float32(10.123), + float64(11.123), + "binary", + "nchar", + } + assert.Equal(t, [][]driver.Value{expect}, value) +} diff --git a/wrapper/tmq_test.go b/wrapper/tmq_test.go index b4992cf..4e201fa 100644 --- a/wrapper/tmq_test.go +++ b/wrapper/tmq_test.go @@ -1074,7 +1074,7 @@ func TestTMQModify(t *testing.T) { h2 := cgo.NewHandle(c2) targetConn, err := TaosConnect("", "root", "taosdata", "tmq_test_db_modify_target", 0) assert.NoError(t, err) - defer TaosFreeResult(targetConn) + defer TaosClose(targetConn) result = TaosQuery(conn, "create table stb (ts timestamp,"+ "c1 bool,"+ "c2 tinyint,"+ @@ -1170,70 +1170,41 @@ func TestTMQModify(t *testing.T) { } d, err := query(targetConn, "describe stb") assert.NoError(t, err) - if len(d[0]) == 4 { - assert.Equal(t, [][]driver.Value{ - {"ts", "TIMESTAMP", int32(8), ""}, - {"c1", "BOOL", int32(1), ""}, - {"c2", "TINYINT", int32(1), ""}, - {"c3", "SMALLINT", int32(2), ""}, - {"c4", "INT", int32(4), ""}, - {"c5", "BIGINT", int32(8), ""}, - {"c6", "TINYINT UNSIGNED", int32(1), ""}, - {"c7", "SMALLINT UNSIGNED", int32(2), ""}, - {"c8", "INT UNSIGNED", int32(4), ""}, - {"c9", "BIGINT UNSIGNED", int32(8), ""}, - {"c10", "FLOAT", int32(4), ""}, - {"c11", "DOUBLE", int32(8), ""}, - {"c12", "VARCHAR", int32(20), ""}, - {"c13", "NCHAR", int32(20), ""}, - {"tts", "TIMESTAMP", int32(8), "TAG"}, - {"tc1", "BOOL", int32(1), "TAG"}, - {"tc2", "TINYINT", int32(1), "TAG"}, - {"tc3", "SMALLINT", int32(2), "TAG"}, - {"tc4", "INT", int32(4), "TAG"}, - {"tc5", "BIGINT", int32(8), "TAG"}, - {"tc6", "TINYINT UNSIGNED", int32(1), "TAG"}, - {"tc7", "SMALLINT UNSIGNED", int32(2), "TAG"}, - {"tc8", "INT UNSIGNED", int32(4), "TAG"}, - {"tc9", "BIGINT UNSIGNED", int32(8), "TAG"}, - {"tc10", "FLOAT", int32(4), "TAG"}, - {"tc11", "DOUBLE", int32(8), "TAG"}, - {"tc12", "VARCHAR", int32(20), "TAG"}, - {"tc13", "NCHAR", int32(20), "TAG"}, - }, d) - } else { - assert.Equal(t, [][]driver.Value{ - {"ts", "TIMESTAMP", int32(8), "", ""}, - {"c1", "BOOL", int32(1), "", ""}, - {"c2", "TINYINT", int32(1), "", ""}, - {"c3", "SMALLINT", int32(2), "", ""}, - {"c4", "INT", int32(4), "", ""}, - {"c5", "BIGINT", int32(8), "", ""}, - {"c6", "TINYINT UNSIGNED", int32(1), "", ""}, - {"c7", "SMALLINT UNSIGNED", int32(2), "", ""}, - {"c8", "INT UNSIGNED", int32(4), "", ""}, - {"c9", "BIGINT UNSIGNED", int32(8), "", ""}, - {"c10", "FLOAT", int32(4), "", ""}, - {"c11", "DOUBLE", int32(8), "", ""}, - {"c12", "VARCHAR", int32(20), "", ""}, - {"c13", "NCHAR", int32(20), "", ""}, - {"tts", "TIMESTAMP", int32(8), "TAG", ""}, - {"tc1", "BOOL", int32(1), "TAG", ""}, - {"tc2", "TINYINT", int32(1), "TAG", ""}, - {"tc3", "SMALLINT", int32(2), "TAG", ""}, - {"tc4", "INT", int32(4), "TAG", ""}, - {"tc5", "BIGINT", int32(8), "TAG", ""}, - {"tc6", "TINYINT UNSIGNED", int32(1), "TAG", ""}, - {"tc7", "SMALLINT UNSIGNED", int32(2), "TAG", ""}, - {"tc8", "INT UNSIGNED", int32(4), "TAG", ""}, - {"tc9", "BIGINT UNSIGNED", int32(8), "TAG", ""}, - {"tc10", "FLOAT", int32(4), "TAG", ""}, - {"tc11", "DOUBLE", int32(8), "TAG", ""}, - {"tc12", "VARCHAR", int32(20), "TAG", ""}, - {"tc13", "NCHAR", int32(20), "TAG", ""}, - }, d) + expect := [][]driver.Value{ + {"ts", "TIMESTAMP", int32(8), ""}, + {"c1", "BOOL", int32(1), ""}, + {"c2", "TINYINT", int32(1), ""}, + {"c3", "SMALLINT", int32(2), ""}, + {"c4", "INT", int32(4), ""}, + {"c5", "BIGINT", int32(8), ""}, + {"c6", "TINYINT UNSIGNED", int32(1), ""}, + {"c7", "SMALLINT UNSIGNED", int32(2), ""}, + {"c8", "INT UNSIGNED", int32(4), ""}, + {"c9", "BIGINT UNSIGNED", int32(8), ""}, + {"c10", "FLOAT", int32(4), ""}, + {"c11", "DOUBLE", int32(8), ""}, + {"c12", "VARCHAR", int32(20), ""}, + {"c13", "NCHAR", int32(20), ""}, + {"tts", "TIMESTAMP", int32(8), "TAG"}, + {"tc1", "BOOL", int32(1), "TAG"}, + {"tc2", "TINYINT", int32(1), "TAG"}, + {"tc3", "SMALLINT", int32(2), "TAG"}, + {"tc4", "INT", int32(4), "TAG"}, + {"tc5", "BIGINT", int32(8), "TAG"}, + {"tc6", "TINYINT UNSIGNED", int32(1), "TAG"}, + {"tc7", "SMALLINT UNSIGNED", int32(2), "TAG"}, + {"tc8", "INT UNSIGNED", int32(4), "TAG"}, + {"tc9", "BIGINT UNSIGNED", int32(8), "TAG"}, + {"tc10", "FLOAT", int32(4), "TAG"}, + {"tc11", "DOUBLE", int32(8), "TAG"}, + {"tc12", "VARCHAR", int32(20), "TAG"}, + {"tc13", "NCHAR", int32(20), "TAG"}, + } + for rowIndex, values := range d { + for i := 0; i < 4; i++ { + assert.Equal(t, expect[rowIndex][i], values[i]) + } } - }) TMQUnsubscribe(tmq)