Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Commit ff1a15c

Browse files
committed
fix some stringarray bugs in importbatch
1 parent 216dbcb commit ff1a15c

File tree

2 files changed

+159
-11
lines changed

2 files changed

+159
-11
lines changed

gpexp/importbatch.go

+25-11
Original file line numberDiff line numberDiff line change
@@ -387,9 +387,12 @@ func (b *Batch) Add(rec Row) error {
387387
}
388388
rowIDSets, ok := b.rowIDSets[field.Name()]
389389
if !ok {
390-
rowIDSets = make([][]uint64, cap(b.ids))
390+
rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids))
391391
b.rowIDSets[field.Name()] = rowIDSets
392392
}
393+
for len(rowIDSets) < len(b.ids)-1 {
394+
rowIDSets = append(rowIDSets, nil) // nil extend
395+
}
393396

394397
rowIDs := make([]uint64, 0, len(val))
395398
for _, k := range val {
@@ -409,17 +412,19 @@ func (b *Batch) Add(rec Row) error {
409412
b.toTranslateSets[field.Name()][k] = ints
410413
}
411414
}
412-
rowIDSets[curPos] = rowIDs
415+
b.rowIDSets[field.Name()] = append(rowIDSets, rowIDs)
413416
case []uint64:
414417
if len(val) == 0 {
415418
continue
416419
}
417420
rowIDSets, ok := b.rowIDSets[field.Name()]
418421
if !ok {
419-
rowIDSets = make([][]uint64, cap(b.ids))
422+
rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids))
423+
}
424+
for len(rowIDSets) < len(b.ids)-1 {
425+
rowIDSets = append(rowIDSets, nil) // nil extend
420426
}
421-
rowIDSets[len(b.ids)-1] = val // TODO do we need to copy val here?
422-
b.rowIDSets[field.Name()] = rowIDSets
427+
b.rowIDSets[field.Name()] = append(rowIDSets, val)
423428
case nil:
424429
if field.Opts().Type() == pilosa.FieldTypeInt || field.Opts().Type() == pilosa.FieldTypeDecimal {
425430
b.values[field.Name()] = append(b.values[field.Name()], 0)
@@ -611,6 +616,8 @@ func (b *Batch) doTranslation() error {
611616
return errors.Wrap(err, "adding rows to cache")
612617
}
613618
rowIDSets := b.rowIDSets[fieldName]
619+
rowIDSets = rowIDSets[:cap(b.ids)]
620+
b.rowIDSets[fieldName] = rowIDSets
614621
for j, key := range keys {
615622
rowID := ids[j]
616623
for _, recordIdx := range tt[key] {
@@ -729,6 +736,12 @@ func (b *Batch) makeFragments() (frags fragments, clearFrags fragments, err erro
729736
for fname, rowIDSets := range b.rowIDSets {
730737
if len(rowIDSets) == 0 {
731738
continue
739+
} else if len(rowIDSets) < len(b.ids) {
740+
// rowIDSets is guaranteed to have capacity equal to cap(b.ids),
741+
// but if the last record had a nil for this field, it
742+
// might not have the same length, so we re-slice it to
743+
// ensure the lengths are the same.
744+
rowIDSets = rowIDSets[:len(b.ids)]
732745
}
733746
field := b.headerMap[fname]
734747
opts := field.Opts()
@@ -912,19 +925,20 @@ func (b *Batch) reset() {
912925
b.ids = b.ids[:0]
913926
b.times = b.times[:0]
914927
for i, rowIDs := range b.rowIDs {
915-
fieldName := b.header[i].Name()
916928
b.rowIDs[i] = rowIDs[:0]
917-
rowIDSet := b.rowIDSets[fieldName]
918-
b.rowIDSets[fieldName] = rowIDSet[:0]
919929
m := b.toTranslate[i]
920930
for k := range m {
921931
delete(m, k) // TODO pool these slices
922932
}
923-
m = b.toTranslateSets[fieldName]
924-
for k := range m {
925-
delete(m, k)
933+
}
934+
for _, tts := range b.toTranslateSets {
935+
for k := range tts {
936+
delete(tts, k)
926937
}
927938
}
939+
for field, rowIDSet := range b.rowIDSets {
940+
b.rowIDSets[field] = rowIDSet[:0]
941+
}
928942
for _, rowIDs := range b.clearRowIDs {
929943
for k := range rowIDs {
930944
delete(rowIDs, k)

gpexp/importbatch_test.go

+134
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package gpexp
22

33
import (
44
"reflect"
5+
"sort"
56
"strconv"
67
"testing"
78
"time"
@@ -12,6 +13,139 @@ import (
1213

1314
// TODO test against cluster
1415

16+
func TestStringSliceCombos(t *testing.T) {
17+
client := pilosa.DefaultClient()
18+
schema := pilosa.NewSchema()
19+
idx := schema.Index("test-string-slicecombos")
20+
fields := make([]*pilosa.Field, 1)
21+
fields[0] = idx.Field("a1", pilosa.OptFieldKeys(true), pilosa.OptFieldTypeSet(pilosa.CacheTypeRanked, 100))
22+
err := client.SyncSchema(schema)
23+
if err != nil {
24+
t.Fatalf("syncing schema: %v", err)
25+
}
26+
defer func() {
27+
err := client.DeleteIndex(idx)
28+
if err != nil {
29+
t.Logf("problem cleaning up from test: %v", err)
30+
}
31+
}()
32+
33+
b, err := NewBatch(client, 5, idx, fields)
34+
if err != nil {
35+
t.Fatalf("creating new batch: %v", err)
36+
}
37+
38+
records := []Row{
39+
{ID: uint64(0), Values: []interface{}{[]string{"a", "b", "c"}}},
40+
{ID: uint64(1), Values: []interface{}{[]string{"z"}}},
41+
{ID: uint64(2), Values: []interface{}{[]string{}}},
42+
{ID: uint64(3), Values: []interface{}{[]string{"q", "r", "s", "t", "c"}}},
43+
{ID: uint64(4), Values: []interface{}{nil}},
44+
{ID: uint64(5), Values: []interface{}{[]string{"a", "b", "c"}}},
45+
{ID: uint64(6), Values: []interface{}{[]string{"a", "b", "c"}}},
46+
{ID: uint64(7), Values: []interface{}{[]string{"z"}}},
47+
{ID: uint64(8), Values: []interface{}{[]string{}}},
48+
{ID: uint64(9), Values: []interface{}{[]string{"q", "r", "s", "t"}}},
49+
{ID: uint64(10), Values: []interface{}{nil}},
50+
{ID: uint64(11), Values: []interface{}{[]string{"a", "b", "c"}}},
51+
}
52+
53+
err = ingestRecords(records, b)
54+
if err != nil {
55+
t.Fatalf("importing: %v", err)
56+
}
57+
58+
a1 := fields[0]
59+
60+
result := tq(t, client, a1.TopN(10))
61+
rez := sortableCRI(result.CountItems())
62+
sort.Sort(rez)
63+
exp := sortableCRI{
64+
{Key: "a", Count: 4},
65+
{Key: "b", Count: 4},
66+
{Key: "c", Count: 5},
67+
{Key: "q", Count: 2},
68+
{Key: "r", Count: 2},
69+
{Key: "s", Count: 2},
70+
{Key: "t", Count: 2},
71+
{Key: "z", Count: 2},
72+
}
73+
sort.Sort(exp)
74+
errorIfNotEqual(t, exp, rez)
75+
76+
result = tq(t, client, a1.Row("a"))
77+
errorIfNotEqual(t, result.Row().Columns, []uint64{0, 5, 6, 11})
78+
result = tq(t, client, a1.Row("b"))
79+
errorIfNotEqual(t, result.Row().Columns, []uint64{0, 5, 6, 11})
80+
result = tq(t, client, a1.Row("c"))
81+
errorIfNotEqual(t, result.Row().Columns, []uint64{0, 3, 5, 6, 11})
82+
result = tq(t, client, a1.Row("z"))
83+
errorIfNotEqual(t, result.Row().Columns, []uint64{1, 7})
84+
result = tq(t, client, a1.Row("q"))
85+
errorIfNotEqual(t, result.Row().Columns, []uint64{3, 9})
86+
result = tq(t, client, a1.Row("r"))
87+
errorIfNotEqual(t, result.Row().Columns, []uint64{3, 9})
88+
result = tq(t, client, a1.Row("s"))
89+
errorIfNotEqual(t, result.Row().Columns, []uint64{3, 9})
90+
result = tq(t, client, a1.Row("t"))
91+
errorIfNotEqual(t, result.Row().Columns, []uint64{3, 9})
92+
}
93+
94+
func errorIfNotEqual(t *testing.T, exp, got interface{}) {
95+
t.Helper()
96+
if !reflect.DeepEqual(exp, got) {
97+
t.Errorf("unequal exp/got:\n%v\n%v", exp, got)
98+
}
99+
}
100+
101+
type sortableCRI []pilosa.CountResultItem
102+
103+
func (s sortableCRI) Len() int { return len(s) }
104+
func (s sortableCRI) Less(i, j int) bool {
105+
if s[i].Count != s[j].Count {
106+
return s[i].Count > s[j].Count
107+
}
108+
if s[i].ID != s[j].ID {
109+
return s[i].ID < s[j].ID
110+
}
111+
if s[i].Key != s[j].Key {
112+
return s[i].Key < s[j].Key
113+
}
114+
return true
115+
}
116+
func (s sortableCRI) Swap(i, j int) {
117+
s[i], s[j] = s[j], s[i]
118+
}
119+
120+
func tq(t *testing.T, client *pilosa.Client, query pilosa.PQLQuery) pilosa.QueryResult {
121+
resp, err := client.Query(query)
122+
if err != nil {
123+
t.Fatalf("querying: %v", err)
124+
}
125+
return resp.Results()[0]
126+
}
127+
128+
func ingestRecords(records []Row, batch *Batch) error {
129+
for _, rec := range records {
130+
err := batch.Add(rec)
131+
if err == ErrBatchNowFull {
132+
err = batch.Import()
133+
if err != nil {
134+
return errors.Wrap(err, "importing batch")
135+
}
136+
} else if err != nil {
137+
return errors.Wrap(err, "while adding record")
138+
}
139+
}
140+
if batch.Len() > 0 {
141+
err := batch.Import()
142+
if err != nil {
143+
return errors.Wrap(err, "importing batch")
144+
}
145+
}
146+
return nil
147+
}
148+
15149
func TestImportBatchInts(t *testing.T) {
16150
client := pilosa.DefaultClient()
17151
schema := pilosa.NewSchema()

0 commit comments

Comments
 (0)