From cf8fb4c5f660f8813706c2057829f2abe7c17143 Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Mon, 11 Nov 2019 15:49:34 -0600 Subject: [PATCH 01/13] support for creation of decimal fields, expose import code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit go-pilosa can now create decimal type fields — this functionality is currently only available in Molecula's Pilosa fork. expose import code so that the batch importer can use it to import to mutex fields. Allow mutex fields in the batch importer. For some reason go-pilosa was still generating deprecated "Range" queries instead of "Row", so put a stop to that. --- client.go | 76 ++++++++++++++++++++++++++++++---------- client_it_test.go | 20 +++++++++++ gpexp/importbatch.go | 83 ++++++++++++++++++++++++++++++++++++++++++-- orm.go | 33 +++++++++++++++--- orm_test.go | 26 +++++++++----- 5 files changed, 206 insertions(+), 32 deletions(-) diff --git a/client.go b/client.go index f11d68d..8ad03d8 100644 --- a/client.go +++ b/client.go @@ -725,6 +725,59 @@ func (c *Client) importValues(field *Field, return errors.Wrap(err, "importing values to nodes") } +// Import imports data for a single shard using the regular import +// endpoint rather than import-roaring. This is good for e.g. mutex or +// bool fields where import-roaring is not supported. +func (c *Client) Import(index, field string, shard uint64, vals, ids []uint64, clear bool) error { + path, data, err := c.EncodeImport(index, field, shard, vals, ids, clear) + if err != nil { + return errors.Wrap(err, "encoding import request") + } + err = c.DoImport(index, shard, path, data) + return errors.Wrap(err, "doing import") +} + +// EncodeImport computes the HTTP path and payload for an import +// request. It is typically followed by a call to DoImport. +func (c *Client) EncodeImport(index, field string, shard uint64, vals, ids []uint64, clear bool) (path string, data []byte, err error) { + msg := &pbuf.ImportRequest{ + Index: index, + Field: field, + Shard: shard, + RowIDs: vals, + ColumnIDs: ids, + } + data, err = proto.Marshal(msg) + if err != nil { + return "", nil, errors.Wrap(err, "marshaling Import to protobuf") + } + path = fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear)) + return path, data, nil +} + +// DoImport takes a path and data payload (normally from EncodeImport +// or EncodeImportValues), logs the import, finds all nodes which own +// this shard, and concurrently imports to those nodes. +func (c *Client) DoImport(index string, shard uint64, path string, data []byte) error { + // TODO: figure out if the forwarding logic in Pilosa is the same + // for importing values as it is for importing mutex/bool. We may + // need to change that... + c.logImport(index, path, shard, false, data) + + uris, err := c.getURIsForShard(index, shard) + if err != nil { + return errors.Wrap(err, "getting uris") + } + + eg := errgroup.Group{} + for _, uri := range uris { + eg.Go(func() error { + return c.importData(uri, path, data) + }) + } + return errors.Wrap(eg.Wait(), "importing to nodes") +} + // ImportValues takes the given integer values and column ids (which // must all be in the given shard) and imports them into the given // index,field,shard on all nodes which should hold that shard. 
It @@ -756,30 +809,15 @@ func (c *Client) EncodeImportValues(index, field string, shard uint64, vals []in } data, err = proto.Marshal(msg) if err != nil { - return "", nil, errors.Wrap(err, "marshaling to protobuf") + return "", nil, errors.Wrap(err, "marshaling ImportValue to protobuf") } path = fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear)) return path, data, nil } -// DoImportValues takes a path and data payload (normally from -// EncodeImportValues), logs the import, finds all nodes which own -// this shard, and concurrently imports to those nodes. +// DoImportValues is deprecated. Use DoImport. func (c *Client) DoImportValues(index string, shard uint64, path string, data []byte) error { - c.logImport(index, path, shard, false, data) - - uris, err := c.getURIsForShard(index, shard) - if err != nil { - return errors.Wrap(err, "getting uris") - } - - eg := errgroup.Group{} - for _, uri := range uris { - eg.Go(func() error { - return c.importData(uri, path, data) - }) - } - return errors.Wrap(eg.Wait(), "importing values to nodes") + return c.DoImport(index, shard, path, data) } func importPathData(field *Field, shard uint64, msg proto.Message, options *ImportOptions) (path string, data []byte, err error) { @@ -1940,6 +1978,7 @@ type SchemaOptions struct { TimeQuantum string `json:"timeQuantum"` Min int64 `json:"min"` Max int64 `json:"max"` + Scale int64 `json:"scale"` Keys bool `json:"keys"` NoStandardView bool `json:"noStandardView"` TrackExistence bool `json:"trackExistence"` @@ -1962,6 +2001,7 @@ func (so SchemaOptions) asFieldOptions() *FieldOptions { timeQuantum: TimeQuantum(so.TimeQuantum), min: so.Min, max: so.Max, + scale: so.Scale, keys: so.Keys, noStandardView: so.NoStandardView, } diff --git a/client_it_test.go b/client_it_test.go index a58ab01..bcdd9b7 100644 --- a/client_it_test.go +++ b/client_it_test.go @@ -237,6 +237,26 @@ func TestOrmCount(t *testing.T) { } } +func TestDecimalField(t *testing.T) { + client := getClient() + defer client.Close() + decField := index.Field("a-decimal", OptFieldTypeDecimal(3)) + err := client.EnsureField(decField) + if err != nil { + t.Fatal(err) + } + + sch, err := client.Schema() + if err != nil { + t.Fatalf("getting schema: %v", err) + } + idx := sch.indexes["go-testindex"] + if opts := idx.Field("a-decimal").Options(); opts.scale != 3 { + t.Fatalf("scale should be 3, but: %v", opts) + } + +} + func TestIntersectReturns(t *testing.T) { client := getClient() defer client.Close() diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index c121e2e..e2d88e6 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -49,6 +49,7 @@ type RecordBatch interface { // | string | set | keys=true | // | uint64 | set | any | // | int64 | int | any | +// | float64| int | scale | TODO // | nil | any | | // // nil values are ignored. @@ -140,10 +141,17 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi } rowIDs[i] = make([]uint64, 0, size) // TODO make this on-demand when it gets used. could be a string array field. 
hasTime = typ == pilosa.FieldTypeTime || hasTime - case pilosa.FieldTypeInt: + case pilosa.FieldTypeInt, pilosa.FieldTypeDecimal: values[field.Name()] = make([]int64, 0, size) + case pilosa.FieldTypeMutex: + // similar to set/time fields, but no need to support sets + // of values (hence no ttSets) + if opts.Keys() { + tt[i] = make(map[string][]int) + } + rowIDs[i] = make([]uint64, 0, size) default: - return nil, errors.Errorf("field type %s is not currently supported through Batch", typ) + return nil, errors.Errorf("field type '%s' is not currently supported through Batch", typ) } } b := &Batch{ @@ -522,6 +530,9 @@ func (b *Batch) doImport() error { eg.Go(func() error { return b.importValueData() }) + eg.Go(func() error { + return b.importMutexData() + }) return eg.Wait() } @@ -543,6 +554,9 @@ func (b *Batch) makeFragments() (fragments, error) { } field := b.header[i] opts := field.Opts() + if opts.Type() == pilosa.FieldTypeMutex { + continue // we handle mutex fields separately — they can't use importRoaring + } curShard := ^uint64(0) // impossible sentinel value for shard. var curBM *roaring.Bitmap for j := range b.ids { @@ -690,6 +704,71 @@ func (b *Batch) importValueData() error { return errors.Wrap(err, "importing value data") } +// TODO this should work for bools as well - just need to support them +// at batch creation time and when calling Add, I think. +func (b *Batch) importMutexData() error { + shardWidth := b.index.ShardWidth() + if shardWidth == 0 { + shardWidth = pilosa.DefaultShardWidth + } + eg := errgroup.Group{} + ids := make([]uint64, 0, len(b.ids)) + + for findex, rowIDs := range b.rowIDs { + field := b.header[findex] + if field.Opts().Type() != pilosa.FieldTypeMutex { + continue + } + ids = ids[:0] + + // get slice of column ids for non-nil rowIDs and cut nil row + // IDs out of rowIDs. + idsIndex := 0 + for i, id := range b.ids { + rowID := rowIDs[i] + if rowID == nilSentinel { + continue + } + rowIDs[idsIndex] = rowID + ids = append(ids, id) + idsIndex++ + } + rowIDs = rowIDs[:idsIndex] + + if len(ids) == 0 { + continue + } + curShard := ids[0] / shardWidth + startIdx := 0 + for i := 1; i <= len(ids); i++ { + var recordID uint64 + if i < len(ids) { + recordID = ids[i] + } else { + recordID = (curShard + 2) * shardWidth + } + + if recordID/shardWidth != curShard { + endIdx := i + shard := curShard + field := field + path, data, err := b.client.EncodeImport(b.index.Name(), field.Name(), shard, rowIDs[startIdx:endIdx], ids[startIdx:endIdx], false) + if err != nil { + return errors.Wrap(err, "encoding mutex import") + } + eg.Go(func() error { + err := b.client.DoImport(b.index.Name(), shard, path, data) + return errors.Wrapf(err, "importing values for %s", field) + }) + startIdx = i + curShard = recordID / shardWidth + } + } + } + err := eg.Wait() + return errors.Wrap(err, "importing mutex data") +} + // reset is called at the end of importing to ready the batch for the // next round. Where possible it does not re-allocate memory. func (b *Batch) reset() { diff --git a/orm.go b/orm.go index c51e6c6..4569928 100644 --- a/orm.go +++ b/orm.go @@ -757,6 +757,7 @@ type FieldOptions struct { cacheSize int min int64 max int64 + scale int64 keys bool noStandardView bool } @@ -793,6 +794,11 @@ func (fo FieldOptions) Max() int64 { return fo.max } +// Scale returns the scale for a decimal field. 
+func (fo FieldOptions) Scale() int64 { + return fo.scale +} + // Keys returns whether this field uses keys instead of IDs func (fo FieldOptions) Keys() bool { return fo.keys @@ -827,6 +833,10 @@ func (fo FieldOptions) String() string { case FieldTypeInt: mopt["min"] = fo.min mopt["max"] = fo.max + case FieldTypeDecimal: + mopt["min"] = fo.min + mopt["max"] = fo.max + mopt["scale"] = fo.scale case FieldTypeTime: mopt["timeQuantum"] = string(fo.timeQuantum) mopt["noStandardView"] = fo.noStandardView @@ -873,7 +883,7 @@ func OptFieldTypeInt(limits ...int64) FieldOption { max := int64(math.MaxInt64) if len(limits) > 2 { - panic("error: OptFieldTypInt accepts at most 2 arguments") + panic("error: OptFieldTypeInt accepts at most 2 arguments") } if len(limits) > 0 { min = limits[0] @@ -916,6 +926,17 @@ func OptFieldTypeBool() FieldOption { } } +func OptFieldTypeDecimal(scale int64, minmax ...int64) FieldOption { + // reuse int logic for handling min/max + intopt := OptFieldTypeInt(minmax...) + + return func(options *FieldOptions) { + intopt(options) + options.fieldType = FieldTypeDecimal + options.scale = scale + } +} + // OptFieldKeys sets whether field uses string keys. func OptFieldKeys(keys bool) FieldOption { return func(options *FieldOptions) { @@ -1199,6 +1220,10 @@ const ( // FieldTypeBool is the boolean field type. // See: https://www.pilosa.com/docs/latest/data-model/#boolean FieldTypeBool FieldType = "bool" + // FieldTypeDecimal can store floating point numbers as integers + // with a scale factor. This field type is only available in + // Molecula's Pilosa with enterprise extensions. + FieldTypeDecimal FieldType = "decimal" ) // TimeQuantum type represents valid time quantum values time fields. @@ -1272,7 +1297,7 @@ func (f *Field) NotEquals(n int) *PQLRowQuery { // NotNull creates a not equal to null query. func (f *Field) NotNull() *PQLRowQuery { - text := fmt.Sprintf("Range(%s != null)", f.name) + text := fmt.Sprintf("Row(%s != null)", f.name) q := NewPQLRowQuery(text, f.index, nil) q.hasKeys = f.options.keys || f.index.options.keys return q @@ -1280,7 +1305,7 @@ func (f *Field) NotNull() *PQLRowQuery { // Between creates a between query. 
func (f *Field) Between(a int, b int) *PQLRowQuery { - text := fmt.Sprintf("Range(%s >< [%d,%d])", f.name, a, b) + text := fmt.Sprintf("Row(%s >< [%d,%d])", f.name, a, b) q := NewPQLRowQuery(text, f.index, nil) q.hasKeys = f.options.keys || f.index.options.keys return q @@ -1450,7 +1475,7 @@ func (f *Field) RowsPreviousLimitColumn(rowIDOrKey interface{}, limit int64, col } func (f *Field) binaryOperation(op string, n int) *PQLRowQuery { - text := fmt.Sprintf("Range(%s %s %d)", f.name, op, n) + text := fmt.Sprintf("Row(%s %s %d)", f.name, op, n) q := NewPQLRowQuery(text, f.index, nil) q.hasKeys = f.options.keys || f.index.options.keys return q diff --git a/orm_test.go b/orm_test.go index 289acc2..6e33d71 100644 --- a/orm_test.go +++ b/orm_test.go @@ -414,49 +414,49 @@ func TestTopN(t *testing.T) { func TestFieldLT(t *testing.T) { comparePQL(t, - "Range(collaboration < 10)", + "Row(collaboration < 10)", collabField.LT(10)) } func TestFieldLTE(t *testing.T) { comparePQL(t, - "Range(collaboration <= 10)", + "Row(collaboration <= 10)", collabField.LTE(10)) } func TestFieldGT(t *testing.T) { comparePQL(t, - "Range(collaboration > 10)", + "Row(collaboration > 10)", collabField.GT(10)) } func TestFieldGTE(t *testing.T) { comparePQL(t, - "Range(collaboration >= 10)", + "Row(collaboration >= 10)", collabField.GTE(10)) } func TestFieldEquals(t *testing.T) { comparePQL(t, - "Range(collaboration == 10)", + "Row(collaboration == 10)", collabField.Equals(10)) } func TestFieldNotEquals(t *testing.T) { comparePQL(t, - "Range(collaboration != 10)", + "Row(collaboration != 10)", collabField.NotEquals(10)) } func TestFieldNotNull(t *testing.T) { comparePQL(t, - "Range(collaboration != null)", + "Row(collaboration != null)", collabField.NotNull()) } func TestFieldBetween(t *testing.T) { comparePQL(t, - "Range(collaboration >< [10,20])", + "Row(collaboration >< [10,20])", collabField.Between(10, 20)) } @@ -1020,6 +1020,16 @@ func TestBoolFieldOptions(t *testing.T) { compareFieldOptions(t, field.Options(), FieldTypeBool, TimeQuantumNone, CacheTypeDefault, 0, 0, 0) } +func TestDecimalFieldOptions(t *testing.T) { + field := sampleIndex.Field("decimal-field", OptFieldTypeDecimal(3, 7, 999)) + jsonString := field.options.String() + targetString := `{"options":{"type":"decimal","scale":3,"max":999,"min":7}}` + if sortedString(targetString) != sortedString(jsonString) { + t.Fatalf("`%s` != `%s`", targetString, jsonString) + } + compareFieldOptions(t, field.Options(), FieldTypeDecimal, TimeQuantumNone, CacheTypeDefault, 0, 7, 999) +} + func TestEncodeMapPanicsOnMarshalFailure(t *testing.T) { defer func() { recover() From cc4c830fecf3bc5ff495652c02b4109b6f631717 Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Tue, 12 Nov 2019 21:24:03 -0600 Subject: [PATCH 02/13] implement clearing of set fields in importbatch --- gpexp/importbatch.go | 195 ++++++++++++++++++++++++++++++-------- gpexp/importbatch_test.go | 52 +++++++++- 2 files changed, 203 insertions(+), 44 deletions(-) diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index e2d88e6..16b1682 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -65,6 +65,13 @@ type Batch struct { // rowIDs is a map of field index (in the header) to slices of // length batchSize which contain row IDs. rowIDs map[int][]uint64 + // clearRowIDs is a map[fieldIndex][idsIndex]rowID we don't expect + // clears to happen very often, so we store the idIndex/value + // mapping in a map rather than a slice as we do for rowIDs. 
This + // is a potentially temporary workaround to allow packed boolean + // fields to clear "false" values. Packed fields may be more + // completely supported by Pilosa in future. + clearRowIDs map[int]map[int]uint64 // rowIDSets is a map from field name to a batchSize slice of // slices of row IDs. When a given record can have more than one @@ -81,10 +88,11 @@ type Batch struct { // integer field which has nil values. nullIndices map[string][]uint64 - // TODO support mutex and bool fields. + // TODO support bool fields. // for each field, keep a map of key to which record indexes that key mapped to - toTranslate map[int]map[string][]int + toTranslate map[int]map[string][]int + toTranslateClear map[int]map[string][]int // toTranslateSets is a map from field name to a map of string // keys that need to be translated to sets of record indexes which @@ -155,19 +163,21 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi } } b := &Batch{ - client: client, - header: fields, - headerMap: headerMap, - index: index, - ids: make([]uint64, 0, size), - rowIDs: rowIDs, - rowIDSets: make(map[string][][]uint64), - values: values, - nullIndices: make(map[string][]uint64), - toTranslate: tt, - toTranslateSets: ttSets, - toTranslateID: make(map[string][]int), - transCache: NewMapTranslator(), + client: client, + header: fields, + headerMap: headerMap, + index: index, + ids: make([]uint64, 0, size), + rowIDs: rowIDs, + clearRowIDs: make(map[int]map[int]uint64), + rowIDSets: make(map[string][][]uint64), + values: values, + nullIndices: make(map[string][]uint64), + toTranslate: tt, + toTranslateClear: make(map[int]map[string][]int), + toTranslateSets: ttSets, + toTranslateID: make(map[string][]int), + transCache: NewMapTranslator(), } if hasTime { b.times = make([]QuantizedTime, 0, size) @@ -185,6 +195,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi type Row struct { ID interface{} Values []interface{} + Clears map[int]interface{} Time QuantizedTime } @@ -269,7 +280,9 @@ func (qt *QuantizedTime) views(q pilosa.TimeQuantum) ([]string, error) { // Add adds a record to the batch. Performance will be best if record // IDs are shard-sorted. That is, all records which belong to the same // Pilosa shard are added adjacent to each other. If the records are -// also in-order within a shard this will likely help as well. +// also in-order within a shard this will likely help as well. Add +// clears rec.Clears when it returns normally (either a nil error or +// BatchNowFull), it does not clear rec.Values although... TODO. func (b *Batch) Add(rec Row) error { if len(b.ids) == cap(b.ids) { return ErrBatchAlreadyFull @@ -368,7 +381,7 @@ func (b *Batch) Add(rec Row) error { } b.rowIDSets[field.Name()] = append(rowIDSets, rowIDs) case nil: - if field.Opts().Type() == pilosa.FieldTypeInt { + if field.Opts().Type() == pilosa.FieldTypeInt || field.Opts().Type() == pilosa.FieldTypeDecimal { b.values[field.Name()] = append(b.values[field.Name()], 0) nullIndices, ok := b.nullIndices[field.Name()] if !ok { @@ -384,9 +397,45 @@ func (b *Batch) Add(rec Row) error { return errors.Errorf("Val %v Type %[1]T is not currently supported. 
Use string, uint64 (row id), or int64 (integer value)", val) } } + + for i, uval := range rec.Clears { + field := b.header[i] + if _, ok := b.clearRowIDs[i]; !ok { + b.clearRowIDs[i] = make(map[int]uint64) + } + switch val := uval.(type) { + case string: + clearRows := b.clearRowIDs[i] + // translate val and add to clearRows + if rowID, ok, err := b.transCache.GetRow(b.index.Name(), field.Name(), val); err != nil { + return errors.Wrap(err, "translating row") + } else if ok { + clearRows[curPos] = rowID + } else { + _, ok := b.toTranslateClear[i] + if !ok { + b.toTranslateClear[i] = make(map[string][]int) + } + ints, ok := b.toTranslateClear[i][val] + if !ok { + ints = make([]int, 0) + } + ints = append(ints, curPos) + b.toTranslateClear[i][val] = ints + } + case uint64: + b.clearRowIDs[i][curPos] = val + default: + return errors.Errorf("Clearing a value '%v' Type %[1]T is not currently supported (field '%s')", val, field.Name()) + } + } + if len(b.ids) == cap(b.ids) { return ErrBatchNowFull } + for k := range rec.Clears { + delete(rec.Clears, k) + } return nil } @@ -456,6 +505,14 @@ func (b *Batch) doTranslation() error { for k := range tt { keys = append(keys, k) } + // append keys to clear so we can translate them all in one + // request. ttEnd is the index where clearing starts which we + // use later on. + ttEnd := len(keys) + ttc := b.toTranslateClear[i] + for k := range ttc { + keys = append(keys, k) + } if len(keys) == 0 { continue @@ -472,12 +529,22 @@ func (b *Batch) doTranslation() error { // fill out missing IDs in local batch records with translated IDs rows := b.rowIDs[i] - for j, key := range keys { + for j := 0; j < ttEnd; j++ { + key := keys[j] id := ids[j] for _, recordIdx := range tt[key] { rows[recordIdx] = id } } + // fill out missing IDs in clear lists. + clearRows := b.clearRowIDs[i] + for j := ttEnd; j < len(keys); j++ { + key := keys[j] + id := ids[j] + for _, recordIdx := range ttc[key] { + clearRows[recordIdx] = id + } + } } for fieldName, tt := range b.toTranslateSets { @@ -513,15 +580,23 @@ func (b *Batch) doTranslation() error { func (b *Batch) doImport() error { eg := errgroup.Group{} - frags, err := b.makeFragments() + frags, clearFrags, err := b.makeFragments() if err != nil { return errors.Wrap(err, "making fragments") } + for shard, fieldMap := range frags { for field, viewMap := range fieldMap { field := field viewMap := viewMap eg.Go(func() error { + clearViewMap := clearFrags.GetViewMap(shard, field) + if len(clearViewMap) > 0 { + err := b.client.ImportRoaringBitmap(b.index.Field(field), shard, clearViewMap, true) + if err != nil { + return errors.Wrapf(err, "import clearing clearing data for %s", field) + } + } err := b.client.ImportRoaringBitmap(b.index.Field(field), shard, viewMap, false) return errors.Wrapf(err, "importing data for %s", field) }) @@ -542,16 +617,22 @@ func (b *Batch) doImport() error { // if needed though). 
var nilSentinel = ^uint64(0) -func (b *Batch) makeFragments() (fragments, error) { +func (b *Batch) makeFragments() (frags fragments, clearFrags fragments, err error) { shardWidth := b.index.ShardWidth() if shardWidth == 0 { shardWidth = pilosa.DefaultShardWidth } - frags := make(fragments) + frags = make(fragments) + clearFrags = make(fragments) + emptyClearRows := make(map[int]uint64) for i, rowIDs := range b.rowIDs { if len(rowIDs) == 0 { continue // this can happen when the values that came in for this field were string slices } + clearRows := b.clearRowIDs[i] + if clearRows == nil { + clearRows = emptyClearRows + } field := b.header[i] opts := field.Opts() if opts.Type() == pilosa.FieldTypeMutex { @@ -559,33 +640,43 @@ func (b *Batch) makeFragments() (fragments, error) { } curShard := ^uint64(0) // impossible sentinel value for shard. var curBM *roaring.Bitmap + var clearBM *roaring.Bitmap for j := range b.ids { col, row := b.ids[j], rowIDs[j] - if row == nilSentinel { - continue - } if col/shardWidth != curShard { curShard = col / shardWidth curBM = frags.GetOrCreate(curShard, field.Name(), "") + clearBM = clearFrags.GetOrCreate(curShard, field.Name(), "") } - // TODO this is super ugly, but we want to avoid setting - // bits on the standard view in the specific case when - // there isn't one. Should probably refactor this whole - // loop to be more general w.r.t. views. Also... tests for - // the NoStandardView case would be great. - if !(opts.Type() == pilosa.FieldTypeTime && opts.NoStandardView()) { - curBM.DirectAdd(row*shardWidth + (col % shardWidth)) - } - if opts.Type() == pilosa.FieldTypeTime { - views, err := b.times[j].views(opts.TimeQuantum()) - if err != nil { - return nil, errors.Wrap(err, "calculating views") + if row != nilSentinel { + // TODO this is super ugly, but we want to avoid setting + // bits on the standard view in the specific case when + // there isn't one. Should probably refactor this whole + // loop to be more general w.r.t. views. Also... tests for + // the NoStandardView case would be great. 
+ if !(opts.Type() == pilosa.FieldTypeTime && opts.NoStandardView()) { + curBM.DirectAdd(row*shardWidth + (col % shardWidth)) } - for _, view := range views { - tbm := frags.GetOrCreate(curShard, field.Name(), view) - tbm.DirectAdd(row*shardWidth + (col % shardWidth)) + if opts.Type() == pilosa.FieldTypeTime { + views, err := b.times[j].views(opts.TimeQuantum()) + if err != nil { + return nil, nil, errors.Wrap(err, "calculating views") + } + for _, view := range views { + tbm := frags.GetOrCreate(curShard, field.Name(), view) + tbm.DirectAdd(row*shardWidth + (col % shardWidth)) + } } } + + clearRow, ok := clearRows[j] + if ok { + clearBM.DirectAddN(clearRow*shardWidth + (col % shardWidth)) + // we're going to execute the clear before the set, so + // we want to make sure that at this point, the "set" + // fragments don't contian the bit that we're clearing + curBM.DirectRemoveN(clearRow*shardWidth + (col % shardWidth)) + } } } @@ -619,7 +710,7 @@ func (b *Batch) makeFragments() (fragments, error) { if opts.Type() == pilosa.FieldTypeTime { views, err := b.times[j].views(opts.TimeQuantum()) if err != nil { - return nil, errors.Wrap(err, "calculating views") + return nil, nil, errors.Wrap(err, "calculating views") } for _, view := range views { tbm := frags.GetOrCreate(curShard, fname, view) @@ -630,7 +721,7 @@ func (b *Batch) makeFragments() (fragments, error) { } } } - return frags, nil + return frags, clearFrags, nil } // importValueData imports data for int fields. @@ -788,6 +879,16 @@ func (b *Batch) reset() { delete(m, k) } } + for _, rowIDs := range b.clearRowIDs { + for k := range rowIDs { + delete(rowIDs, k) + } + } + for _, clearMap := range b.toTranslateClear { + for k := range clearMap { + delete(clearMap, k) + } + } for k := range b.toTranslateID { delete(b.toTranslateID, k) // TODO pool these slices } @@ -820,3 +921,15 @@ func (f fragments) GetOrCreate(shard uint64, field, view string) *roaring.Bitmap f[shard] = fieldMap return bm } + +func (f fragments) GetViewMap(shard uint64, field string) map[string]*roaring.Bitmap { + fieldMap, ok := f[shard] + if !ok { + return nil + } + viewMap, ok := fieldMap[field] + if !ok { + return nil + } + return viewMap +} diff --git a/gpexp/importbatch_test.go b/gpexp/importbatch_test.go index 7764d10..3c0dd4a 100644 --- a/gpexp/importbatch_test.go +++ b/gpexp/importbatch_test.go @@ -203,7 +203,7 @@ func TestBatches(t *testing.T) { if err != nil { t.Fatalf("getting new batch: %v", err) } - r := Row{Values: make([]interface{}, numFields)} + r := Row{Values: make([]interface{}, numFields), Clears: make(map[int]interface{})} r.Time.Set(time.Date(2019, time.January, 2, 15, 45, 0, 0, time.UTC)) for i := 0; i < 9; i++ { @@ -225,6 +225,8 @@ func TestBatches(t *testing.T) { } if i == 8 { r.Values[0] = nil + r.Clears[1] = uint64(97) + r.Clears[2] = "c" r.Values[3] = nil r.Values[4] = nil } @@ -252,6 +254,12 @@ func TestBatches(t *testing.T) { t.Fatalf("unexpected key %s", k) } } + if !reflect.DeepEqual(b.toTranslateClear, map[int]map[string][]int{2: map[string][]int{"c": []int{8}}}) { + t.Errorf("unexpected toTranslateClear: %+v", b.toTranslateClear) + } + if !reflect.DeepEqual(b.clearRowIDs, map[int]map[int]uint64{1: map[int]uint64{8: 97}, 2: map[int]uint64{}}) { + t.Errorf("unexpected clearRowIDs: %+v", b.clearRowIDs) + } if !reflect.DeepEqual(b.values["three"], []int64{99, -10, 99, -10, 99, -10, 99, -10, 0}) { t.Fatalf("unexpected values: %v", b.values["three"]) @@ -338,6 +346,13 @@ func TestBatches(t *testing.T) { } } + if 
!reflect.DeepEqual(b.clearRowIDs[1], map[int]uint64{8: 97}) { + t.Errorf("unexpected clearRowIDs after translation: %+v", b.clearRowIDs[1]) + } + if !reflect.DeepEqual(b.clearRowIDs[2], map[int]uint64{8: 2}) && !reflect.DeepEqual(b.clearRowIDs[2], map[int]uint64{8: 1}) { + t.Errorf("unexpected clearRowIDs: after translation%+v", b.clearRowIDs[2]) + } + err = b.doImport() if err != nil { t.Fatalf("doing import: %v", err) @@ -442,7 +457,7 @@ func TestBatches(t *testing.T) { } } - frags, err := b.makeFragments() + frags, _, err := b.makeFragments() if err != nil { t.Fatalf("making fragments: %v", err) } @@ -527,7 +542,38 @@ func TestBatches(t *testing.T) { t.Fatalf("wrong cols for January: got/want\n%v\n%v", cols, exp) } - // TODO test non-full batches, test behavior of doing import on empty batch + b.reset() + r.ID = uint64(0) + r.Values[0] = "x" + r.Values[1] = "b" + r.Clears[0] = "a" + r.Clears[1] = "b" // b should get cleared + err = b.Add(r) + if err != nil { + t.Fatalf("adding with clears: %v", err) + } + err = b.Import() + if err != nil { + t.Fatalf("importing w/clears: %v", err) + } + resp, err = client.Query(idx.BatchQuery( + fields[0].Row("a"), + fields[0].Row("x"), + fields[1].Row("b"), + )) + if err != nil { + t.Fatalf("querying after clears: %v", err) + } + if arow := resp.Results()[0].Row().Columns; arow[0] == 0 { + t.Errorf("shouldn't have id 0 in row a after clearing! %v", arow) + } + if xrow := resp.Results()[1].Row().Columns; xrow[0] != 0 { + t.Errorf("should have id 0 in row x after setting %v", xrow) + } + if brow := resp.Results()[2].Row().Columns; brow[0] == 0 { + t.Errorf("shouldn't have id 0 in row b after clearing! %v", brow) + } + // TODO test importing across multiple shards } From 128e0e7e4645c1ff2a297ae31d2a5f26da15ff0c Mon Sep 17 00:00:00 2001 From: Travis Date: Mon, 18 Nov 2019 12:07:35 -0600 Subject: [PATCH 03/13] reset record values and clears after adding record to batch --- gpexp/importbatch.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index 16b1682..f087c75 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -284,6 +284,16 @@ func (qt *QuantizedTime) views(q pilosa.TimeQuantum) ([]string, error) { // clears rec.Clears when it returns normally (either a nil error or // BatchNowFull), it does not clear rec.Values although... TODO. func (b *Batch) Add(rec Row) error { + // Clear recValues and rec.Clears upon return. + defer func() { + for i := range rec.Values { + rec.Values[i] = nil + } + for k := range rec.Clears { + delete(rec.Clears, k) + } + }() + if len(b.ids) == cap(b.ids) { return ErrBatchAlreadyFull } @@ -433,9 +443,6 @@ func (b *Batch) Add(rec Row) error { if len(b.ids) == cap(b.ids) { return ErrBatchNowFull } - for k := range rec.Clears { - delete(rec.Clears, k) - } return nil } From 193f55ec9bc9b9cda13bf5745834b4f6255fa36f Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Wed, 20 Nov 2019 19:10:58 -0600 Subject: [PATCH 04/13] fix bug where shard was being passed to goroutine and reused by loop This was causing the same shard to be used by import requests which were supposed to use different shards. A very sinister bug. 
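For readers unfamiliar with this Go pitfall: before Go 1.22 a range variable is a single variable reused on every iteration, so goroutines started inside the loop can all observe whichever value the loop wrote last unless the variable is re-declared in the loop body. A minimal, self-contained sketch of the fix (illustrative only, not part of the patch; the shard values and print statement are made up):

    package main

    import (
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        eg := errgroup.Group{}
        for _, shard := range []uint64{0, 1, 2} {
            shard := shard // shadow the loop variable so each goroutine captures its own copy
            eg.Go(func() error {
                // Without the shadowing line above, all three goroutines
                // could read the same (final) value of shard.
                fmt.Println("importing shard", shard)
                return nil
            })
        }
        _ = eg.Wait() // no goroutine returns an error in this sketch
    }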
--- gpexp/importbatch.go | 1 + 1 file changed, 1 insertion(+) diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index f087c75..b81b477 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -596,6 +596,7 @@ func (b *Batch) doImport() error { for field, viewMap := range fieldMap { field := field viewMap := viewMap + shard := shard eg.Go(func() error { clearViewMap := clearFrags.GetViewMap(shard, field) if len(clearViewMap) > 0 { From bbca9127b031ce9a08f00955e393829af984ace1 Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Wed, 20 Nov 2019 22:02:47 -0600 Subject: [PATCH 05/13] fix data race in ORM and importbatch tests --- gpexp/importbatch_test.go | 13 ++++++------- orm.go | 10 ++++++++++ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/gpexp/importbatch_test.go b/gpexp/importbatch_test.go index 3c0dd4a..e458718 100644 --- a/gpexp/importbatch_test.go +++ b/gpexp/importbatch_test.go @@ -340,7 +340,7 @@ func TestBatches(t *testing.T) { t.Fatalf("expected no rowIDs for int field, but got: %v", rowIDs) } } else { - if !reflect.DeepEqual(rowIDs, []uint64{1, 2, 1, 2, 1, 2, 1, 2, 1, 1}) && !reflect.DeepEqual(rowIDs, []uint64{2, 1, 2, 1, 2, 1, 2, 1, 2, 2}) { + if !reflect.DeepEqual(rowIDs, []uint64{1, 2, 1, 2, 1, 2, 1, 2, 1, nilSentinel}) && !reflect.DeepEqual(rowIDs, []uint64{2, 1, 2, 1, 2, 1, 2, 1, 2, nilSentinel}) { t.Fatalf("unexpected row ids for field %d: %v", fidx, rowIDs) } } @@ -482,17 +482,16 @@ func TestBatches(t *testing.T) { } results := resp.Results() - for _, j := range []int{0, 3} { + for _, j := range []int{0, 2, 3} { cols := results[j].Row().Columns if !reflect.DeepEqual(cols, []uint64{0, 2, 4, 6, 10, 12, 14, 16, 18}) { t.Fatalf("unexpected columns for a: %v", cols) } } - for i, res := range results[1:3] { - cols := res.Row().Columns - if !reflect.DeepEqual(cols, []uint64{0, 2, 4, 6, 8, 10, 12, 14, 16, 18}) { - t.Fatalf("unexpected columns at %d: %v", i, cols) - } + res = results[1] + cols := res.Row().Columns + if !reflect.DeepEqual(cols, []uint64{0, 2, 4, 6, 8, 10, 12, 14, 16, 18}) { + t.Fatalf("unexpected columns for field 1 row b: %v", cols) } resp, err = client.Query(idx.BatchQuery(fields[0].Row("d"), diff --git a/orm.go b/orm.go index 4569928..adfabcc 100644 --- a/orm.go +++ b/orm.go @@ -39,6 +39,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/pkg/errors" @@ -396,6 +397,7 @@ func OptOptionsShards(shards ...uint64) OptionsOption { // Index is a Pilosa index. The purpose of the Index is to represent a data namespace. // You cannot perform cross-index queries. Column-level attributes are global to the Index. type Index struct { + mu sync.Mutex name string options *IndexOptions fields map[string]*Field @@ -422,6 +424,8 @@ func (idx *Index) ShardWidth() uint64 { // Fields return a copy of the fields in this index func (idx *Index) Fields() map[string]*Field { + idx.mu.Lock() + defer idx.mu.Unlock() result := make(map[string]*Field) for k, v := range idx.fields { result[k] = v.copy() @@ -431,11 +435,15 @@ func (idx *Index) Fields() map[string]*Field { // HasFields returns true if the given field exists in the index. 
func (idx *Index) HasField(fieldName string) bool { + idx.mu.Lock() + defer idx.mu.Unlock() _, ok := idx.fields[fieldName] return ok } func (idx *Index) copy() *Index { + idx.mu.Lock() + defer idx.mu.Unlock() fields := make(map[string]*Field) for name, f := range idx.fields { fields[name] = f.copy() } @@ -462,6 +470,8 @@ func (idx *Index) Opts() IndexOptions { // Field creates a Field struct with the specified name and defaults. func (idx *Index) Field(name string, options ...FieldOption) *Field { + idx.mu.Lock() + defer idx.mu.Unlock() if field, ok := idx.fields[name]; ok { return field } From 12c7ac800eaef2a61ad235b58c4133001c03409d Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Mon, 25 Nov 2019 17:42:35 -0600 Subject: [PATCH 06/13] add Len to gpexp/batchimport --- gpexp/importbatch.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index b81b477..43c61be 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -36,6 +36,9 @@ import ( type RecordBatch interface { Add(Row) error Import() error + // Len reports the number of records which have been added to the + // batch since the last call to Import (or since it was created). + Len() int } // Batch implements RecordBatch. @@ -111,6 +114,8 @@ type Batch struct { transCache Translator } +func (b *Batch) Len() int { return len(b.ids) } + // BatchOption is a functional option for Batch objects. type BatchOption func(b *Batch) error From 351ea47cba106d1ffc0920817ebfe6a8ff5731c8 Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Wed, 27 Nov 2019 09:20:03 -0600 Subject: [PATCH 07/13] fix deadlock, catch empty translate keys, support []uint64 in batch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There was a deadlock which @tgruben found in detectClusterChanges where it could acquire a lock and never release it if there were no indexes or shards. TranslateKeys requests fail if the slice of keys is empty. We now test for this condition and return an empty slice of uint64 without hitting Pilosa. In importbatch, if a nil is encountered and the field is not an integer or decimal field, there is no way of knowing if this value will be using []string normally, or string or uint64. Instead of unconditionally appending to rowIDs, we wait until we know which value type this will be (as soon as we receive a non-nil value). Then we nil-extend rowIDs for this field if necessary. We now support []uint64 in addition to []string in importbatch; this is analogous to supporting strings and uint64, and they can be intermixed in the same value field. Also changed the way that indexes and fields are encoded to strings because a test was breaking due to the addition of a mutex to the Index. I don't think this affects much of anything. --- client.go | 14 ++++++-- gpexp/importbatch.go | 36 +++++++++++++++++-- gpexp/importbatch_test.go | 75 +++++++++++++++++++++++++++++++++++++-- orm.go | 21 +++++++++-- orm_test.go | 7 ++-- 5 files changed, 139 insertions(+), 14 deletions(-) diff --git a/client.go b/client.go index 8ad03d8..b4dc997 100644 --- a/client.go +++ b/client.go @@ -134,6 +134,7 @@ func (c *Client) Close() error { // see if it still matches, and if not it drops the whole cache. func (c *Client) detectClusterChanges() { c.shardNodes.mu.Lock() + needsUnlock := true // we rely on Go's random map iteration order to get a random // element. If it doesn't end up being random, it shouldn't // actually matter.
@@ -142,6 +143,7 @@ func (c *Client) detectClusterChanges() { delete(shardMap, shard) c.shardNodes.data[index] = shardMap c.shardNodes.mu.Unlock() + needsUnlock = false newURIs, err := c.getURIsForShard(index, shard) // refetch URIs from server. if err != nil { c.logger.Printf("problem invalidating shard node cache: %v", err) @@ -164,6 +166,9 @@ func (c *Client) detectClusterChanges() { } break } + if needsUnlock { + c.shardNodes.mu.Unlock() + } } // DefaultClient creates a client with the default address and options. @@ -1227,7 +1232,7 @@ func (c *Client) TranslateRowKeys(field *Field, keys []string) ([]uint64, error) Field: field.name, Keys: keys, } - return c.translateKeys(req, keys) + return c.translateKeys(req) } func (c *Client) TranslateColumnKeys(index *Index, keys []string) ([]uint64, error) { @@ -1235,10 +1240,13 @@ func (c *Client) TranslateColumnKeys(index *Index, keys []string) ([]uint64, err Index: index.name, Keys: keys, } - return c.translateKeys(req, keys) + return c.translateKeys(req) } -func (c *Client) translateKeys(req *pbuf.TranslateKeysRequest, keys []string) ([]uint64, error) { +func (c *Client) translateKeys(req *pbuf.TranslateKeysRequest) ([]uint64, error) { + if len(req.Keys) == 0 { + return []uint64{}, nil + } reqData, err := proto.Marshal(req) if err != nil { return nil, errors.Wrap(err, "marshalling traslate keys request") diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index 43c61be..91cf55c 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -152,7 +152,6 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi tt[i] = make(map[string][]int) ttSets[field.Name()] = make(map[string][]int) } - rowIDs[i] = make([]uint64, 0, size) // TODO make this on-demand when it gets used. could be a string array field. hasTime = typ == pilosa.FieldTypeTime || hasTime case pilosa.FieldTypeInt, pilosa.FieldTypeDecimal: values[field.Name()] = make([]int64, 0, size) @@ -352,6 +351,10 @@ func (b *Batch) Add(rec Row) error { field := b.header[i] switch val := rec.Values[i].(type) { case string: + // nil-extend + for len(b.rowIDs[i]) < curPos { + b.rowIDs[i] = append(b.rowIDs[i], nilSentinel) + } rowIDs := b.rowIDs[i] // translate val and append to b.rowIDs[i] if rowID, ok, err := b.transCache.GetRow(b.index.Name(), field.Name(), val); err != nil { @@ -368,10 +371,17 @@ func (b *Batch) Add(rec Row) error { b.rowIDs[i] = append(rowIDs, 0) } case uint64: + // nil-extend + for len(b.rowIDs[i]) < curPos { + b.rowIDs[i] = append(b.rowIDs[i], nilSentinel) + } b.rowIDs[i] = append(b.rowIDs[i], val) case int64: b.values[field.Name()] = append(b.values[field.Name()], val) case []string: + if len(val) == 0 { + continue + } rowIDSets, ok := b.rowIDSets[field.Name()] if !ok { rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids)) @@ -395,6 +405,18 @@ func (b *Batch) Add(rec Row) error { } } b.rowIDSets[field.Name()] = append(rowIDSets, rowIDs) + case []uint64: + if len(val) == 0 { + continue + } + rowIDSets, ok := b.rowIDSets[field.Name()] + if !ok { + rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids)) + } else { + rowIDSets = rowIDSets[:len(b.ids)] // grow this field's rowIDSets if necessary + } + rowIDSets[len(b.ids)-1] = val // TODO do we need to copy val here? 
+ b.rowIDSets[field.Name()] = rowIDSets case nil: if field.Opts().Type() == pilosa.FieldTypeInt || field.Opts().Type() == pilosa.FieldTypeDecimal { b.values[field.Name()] = append(b.values[field.Name()], 0) @@ -406,7 +428,15 @@ func (b *Batch) Add(rec Row) error { b.nullIndices[field.Name()] = nullIndices } else { - b.rowIDs[i] = append(b.rowIDs[i], nilSentinel) + // only append nil to rowIDs if this field already has + // rowIDs. Otherwise, this could be a []string or + // []uint64 field where we've only seen nil values so + // far. when we see a uint64 or string value, we'll + // "nil-extend" rowIDs to make sure it's the right + // length. + if rowIDs, ok := b.rowIDs[i]; ok { + b.rowIDs[i] = append(rowIDs, nilSentinel) + } } default: return errors.Errorf("Val %v Type %[1]T is not currently supported. Use string, uint64 (row id), or int64 (integer value)", val) @@ -687,7 +717,7 @@ func (b *Batch) makeFragments() (frags fragments, clearFrags fragments, err erro clearBM.DirectAddN(clearRow*shardWidth + (col % shardWidth)) // we're going to execute the clear before the set, so // we want to make sure that at this point, the "set" - // fragments don't contian the bit that we're clearing + // fragments don't contain the bit that we're clearing curBM.DirectRemoveN(clearRow*shardWidth + (col % shardWidth)) } } diff --git a/gpexp/importbatch_test.go b/gpexp/importbatch_test.go index e458718..cc03134 100644 --- a/gpexp/importbatch_test.go +++ b/gpexp/importbatch_test.go @@ -72,6 +72,77 @@ func TestImportBatchInts(t *testing.T) { } } +func TestStringSliceEmptyAndNil(t *testing.T) { + client := pilosa.DefaultClient() + schema := pilosa.NewSchema() + idx := schema.Index("test-string-slice-nil") + fields := make([]*pilosa.Field, 1) + fields[0] = idx.Field("strslice", pilosa.OptFieldKeys(true), pilosa.OptFieldTypeSet(pilosa.CacheTypeRanked, 100)) + err := client.SyncSchema(schema) + if err != nil { + t.Fatalf("syncing schema: %v", err) + } + defer func() { + err := client.DeleteIndex(idx) + if err != nil { + t.Logf("problem cleaning up from test: %v", err) + } + }() + + b, err := NewBatch(client, 5, idx, fields) + if err != nil { + t.Fatalf("creating new batch: %v", err) + } + + r := Row{Values: make([]interface{}, len(fields))} + r.ID = uint64(0) + r.Values[0] = []string{"a"} + err = b.Add(r) + if err != nil { + t.Fatalf("adding to batch: %v", err) + } + + r.ID = uint64(1) + r.Values[0] = nil + err = b.Add(r) + if err != nil { + t.Fatalf("adding batch with nil stringslice to r: %v", err) + } + + r.ID = uint64(2) + r.Values[0] = []uint64{0, 1, 2, 222} + err = b.Add(r) + if err != nil { + t.Fatalf("adding batch with nil stringslice to r: %v", err) + } + + r.ID = uint64(3) + r.Values[0] = []string{"b", "c"} + err = b.Add(r) + if err != nil { + t.Fatalf("adding batch with nil stringslice to r: %v", err) + } + + err = b.Import() + if err != nil { + t.Fatalf("importing: %v", err) + } + + rows := []interface{}{0, 1, 2, 222, "c"} + resp, err := client.Query(idx.BatchQuery(fields[0].Row(rows[0]), fields[0].Row(rows[1]), fields[0].Row(rows[2]), fields[0].Row(rows[3]), fields[0].Row(rows[4]))) + if err != nil { + t.Fatalf("querying: %v", err) + } + + expectations := [][]uint64{{2}, {0, 2}, {2, 3}, {2}, {3}} + for i, re := range resp.Results() { + if !reflect.DeepEqual(re.Row().Columns, expectations[i]) { + t.Errorf("expected row %v to have columns %v, but got %v", rows[i], expectations[i], re.Row().Columns) + } + } + +} + func TestStringSlice(t *testing.T) { client := pilosa.DefaultClient() schema := 
pilosa.NewSchema() @@ -488,7 +559,7 @@ func TestBatches(t *testing.T) { t.Fatalf("unexpected columns for a: %v", cols) } } - res = results[1] + res := results[1] cols := res.Row().Columns if !reflect.DeepEqual(cols, []uint64{0, 2, 4, 6, 8, 10, 12, 14, 16, 18}) { t.Fatalf("unexpected columns for field 1 row b: %v", cols) @@ -518,7 +589,7 @@ func TestBatches(t *testing.T) { t.Fatalf("querying: %v", err) } results = resp.Results() - cols := results[0].Row().Columns + cols = results[0].Row().Columns if !reflect.DeepEqual(cols, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28}) { t.Fatalf("all columns (but 8) should be greater than -11, but got: %v", cols) } diff --git a/orm.go b/orm.go index adfabcc..0b33fa0 100644 --- a/orm.go +++ b/orm.go @@ -405,7 +405,7 @@ type Index struct { } func (idx *Index) String() string { - return fmt.Sprintf("%#v", idx) + return fmt.Sprintf(`{name: "%s", options: "%s", fields: %s, shardWidth: %d}`, idx.name, idx.options, idx.fields, idx.shardWidth) } // NewIndex creates an index with a name. @@ -964,7 +964,7 @@ type Field struct { } func (f *Field) String() string { - return fmt.Sprintf("%#v", f) + return fmt.Sprintf(`{name: "%s", index: "%s", options: "%s"}`, f.name, f.index.name, f.options) } func newField(name string, index *Index) *Field { @@ -1512,3 +1512,20 @@ func encodeMap(m map[string]interface{}) string { } return string(result) } + +func encodeFieldMap(m map[string]*Field) string { + sb := strings.Builder{} + sb.WriteRune('{') + first := true + for k, v := range m { + if first { + first = false + } else { + sb.WriteString(", ") + } + sb.WriteString(k + ": ") + sb.WriteString(v.String()) + } + sb.WriteRune('}') + return sb.String() +} diff --git a/orm_test.go b/orm_test.go index 6e33d71..989fde3 100644 --- a/orm_test.go +++ b/orm_test.go @@ -185,9 +185,9 @@ func TestIndexFields(t *testing.T) { func TestIndexToString(t *testing.T) { schema1 := NewSchema() index := schema1.Index("test-index") - target := fmt.Sprintf(`&pilosa.Index{name:"test-index", options:(*pilosa.IndexOptions)(%p), fields:map[string]*pilosa.Field{}, shardWidth:0x0}`, index.options) + target := `{name: "test-index", options: "{"options":{}}", fields: map[], shardWidth: 0}` if target != index.String() { - t.Fatalf("%s != %s", target, index.String()) + t.Fatalf("indexes not equal exp/got:\n%s\n%s", target, index.String()) } } @@ -214,8 +214,7 @@ func TestFieldToString(t *testing.T) { schema1 := NewSchema() index := schema1.Index("test-index") field := index.Field("test-field") - target := fmt.Sprintf(`&pilosa.Field{name:"test-field", index:(*pilosa.Index)(%p), options:(*pilosa.FieldOptions)(%p)}`, - field.index, field.options) + target := `{name: "test-field", index: "test-index", options: "{"options":{"type":"set"}}"}` if target != field.String() { t.Fatalf("%s != %s", target, field.String()) } From b8bfd215b937fbbfe54572a54d177568d46cb205 Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Wed, 27 Nov 2019 09:30:54 -0600 Subject: [PATCH 08/13] gofmt -s --- gpexp/importbatch_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gpexp/importbatch_test.go b/gpexp/importbatch_test.go index cc03134..667316a 100644 --- a/gpexp/importbatch_test.go +++ b/gpexp/importbatch_test.go @@ -325,10 +325,10 @@ func TestBatches(t *testing.T) { t.Fatalf("unexpected key %s", k) } } - if !reflect.DeepEqual(b.toTranslateClear, map[int]map[string][]int{2: map[string][]int{"c": []int{8}}}) { + if 
!reflect.DeepEqual(b.toTranslateClear, map[int]map[string][]int{2: {"c": {8}}}) { t.Errorf("unexpected toTranslateClear: %+v", b.toTranslateClear) } - if !reflect.DeepEqual(b.clearRowIDs, map[int]map[int]uint64{1: map[int]uint64{8: 97}, 2: map[int]uint64{}}) { + if !reflect.DeepEqual(b.clearRowIDs, map[int]map[int]uint64{1: {8: 97}, 2: {}}) { t.Errorf("unexpected clearRowIDs: %+v", b.clearRowIDs) } From b24432967a58f016f4cb3e911c4bfa382b5f1da4 Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Wed, 27 Nov 2019 15:08:23 -0600 Subject: [PATCH 09/13] fix empty string row key issue --- gpexp/importbatch.go | 21 +++++++++++--------- gpexp/importbatch_test.go | 40 ++++++++++++++++++++++++++++++++------- 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index 91cf55c..1c69993 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -356,8 +356,11 @@ func (b *Batch) Add(rec Row) error { b.rowIDs[i] = append(b.rowIDs[i], nilSentinel) } rowIDs := b.rowIDs[i] - // translate val and append to b.rowIDs[i] - if rowID, ok, err := b.transCache.GetRow(b.index.Name(), field.Name(), val); err != nil { + // empty string is not a valid value at this point (Pilosa refuses to translate it) + if val == "" { // + b.rowIDs[i] = append(rowIDs, nilSentinel) + + } else if rowID, ok, err := b.transCache.GetRow(b.index.Name(), field.Name(), val); err != nil { return errors.Wrap(err, "translating row") } else if ok { b.rowIDs[i] = append(rowIDs, rowID) @@ -384,13 +387,15 @@ func (b *Batch) Add(rec Row) error { } rowIDSets, ok := b.rowIDSets[field.Name()] if !ok { - rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids)) - } else { - rowIDSets = rowIDSets[:len(b.ids)-1] // grow this field's rowIDSets if necessary + rowIDSets = make([][]uint64, cap(b.ids)) + b.rowIDSets[field.Name()] = rowIDSets } rowIDs := make([]uint64, 0, len(val)) for _, k := range val { + if k == "" { + continue + } if rowID, ok, err := b.transCache.GetRow(b.index.Name(), field.Name(), k); err != nil { return errors.Wrap(err, "translating row from []string") } else if ok { @@ -404,16 +409,14 @@ func (b *Batch) Add(rec Row) error { b.toTranslateSets[field.Name()][k] = ints } } - b.rowIDSets[field.Name()] = append(rowIDSets, rowIDs) + rowIDSets[curPos] = rowIDs case []uint64: if len(val) == 0 { continue } rowIDSets, ok := b.rowIDSets[field.Name()] if !ok { - rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids)) - } else { - rowIDSets = rowIDSets[:len(b.ids)] // grow this field's rowIDSets if necessary + rowIDSets = make([][]uint64, cap(b.ids)) } rowIDSets[len(b.ids)-1] = val // TODO do we need to copy val here? b.rowIDSets[field.Name()] = rowIDSets diff --git a/gpexp/importbatch_test.go b/gpexp/importbatch_test.go index 667316a..a81b0f1 100644 --- a/gpexp/importbatch_test.go +++ b/gpexp/importbatch_test.go @@ -89,12 +89,31 @@ func TestStringSliceEmptyAndNil(t *testing.T) { } }() - b, err := NewBatch(client, 5, idx, fields) + // first create a batch and test adding a single value with empty + // string - this failed with a translation error at one point, and + // how we catch it and treat it like a nil. 
+ b, err := NewBatch(client, 2, idx, fields) if err != nil { t.Fatalf("creating new batch: %v", err) } - r := Row{Values: make([]interface{}, len(fields))} + r.ID = uint64(1) + r.Values[0] = "" + err = b.Add(r) + if err != nil { + t.Fatalf("adding: %v", err) + } + err = b.Import() + if err != nil { + t.Fatalf("importing: %v", err) + } + + // now create a batch and add a mixture of string slice values + b, err = NewBatch(client, 6, idx, fields) + if err != nil { + t.Fatalf("creating new batch: %v", err) + } + r = Row{Values: make([]interface{}, len(fields))} r.ID = uint64(0) r.Values[0] = []string{"a"} err = b.Add(r) @@ -110,17 +129,24 @@ func TestStringSliceEmptyAndNil(t *testing.T) { } r.ID = uint64(2) - r.Values[0] = []uint64{0, 1, 2, 222} + r.Values[0] = []uint64{0, 1, 222} err = b.Add(r) if err != nil { - t.Fatalf("adding batch with nil stringslice to r: %v", err) + t.Fatalf("adding batch with idslice to r: %v", err) } r.ID = uint64(3) r.Values[0] = []string{"b", "c"} err = b.Add(r) if err != nil { - t.Fatalf("adding batch with nil stringslice to r: %v", err) + t.Fatalf("adding batch with stringslice to r: %v", err) + } + + r.ID = uint64(4) + r.Values[0] = []string{} + err = b.Add(r) + if err != nil { + t.Fatalf("adding batch with stringslice to r: %v", err) } err = b.Import() @@ -128,13 +154,13 @@ func TestStringSliceEmptyAndNil(t *testing.T) { t.Fatalf("importing: %v", err) } - rows := []interface{}{0, 1, 2, 222, "c"} + rows := []interface{}{0, "a", "b", "c", 222} resp, err := client.Query(idx.BatchQuery(fields[0].Row(rows[0]), fields[0].Row(rows[1]), fields[0].Row(rows[2]), fields[0].Row(rows[3]), fields[0].Row(rows[4]))) if err != nil { t.Fatalf("querying: %v", err) } - expectations := [][]uint64{{2}, {0, 2}, {2, 3}, {2}, {3}} + expectations := [][]uint64{{2}, {0, 2}, {3}, {3}, {2}} for i, re := range resp.Results() { if !reflect.DeepEqual(re.Row().Columns, expectations[i]) { t.Errorf("expected row %v to have columns %v, but got %v", rows[i], expectations[i], re.Row().Columns) From 3bbfdfea1c2c5eb9cd477b7cb60d2ebabc1b565f Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Sun, 1 Dec 2019 19:44:40 -0600 Subject: [PATCH 10/13] fix some stringarray bugs in importbatch --- gpexp/importbatch.go | 36 ++++++---- gpexp/importbatch_test.go | 134 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+), 11 deletions(-) diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index 1c69993..93b7a77 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -387,9 +387,12 @@ func (b *Batch) Add(rec Row) error { } rowIDSets, ok := b.rowIDSets[field.Name()] if !ok { - rowIDSets = make([][]uint64, cap(b.ids)) + rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids)) b.rowIDSets[field.Name()] = rowIDSets } + for len(rowIDSets) < len(b.ids)-1 { + rowIDSets = append(rowIDSets, nil) // nil extend + } rowIDs := make([]uint64, 0, len(val)) for _, k := range val { @@ -409,17 +412,19 @@ func (b *Batch) Add(rec Row) error { b.toTranslateSets[field.Name()][k] = ints } } - rowIDSets[curPos] = rowIDs + b.rowIDSets[field.Name()] = append(rowIDSets, rowIDs) case []uint64: if len(val) == 0 { continue } rowIDSets, ok := b.rowIDSets[field.Name()] if !ok { - rowIDSets = make([][]uint64, cap(b.ids)) + rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids)) + } + for len(rowIDSets) < len(b.ids)-1 { + rowIDSets = append(rowIDSets, nil) // nil extend } - rowIDSets[len(b.ids)-1] = val // TODO do we need to copy val here? 
- b.rowIDSets[field.Name()] = rowIDSets + b.rowIDSets[field.Name()] = append(rowIDSets, val) case nil: if field.Opts().Type() == pilosa.FieldTypeInt || field.Opts().Type() == pilosa.FieldTypeDecimal { b.values[field.Name()] = append(b.values[field.Name()], 0) @@ -611,6 +616,8 @@ func (b *Batch) doTranslation() error { return errors.Wrap(err, "adding rows to cache") } rowIDSets := b.rowIDSets[fieldName] + rowIDSets = rowIDSets[:cap(b.ids)] + b.rowIDSets[fieldName] = rowIDSets for j, key := range keys { rowID := ids[j] for _, recordIdx := range tt[key] { @@ -729,6 +736,12 @@ func (b *Batch) makeFragments() (frags fragments, clearFrags fragments, err erro for fname, rowIDSets := range b.rowIDSets { if len(rowIDSets) == 0 { continue + } else if len(rowIDSets) < len(b.ids) { + // rowIDSets is guaranteed to have capacity == to b.ids, + // but if the last record had a nil for this field, it + // might not have the same length, so we re-slice it to + // ensure the lengths are the same. + rowIDSets = rowIDSets[:len(b.ids)] } field := b.headerMap[fname] opts := field.Opts() @@ -912,19 +925,20 @@ func (b *Batch) reset() { b.ids = b.ids[:0] b.times = b.times[:0] for i, rowIDs := range b.rowIDs { - fieldName := b.header[i].Name() b.rowIDs[i] = rowIDs[:0] - rowIDSet := b.rowIDSets[fieldName] - b.rowIDSets[fieldName] = rowIDSet[:0] m := b.toTranslate[i] for k := range m { delete(m, k) // TODO pool these slices } - m = b.toTranslateSets[fieldName] - for k := range m { - delete(m, k) + } + for _, tts := range b.toTranslateSets { + for k := range tts { + delete(tts, k) } } + for field, rowIDSet := range b.rowIDSets { + b.rowIDSets[field] = rowIDSet[:0] + } for _, rowIDs := range b.clearRowIDs { for k := range rowIDs { delete(rowIDs, k) diff --git a/gpexp/importbatch_test.go b/gpexp/importbatch_test.go index a81b0f1..f59134a 100644 --- a/gpexp/importbatch_test.go +++ b/gpexp/importbatch_test.go @@ -2,6 +2,7 @@ package gpexp import ( "reflect" + "sort" "strconv" "testing" "time" @@ -12,6 +13,139 @@ import ( // TODO test against cluster +func TestStringSliceCombos(t *testing.T) { + client := pilosa.DefaultClient() + schema := pilosa.NewSchema() + idx := schema.Index("test-string-slicecombos") + fields := make([]*pilosa.Field, 1) + fields[0] = idx.Field("a1", pilosa.OptFieldKeys(true), pilosa.OptFieldTypeSet(pilosa.CacheTypeRanked, 100)) + err := client.SyncSchema(schema) + if err != nil { + t.Fatalf("syncing schema: %v", err) + } + defer func() { + err := client.DeleteIndex(idx) + if err != nil { + t.Logf("problem cleaning up from test: %v", err) + } + }() + + b, err := NewBatch(client, 5, idx, fields) + if err != nil { + t.Fatalf("creating new batch: %v", err) + } + + records := []Row{ + {ID: uint64(0), Values: []interface{}{[]string{"a", "b", "c"}}}, + {ID: uint64(1), Values: []interface{}{[]string{"z"}}}, + {ID: uint64(2), Values: []interface{}{[]string{}}}, + {ID: uint64(3), Values: []interface{}{[]string{"q", "r", "s", "t", "c"}}}, + {ID: uint64(4), Values: []interface{}{nil}}, + {ID: uint64(5), Values: []interface{}{[]string{"a", "b", "c"}}}, + {ID: uint64(6), Values: []interface{}{[]string{"a", "b", "c"}}}, + {ID: uint64(7), Values: []interface{}{[]string{"z"}}}, + {ID: uint64(8), Values: []interface{}{[]string{}}}, + {ID: uint64(9), Values: []interface{}{[]string{"q", "r", "s", "t"}}}, + {ID: uint64(10), Values: []interface{}{nil}}, + {ID: uint64(11), Values: []interface{}{[]string{"a", "b", "c"}}}, + } + + err = ingestRecords(records, b) + if err != nil { + t.Fatalf("importing: %v", err) + } + + a1 := 
fields[0] + + result := tq(t, client, a1.TopN(10)) + rez := sortableCRI(result.CountItems()) + sort.Sort(rez) + exp := sortableCRI{ + {Key: "a", Count: 4}, + {Key: "b", Count: 4}, + {Key: "c", Count: 5}, + {Key: "q", Count: 2}, + {Key: "r", Count: 2}, + {Key: "s", Count: 2}, + {Key: "t", Count: 2}, + {Key: "z", Count: 2}, + } + sort.Sort(exp) + errorIfNotEqual(t, exp, rez) + + result = tq(t, client, a1.Row("a")) + errorIfNotEqual(t, result.Row().Columns, []uint64{0, 5, 6, 11}) + result = tq(t, client, a1.Row("b")) + errorIfNotEqual(t, result.Row().Columns, []uint64{0, 5, 6, 11}) + result = tq(t, client, a1.Row("c")) + errorIfNotEqual(t, result.Row().Columns, []uint64{0, 3, 5, 6, 11}) + result = tq(t, client, a1.Row("z")) + errorIfNotEqual(t, result.Row().Columns, []uint64{1, 7}) + result = tq(t, client, a1.Row("q")) + errorIfNotEqual(t, result.Row().Columns, []uint64{3, 9}) + result = tq(t, client, a1.Row("r")) + errorIfNotEqual(t, result.Row().Columns, []uint64{3, 9}) + result = tq(t, client, a1.Row("s")) + errorIfNotEqual(t, result.Row().Columns, []uint64{3, 9}) + result = tq(t, client, a1.Row("t")) + errorIfNotEqual(t, result.Row().Columns, []uint64{3, 9}) +} + +func errorIfNotEqual(t *testing.T, exp, got interface{}) { + t.Helper() + if !reflect.DeepEqual(exp, got) { + t.Errorf("unequal exp/got:\n%v\n%v", exp, got) + } +} + +type sortableCRI []pilosa.CountResultItem + +func (s sortableCRI) Len() int { return len(s) } +func (s sortableCRI) Less(i, j int) bool { + if s[i].Count != s[j].Count { + return s[i].Count > s[j].Count + } + if s[i].ID != s[j].ID { + return s[i].ID < s[j].ID + } + if s[i].Key != s[j].Key { + return s[i].Key < s[j].Key + } + return true +} +func (s sortableCRI) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func tq(t *testing.T, client *pilosa.Client, query pilosa.PQLQuery) pilosa.QueryResult { + resp, err := client.Query(query) + if err != nil { + t.Fatalf("querying: %v", err) + } + return resp.Results()[0] +} + +func ingestRecords(records []Row, batch *Batch) error { + for _, rec := range records { + err := batch.Add(rec) + if err == ErrBatchNowFull { + err = batch.Import() + if err != nil { + return errors.Wrap(err, "importing batch") + } + } else if err != nil { + return errors.Wrap(err, "while adding record") + } + } + if batch.Len() > 0 { + err := batch.Import() + if err != nil { + return errors.Wrap(err, "importing batch") + } + } + return nil +} + func TestImportBatchInts(t *testing.T) { client := pilosa.DefaultClient() schema := pilosa.NewSchema() From 0b5e4ca35faf175ab6664a8e09462ed026b3fa4d Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Wed, 11 Dec 2019 15:18:08 -0600 Subject: [PATCH 11/13] grab some forgotten fixes from old ingest-support branch --- client.go | 13 +++++++------ gpexp/importbatch.go | 5 ++++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/client.go b/client.go index b4dc997..970cac5 100644 --- a/client.go +++ b/client.go @@ -1042,9 +1042,9 @@ func (c *Client) httpRequest(method string, path string, data []byte, headers ma var response *http.Response var err error for i := 0; i < maxHosts; i++ { - host, err := c.host(useCoordinator) - if err != nil { - return nil, nil, err + host, herr := c.host(useCoordinator) + if herr != nil { + return nil, nil, errors.Wrapf(herr, "getting host, previous err: %v", err) } response, err = c.doRequest(host, method, path, c.augmentHeaders(headers), data) if err == nil { @@ -1056,6 +1056,7 @@ func (c *Client) httpRequest(method string, path string, data []byte, headers ma 
c.coordinatorURI = nil c.coordinatorLock.Unlock() } else { + c.logger.Printf("removing host due to '%v'\n", err) c.cluster.RemoveHost(host) } } @@ -1063,7 +1064,7 @@ func (c *Client) httpRequest(method string, path string, data []byte, headers ma time.Sleep(1 * time.Second) } if response == nil { - return nil, nil, ErrTriedMaxHosts + return nil, nil, errors.Wrap(err, ErrTriedMaxHosts.Error()) } defer response.Body.Close() warning := response.Header.Get("warning") @@ -1693,10 +1694,10 @@ func (co *ClientOptions) withDefaults() (updated *ClientOptions) { updated.ConnectTimeout = time.Second * 60 } if updated.PoolSizePerRoute <= 0 { - updated.PoolSizePerRoute = 10 + updated.PoolSizePerRoute = 50 } if updated.TotalPoolSize <= 0 { - updated.TotalPoolSize = 100 + updated.TotalPoolSize = 500 } if updated.TLSConfig == nil { updated.TLSConfig = &tls.Config{} diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index 93b7a77..79a9fea 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -306,6 +306,9 @@ func (b *Batch) Add(rec Row) error { } handleStringID := func(rid string) error { + if rid == "" { + return errors.Errorf("record identifier cannot be an empty string") + } if colID, ok, err := b.transCache.GetCol(b.index.Name(), rid); err != nil { return errors.Wrap(err, "translating column") } else if ok { @@ -610,7 +613,7 @@ func (b *Batch) doTranslation() error { // translate keys from Pilosa ids, err := b.client.TranslateRowKeys(b.headerMap[fieldName], keys) if err != nil { - return errors.Wrap(err, "translating row keys") + return errors.Wrap(err, "translating row keys (sets)") } if err := b.transCache.AddRows(b.index.Name(), fieldName, keys, ids); err != nil { return errors.Wrap(err, "adding rows to cache") From c35a391742ca82d74247d4f7267a0d7c9653ed79 Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Wed, 11 Dec 2019 15:22:30 -0600 Subject: [PATCH 12/13] add egpool to limit number of live goroutines in errgroup use in importbatch --- egpool/egpool.go | 60 +++++++++++++++++++++++++++++++++++++++++++ egpool/egpool_test.go | 36 ++++++++++++++++++++++++++ gpexp/importbatch.go | 8 +++--- 3 files changed, 100 insertions(+), 4 deletions(-) create mode 100644 egpool/egpool.go create mode 100644 egpool/egpool_test.go diff --git a/egpool/egpool.go b/egpool/egpool.go new file mode 100644 index 0000000..1279aea --- /dev/null +++ b/egpool/egpool.go @@ -0,0 +1,60 @@ +package egpool + +import ( + "sync" + + "golang.org/x/sync/errgroup" +) + +type Group struct { + PoolSize int + + jobs chan func() error + + errMu sync.Mutex + firstErr error + poolEG errgroup.Group +} + +func (eg *Group) Go(f func() error) { + if eg.PoolSize == 0 { + eg.PoolSize = 1 + } + + if eg.jobs == nil { + eg.jobs = make(chan func() error, eg.PoolSize*2) + eg.startProcessJobsPool() + } + + eg.jobs <- f +} + +func (eg *Group) startProcessJobsPool() { + eg.poolEG = errgroup.Group{} + for i := 0; i < eg.PoolSize; i++ { + eg.poolEG.Go(eg.processJobs) + } +} + +func (eg *Group) processJobs() error { + for jobFn := range eg.jobs { + err := jobFn() + if err != nil { + eg.errMu.Lock() + if eg.firstErr == nil { + eg.firstErr = err + } + eg.errMu.Unlock() + } + } + return nil +} + +func (eg *Group) Wait() error { + if eg.jobs == nil { + return nil + } + close(eg.jobs) + _ = eg.poolEG.Wait() // never returns err + return eg.firstErr +} diff --git a/egpool/egpool_test.go b/egpool/egpool_test.go new file mode 100644 index 0000000..c54d4f7 --- /dev/null +++ b/egpool/egpool_test.go @@ -0,0 +1,36 @@ +package egpool_test + +import ( + 
"errors" + "testing" + + "github.com/pilosa/go-pilosa/egpool" +) + +func TestEGPool(t *testing.T) { + eg := egpool.Group{} + + a := make([]int, 10) + + for i := 0; i < 10; i++ { + i := i + eg.Go(func() error { + a[i] = i + if i == 7 { + return errors.New("blah") + } + return nil + }) + } + + err := eg.Wait() + if err == nil || err.Error() != "blah" { + t.Errorf("expected err blah, got: %v", err) + } + + for i := 0; i < 10; i++ { + if a[i] != i { + t.Errorf("expected a[%d] to be %d, but is %d", i, i, a[i]) + } + } +} diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index 79a9fea..2cb54d0 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -4,9 +4,9 @@ import ( "time" "github.com/pilosa/go-pilosa" + "github.com/pilosa/go-pilosa/egpool" "github.com/pilosa/pilosa/roaring" "github.com/pkg/errors" - "golang.org/x/sync/errgroup" ) // TODO if using column translation, column ids might get way out of @@ -633,7 +633,7 @@ func (b *Batch) doTranslation() error { } func (b *Batch) doImport() error { - eg := errgroup.Group{} + eg := egpool.Group{PoolSize: 50} frags, clearFrags, err := b.makeFragments() if err != nil { @@ -792,7 +792,7 @@ func (b *Batch) importValueData() error { if shardWidth == 0 { shardWidth = pilosa.DefaultShardWidth } - eg := errgroup.Group{} + eg := egpool.Group{PoolSize: 50} ids := make([]uint64, len(b.ids)) for field, values := range b.values { // grow our temp ids slice to full length @@ -864,7 +864,7 @@ func (b *Batch) importMutexData() error { if shardWidth == 0 { shardWidth = pilosa.DefaultShardWidth } - eg := errgroup.Group{} + eg := egpool.Group{PoolSize: 50} ids := make([]uint64, 0, len(b.ids)) for findex, rowIDs := range b.rowIDs { From 7e2a5449773848b6a832ae575f1327b04414711e Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Thu, 12 Dec 2019 15:38:01 -0600 Subject: [PATCH 13/13] fix single batch clear bug, add logging, change record ID translation a lot of this logging should probably be tracing the toTranslateID change became necessary because although IDs were coming in roughly sorted, they were getting randomized by the map. This was fine until the translated IDs crossed a slice boundary, and then 1000s of requests were being generated for a single batch as the value importer flipped back and forth between slices. the bug fix is explained in a comment, but basically makeFragments iterated over b.rowIDs and assumed that anything in clearRowIDs would also be in rowIDs. In this particular case, nothing had yet been added to rowIDs, so the clear was skipped. --- gpexp/importbatch.go | 78 +++++++++++++++++++++++++-------------- gpexp/importbatch_test.go | 68 ++++++++++++++++++++++++++-------- 2 files changed, 104 insertions(+), 42 deletions(-) diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index 2cb54d0..f5ee7f9 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -5,6 +5,7 @@ import ( "github.com/pilosa/go-pilosa" "github.com/pilosa/go-pilosa/egpool" + "github.com/pilosa/pilosa/logger" "github.com/pilosa/pilosa/roaring" "github.com/pkg/errors" ) @@ -102,16 +103,13 @@ type Batch struct { // those keys map to. toTranslateSets map[string]map[string][]int - // for string ids which we weren't able to immediately translate, - // keep a map of which record(s) each string id maps to. - // - // TODO: - // this is probably super inefficient in the (common) case where - // each record has a different string ID. In that case, a simple - // slice of strings would probably work better. 
- toTranslateID map[string][]int + // toTranslateID maps each string key to a record index - this + // will get translated into Batch.rowIDs + toTranslateID []string transCache Translator + + log logger.Logger } func (b *Batch) Len() int { return len(b.ids) } @@ -128,6 +126,13 @@ func OptTranslator(t Translator) BatchOption { } } +func OptLogger(l logger.Logger) BatchOption { + return func(b *Batch) error { + b.log = l + return nil + } +} + // NewBatch initializes a new Batch object which will use the given // Pilosa client, index, set of fields, and will take "size" records // before returning ErrBatchNowFull. The positions of the Fields in @@ -180,8 +185,9 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi toTranslate: tt, toTranslateClear: make(map[int]map[string][]int), toTranslateSets: ttSets, - toTranslateID: make(map[string][]int), transCache: NewMapTranslator(), + + log: logger.NopLogger, } if hasTime { b.times = make([]QuantizedTime, 0, size) @@ -314,12 +320,10 @@ func (b *Batch) Add(rec Row) error { } else if ok { b.ids = append(b.ids, colID) } else { - ints, ok := b.toTranslateID[rid] - if !ok { - ints = make([]int, 0) + if b.toTranslateID == nil { + b.toTranslateID = make([]string, cap(b.ids)) } - ints = append(ints, len(b.ids)) - b.toTranslateID[rid] = ints + b.toTranslateID[len(b.ids)] = rid b.ids = append(b.ids, 0) } return nil @@ -484,6 +488,13 @@ func (b *Batch) Add(rec Row) error { default: return errors.Errorf("Clearing a value '%v' Type %[1]T is not currently supported (field '%s')", val, field.Name()) } + // nil extend b.rowIDs so we don't run into a horrible bug + // where we skip doing clears because b.rowIDs doesn't have a + // value for htis field + for len(b.rowIDs[i]) <= curPos { + b.rowIDs[i] = append(b.rowIDs[i], nilSentinel) + } + } if len(b.ids) == cap(b.ids) { @@ -524,29 +535,29 @@ func (b *Batch) Import() error { } func (b *Batch) doTranslation() error { - var keys []string + keys := make([]string, 0) + var indexes []int // translate column keys if there are any - if len(b.toTranslateID) > 0 { - keys = make([]string, 0, len(b.toTranslateID)) - for k := range b.toTranslateID { + for i, k := range b.toTranslateID { + if k != "" { keys = append(keys, k) + indexes = append(indexes, i) } + } + if len(keys) > 0 { + start := time.Now() ids, err := b.client.TranslateColumnKeys(b.index, keys) if err != nil { return errors.Wrap(err, "translating col keys") } + b.log.Debugf("translating %d column keys took %v", len(keys), time.Since(start)) if err := b.transCache.AddCols(b.index.Name(), keys, ids); err != nil { return errors.Wrap(err, "adding cols to cache") } - for j, key := range keys { - id := ids[j] - for _, recordIdx := range b.toTranslateID[key] { - b.ids[recordIdx] = id - } + for j, id := range ids { + b.ids[indexes[j]] = id } - } else { - keys = make([]string, 0) } // translate row keys @@ -572,10 +583,12 @@ func (b *Batch) doTranslation() error { } // translate keys from Pilosa + start := time.Now() ids, err := b.client.TranslateRowKeys(b.headerMap[fieldName], keys) if err != nil { return errors.Wrap(err, "translating row keys") } + b.log.Debugf("translating %d row keys for %s took %v", len(keys), fieldName, time.Since(start)) if err := b.transCache.AddRows(b.index.Name(), fieldName, keys, ids); err != nil { return errors.Wrap(err, "adding rows to cache") } @@ -611,10 +624,12 @@ func (b *Batch) doTranslation() error { continue } // translate keys from Pilosa + start := time.Now() ids, err := 
b.client.TranslateRowKeys(b.headerMap[fieldName], keys) if err != nil { return errors.Wrap(err, "translating row keys (sets)") } + b.log.Debugf("translating %d row keys(sets) for %s took %v", len(keys), fieldName, time.Since(start)) if err := b.transCache.AddRows(b.index.Name(), fieldName, keys, ids); err != nil { return errors.Wrap(err, "adding rows to cache") } @@ -645,15 +660,20 @@ func (b *Batch) doImport() error { field := field viewMap := viewMap shard := shard + eg.Go(func() error { clearViewMap := clearFrags.GetViewMap(shard, field) if len(clearViewMap) > 0 { + start := time.Now() err := b.client.ImportRoaringBitmap(b.index.Field(field), shard, clearViewMap, true) if err != nil { return errors.Wrapf(err, "import clearing clearing data for %s", field) } + b.log.Debugf("imp-roar-clr %s,shard:%d,views:%d %v", field, shard, len(clearViewMap), time.Since(start)) } + start := time.Now() err := b.client.ImportRoaringBitmap(b.index.Field(field), shard, viewMap, false) + b.log.Debugf("imp-roar %s,shard:%d,views:%d %v", field, shard, len(clearViewMap), time.Since(start)) return errors.Wrapf(err, "importing data for %s", field) }) } @@ -845,7 +865,9 @@ func (b *Batch) importValueData() error { return errors.Wrap(err, "encoding import values") } eg.Go(func() error { + start := time.Now() err := b.client.DoImportValues(b.index.Name(), shard, path, data) + b.log.Debugf("imp-vals %s,shard:%d,data:%d %v", field, shard, len(data), time.Since(start)) return errors.Wrapf(err, "importing values for %s", field) }) startIdx = i @@ -910,7 +932,9 @@ func (b *Batch) importMutexData() error { return errors.Wrap(err, "encoding mutex import") } eg.Go(func() error { + start := time.Now() err := b.client.DoImport(b.index.Name(), shard, path, data) + b.log.Debugf("imp-vals %s,shard:%d,data:%d %v", field, shard, len(data), time.Since(start)) return errors.Wrapf(err, "importing values for %s", field) }) startIdx = i @@ -952,8 +976,8 @@ func (b *Batch) reset() { delete(clearMap, k) } } - for k := range b.toTranslateID { - delete(b.toTranslateID, k) // TODO pool these slices + for i := range b.toTranslateID { + b.toTranslateID[i] = "" } for k := range b.values { delete(b.values, k) // TODO pool these slices diff --git a/gpexp/importbatch_test.go b/gpexp/importbatch_test.go index f59134a..d6fab8b 100644 --- a/gpexp/importbatch_test.go +++ b/gpexp/importbatch_test.go @@ -294,6 +294,7 @@ func TestStringSliceEmptyAndNil(t *testing.T) { t.Fatalf("querying: %v", err) } + // TODO test is flaky because we can't guarantee what a,b,c map to expectations := [][]uint64{{2}, {0, 2}, {3}, {3}, {2}} for i, re := range resp.Results() { if !reflect.DeepEqual(re.Row().Columns, expectations[i]) { @@ -409,6 +410,55 @@ func TestStringSlice(t *testing.T) { } +func TestSingleClearBatchRegression(t *testing.T) { + client := pilosa.DefaultClient() + schema := pilosa.NewSchema() + idx := schema.Index("gopilosatest-blah") + numFields := 1 + fields := make([]*pilosa.Field, numFields) + fields[0] = idx.Field("zero", pilosa.OptFieldKeys(true)) + + err := client.SyncSchema(schema) + if err != nil { + t.Fatalf("syncing schema: %v", err) + } + defer func() { + err := client.DeleteIndex(idx) + if err != nil { + t.Logf("problem cleaning up from test: %v", err) + } + }() + + _, err = client.Query(fields[0].Set("row1", 1)) + if err != nil { + t.Fatalf("setting bit: %v", err) + } + + b, err := NewBatch(client, 1, idx, fields) + if err != nil { + t.Fatalf("getting new batch: %v", err) + } + r := Row{ID: uint64(1), Values: make([]interface{}, numFields), 
Clears: make(map[int]interface{})} + r.Values[0] = nil + r.Clears[0] = "row1" + err = b.Add(r) + if err != ErrBatchNowFull { + t.Fatalf("wrong error from batch add: %v", err) + } + + err = b.Import() + if err != nil { + t.Fatalf("error importing: %v", err) + } + + resp, err := client.Query(fields[0].Row("row1")) + result := resp.Results()[0].Row().Columns + if len(result) != 0 { + t.Fatalf("unexpected values in row: result %+v", result) + } + +} + func TestBatches(t *testing.T) { client := pilosa.DefaultClient() schema := pilosa.NewSchema() @@ -847,21 +897,9 @@ func TestBatchesStringIDs(t *testing.T) { if len(b.toTranslateID) != 3 { t.Fatalf("id translation table unexpected size: %v", b.toTranslateID) } - for k, indexes := range b.toTranslateID { - if k == "0" { - if !reflect.DeepEqual(indexes, []int{0}) { - t.Fatalf("unexpected result k: %s, indexes: %v", k, indexes) - } - } - if k == "1" { - if !reflect.DeepEqual(indexes, []int{1}) { - t.Fatalf("unexpected result k: %s, indexes: %v", k, indexes) - } - } - if k == "2" { - if !reflect.DeepEqual(indexes, []int{2}) { - t.Fatalf("unexpected result k: %s, indexes: %v", k, indexes) - } + for i, k := range b.toTranslateID { + if ik, err := strconv.Atoi(k); err != nil || ik != i { + t.Errorf("unexpected toTranslateID key %s at index %d", k, i) } }
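For the egpool package introduced in patch 12/13, a minimal usage sketch may help when evaluating the importbatch changes above; it assumes only the egpool API shown in this series (Group.PoolSize, Go, Wait), and the job body is a placeholder rather than real import code.

package example

import (
	"fmt"

	"github.com/pilosa/go-pilosa/egpool"
)

// importShards starts one job per shard but never runs more than
// PoolSize jobs concurrently; Wait returns the first error any job
// reported, or nil if all jobs succeeded.
func importShards(shards []uint64) error {
	eg := egpool.Group{PoolSize: 4}
	for _, shard := range shards {
		shard := shard // capture the loop variable for the closure
		eg.Go(func() error {
			fmt.Println("importing shard", shard) // placeholder work
			return nil
		})
	}
	return eg.Wait()
}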