Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Commit b250342

Browse files
committed
fix single batch clear bug, add logging, change record ID translation
a lot of this logging should probably be tracing the toTranslateID change became necessary because although IDs were coming in roughly sorted, they were getting randomized by the map. This was fine until the translated IDs crossed a slice boundary, and then 1000s of requests were being generated for a single batch as the value importer flipped back and forth between slices. the bug fix is explained in a comment, but basically makeFragments iterated over b.rowIDs and assumed that anything in clearRowIDs would also be in rowIDs. In this particular case, nothing had yet been added to rowIDs, so the clear was skipped.
1 parent d8d2108 commit b250342

File tree

2 files changed

+104
-42
lines changed

2 files changed

+104
-42
lines changed

gpexp/importbatch.go

+51-27
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/pilosa/go-pilosa"
77
"github.com/pilosa/go-pilosa/egpool"
8+
"github.com/pilosa/pilosa/logger"
89
"github.com/pilosa/pilosa/roaring"
910
"github.com/pkg/errors"
1011
)
@@ -102,16 +103,13 @@ type Batch struct {
102103
// those keys map to.
103104
toTranslateSets map[string]map[string][]int
104105

105-
// for string ids which we weren't able to immediately translate,
106-
// keep a map of which record(s) each string id maps to.
107-
//
108-
// TODO:
109-
// this is probably super inefficient in the (common) case where
110-
// each record has a different string ID. In that case, a simple
111-
// slice of strings would probably work better.
112-
toTranslateID map[string][]int
106+
// toTranslateID maps each string key to a record index - this
107+
// will get translated into Batch.rowIDs
108+
toTranslateID []string
113109

114110
transCache Translator
111+
112+
log logger.Logger
115113
}
116114

117115
func (b *Batch) Len() int { return len(b.ids) }
@@ -128,6 +126,13 @@ func OptTranslator(t Translator) BatchOption {
128126
}
129127
}
130128

129+
func OptLogger(l logger.Logger) BatchOption {
130+
return func(b *Batch) error {
131+
b.log = l
132+
return nil
133+
}
134+
}
135+
131136
// NewBatch initializes a new Batch object which will use the given
132137
// Pilosa client, index, set of fields, and will take "size" records
133138
// before returning ErrBatchNowFull. The positions of the Fields in
@@ -180,8 +185,9 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
180185
toTranslate: tt,
181186
toTranslateClear: make(map[int]map[string][]int),
182187
toTranslateSets: ttSets,
183-
toTranslateID: make(map[string][]int),
184188
transCache: NewMapTranslator(),
189+
190+
log: logger.NopLogger,
185191
}
186192
if hasTime {
187193
b.times = make([]QuantizedTime, 0, size)
@@ -314,12 +320,10 @@ func (b *Batch) Add(rec Row) error {
314320
} else if ok {
315321
b.ids = append(b.ids, colID)
316322
} else {
317-
ints, ok := b.toTranslateID[rid]
318-
if !ok {
319-
ints = make([]int, 0)
323+
if b.toTranslateID == nil {
324+
b.toTranslateID = make([]string, cap(b.ids))
320325
}
321-
ints = append(ints, len(b.ids))
322-
b.toTranslateID[rid] = ints
326+
b.toTranslateID[len(b.ids)] = rid
323327
b.ids = append(b.ids, 0)
324328
}
325329
return nil
@@ -484,6 +488,13 @@ func (b *Batch) Add(rec Row) error {
484488
default:
485489
return errors.Errorf("Clearing a value '%v' Type %[1]T is not currently supported (field '%s')", val, field.Name())
486490
}
491+
// nil extend b.rowIDs so we don't run into a horrible bug
492+
// where we skip doing clears because b.rowIDs doesn't have a
493+
// value for htis field
494+
for len(b.rowIDs[i]) <= curPos {
495+
b.rowIDs[i] = append(b.rowIDs[i], nilSentinel)
496+
}
497+
487498
}
488499

489500
if len(b.ids) == cap(b.ids) {
@@ -524,29 +535,29 @@ func (b *Batch) Import() error {
524535
}
525536

526537
func (b *Batch) doTranslation() error {
527-
var keys []string
538+
keys := make([]string, 0)
539+
var indexes []int
528540

529541
// translate column keys if there are any
530-
if len(b.toTranslateID) > 0 {
531-
keys = make([]string, 0, len(b.toTranslateID))
532-
for k := range b.toTranslateID {
542+
for i, k := range b.toTranslateID {
543+
if k != "" {
533544
keys = append(keys, k)
545+
indexes = append(indexes, i)
534546
}
547+
}
548+
if len(keys) > 0 {
549+
start := time.Now()
535550
ids, err := b.client.TranslateColumnKeys(b.index, keys)
536551
if err != nil {
537552
return errors.Wrap(err, "translating col keys")
538553
}
554+
b.log.Debugf("translating %d column keys took %v", len(keys), time.Since(start))
539555
if err := b.transCache.AddCols(b.index.Name(), keys, ids); err != nil {
540556
return errors.Wrap(err, "adding cols to cache")
541557
}
542-
for j, key := range keys {
543-
id := ids[j]
544-
for _, recordIdx := range b.toTranslateID[key] {
545-
b.ids[recordIdx] = id
546-
}
558+
for j, id := range ids {
559+
b.ids[indexes[j]] = id
547560
}
548-
} else {
549-
keys = make([]string, 0)
550561
}
551562

552563
// translate row keys
@@ -572,10 +583,12 @@ func (b *Batch) doTranslation() error {
572583
}
573584

574585
// translate keys from Pilosa
586+
start := time.Now()
575587
ids, err := b.client.TranslateRowKeys(b.headerMap[fieldName], keys)
576588
if err != nil {
577589
return errors.Wrap(err, "translating row keys")
578590
}
591+
b.log.Debugf("translating %d row keys for %s took %v", len(keys), fieldName, time.Since(start))
579592
if err := b.transCache.AddRows(b.index.Name(), fieldName, keys, ids); err != nil {
580593
return errors.Wrap(err, "adding rows to cache")
581594
}
@@ -611,10 +624,12 @@ func (b *Batch) doTranslation() error {
611624
continue
612625
}
613626
// translate keys from Pilosa
627+
start := time.Now()
614628
ids, err := b.client.TranslateRowKeys(b.headerMap[fieldName], keys)
615629
if err != nil {
616630
return errors.Wrap(err, "translating row keys (sets)")
617631
}
632+
b.log.Debugf("translating %d row keys(sets) for %s took %v", len(keys), fieldName, time.Since(start))
618633
if err := b.transCache.AddRows(b.index.Name(), fieldName, keys, ids); err != nil {
619634
return errors.Wrap(err, "adding rows to cache")
620635
}
@@ -645,15 +660,20 @@ func (b *Batch) doImport() error {
645660
field := field
646661
viewMap := viewMap
647662
shard := shard
663+
648664
eg.Go(func() error {
649665
clearViewMap := clearFrags.GetViewMap(shard, field)
650666
if len(clearViewMap) > 0 {
667+
start := time.Now()
651668
err := b.client.ImportRoaringBitmap(b.index.Field(field), shard, clearViewMap, true)
652669
if err != nil {
653670
return errors.Wrapf(err, "import clearing clearing data for %s", field)
654671
}
672+
b.log.Debugf("imp-roar-clr %s,shard:%d,views:%d %v", field, shard, len(clearViewMap), time.Since(start))
655673
}
674+
start := time.Now()
656675
err := b.client.ImportRoaringBitmap(b.index.Field(field), shard, viewMap, false)
676+
b.log.Debugf("imp-roar %s,shard:%d,views:%d %v", field, shard, len(clearViewMap), time.Since(start))
657677
return errors.Wrapf(err, "importing data for %s", field)
658678
})
659679
}
@@ -845,7 +865,9 @@ func (b *Batch) importValueData() error {
845865
return errors.Wrap(err, "encoding import values")
846866
}
847867
eg.Go(func() error {
868+
start := time.Now()
848869
err := b.client.DoImportValues(b.index.Name(), shard, path, data)
870+
b.log.Debugf("imp-vals %s,shard:%d,data:%d %v", field, shard, len(data), time.Since(start))
849871
return errors.Wrapf(err, "importing values for %s", field)
850872
})
851873
startIdx = i
@@ -910,7 +932,9 @@ func (b *Batch) importMutexData() error {
910932
return errors.Wrap(err, "encoding mutex import")
911933
}
912934
eg.Go(func() error {
935+
start := time.Now()
913936
err := b.client.DoImport(b.index.Name(), shard, path, data)
937+
b.log.Debugf("imp-vals %s,shard:%d,data:%d %v", field, shard, len(data), time.Since(start))
914938
return errors.Wrapf(err, "importing values for %s", field)
915939
})
916940
startIdx = i
@@ -952,8 +976,8 @@ func (b *Batch) reset() {
952976
delete(clearMap, k)
953977
}
954978
}
955-
for k := range b.toTranslateID {
956-
delete(b.toTranslateID, k) // TODO pool these slices
979+
for i := range b.toTranslateID {
980+
b.toTranslateID[i] = ""
957981
}
958982
for k := range b.values {
959983
delete(b.values, k) // TODO pool these slices

gpexp/importbatch_test.go

+53-15
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ func TestStringSliceEmptyAndNil(t *testing.T) {
294294
t.Fatalf("querying: %v", err)
295295
}
296296

297+
// TODO test is flaky because we can't guarantee what a,b,c map to
297298
expectations := [][]uint64{{2}, {0, 2}, {3}, {3}, {2}}
298299
for i, re := range resp.Results() {
299300
if !reflect.DeepEqual(re.Row().Columns, expectations[i]) {
@@ -409,6 +410,55 @@ func TestStringSlice(t *testing.T) {
409410

410411
}
411412

413+
func TestSingleClearBatchRegression(t *testing.T) {
414+
client := pilosa.DefaultClient()
415+
schema := pilosa.NewSchema()
416+
idx := schema.Index("gopilosatest-blah")
417+
numFields := 1
418+
fields := make([]*pilosa.Field, numFields)
419+
fields[0] = idx.Field("zero", pilosa.OptFieldKeys(true))
420+
421+
err := client.SyncSchema(schema)
422+
if err != nil {
423+
t.Fatalf("syncing schema: %v", err)
424+
}
425+
defer func() {
426+
err := client.DeleteIndex(idx)
427+
if err != nil {
428+
t.Logf("problem cleaning up from test: %v", err)
429+
}
430+
}()
431+
432+
_, err = client.Query(fields[0].Set("row1", 1))
433+
if err != nil {
434+
t.Fatalf("setting bit: %v", err)
435+
}
436+
437+
b, err := NewBatch(client, 1, idx, fields)
438+
if err != nil {
439+
t.Fatalf("getting new batch: %v", err)
440+
}
441+
r := Row{ID: uint64(1), Values: make([]interface{}, numFields), Clears: make(map[int]interface{})}
442+
r.Values[0] = nil
443+
r.Clears[0] = "row1"
444+
err = b.Add(r)
445+
if err != ErrBatchNowFull {
446+
t.Fatalf("wrong error from batch add: %v", err)
447+
}
448+
449+
err = b.Import()
450+
if err != nil {
451+
t.Fatalf("error importing: %v", err)
452+
}
453+
454+
resp, err := client.Query(fields[0].Row("row1"))
455+
result := resp.Results()[0].Row().Columns
456+
if len(result) != 0 {
457+
t.Fatalf("unexpected values in row: result %+v", result)
458+
}
459+
460+
}
461+
412462
func TestBatches(t *testing.T) {
413463
client := pilosa.DefaultClient()
414464
schema := pilosa.NewSchema()
@@ -847,21 +897,9 @@ func TestBatchesStringIDs(t *testing.T) {
847897
if len(b.toTranslateID) != 3 {
848898
t.Fatalf("id translation table unexpected size: %v", b.toTranslateID)
849899
}
850-
for k, indexes := range b.toTranslateID {
851-
if k == "0" {
852-
if !reflect.DeepEqual(indexes, []int{0}) {
853-
t.Fatalf("unexpected result k: %s, indexes: %v", k, indexes)
854-
}
855-
}
856-
if k == "1" {
857-
if !reflect.DeepEqual(indexes, []int{1}) {
858-
t.Fatalf("unexpected result k: %s, indexes: %v", k, indexes)
859-
}
860-
}
861-
if k == "2" {
862-
if !reflect.DeepEqual(indexes, []int{2}) {
863-
t.Fatalf("unexpected result k: %s, indexes: %v", k, indexes)
864-
}
900+
for i, k := range b.toTranslateID {
901+
if ik, err := strconv.Atoi(k); err != nil || ik != i {
902+
t.Errorf("unexpected toTranslateID key %s at index %d", k, i)
865903
}
866904
}
867905

0 commit comments

Comments
 (0)