This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Decimal mutex import breakout use row instead of range #260

Open

wants to merge 13 commits into master
103 changes: 76 additions & 27 deletions client.go
@@ -134,6 +134,7 @@ func (c *Client) Close() error {
// see if it still matches, and if not it drops the whole cache.
func (c *Client) detectClusterChanges() {
c.shardNodes.mu.Lock()
needsUnlock := true
// we rely on Go's random map iteration order to get a random
// element. If it doesn't end up being random, it shouldn't
// actually matter.
@@ -142,6 +143,7 @@ func (c *Client) detectClusterChanges() {
delete(shardMap, shard)
c.shardNodes.data[index] = shardMap
c.shardNodes.mu.Unlock()
needsUnlock = false
newURIs, err := c.getURIsForShard(index, shard) // refetch URIs from server.
if err != nil {
c.logger.Printf("problem invalidating shard node cache: %v", err)
@@ -164,6 +166,9 @@
}
break
}
if needsUnlock {
c.shardNodes.mu.Unlock()
}
}

// DefaultClient creates a client with the default address and options.
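
The needsUnlock flag above is the fix for a mutex leak: the loop body releases shardNodes.mu early, before the refetch round-trip, but every other path out of the function still has to unlock. A minimal sketch of the same pattern, with hypothetical names (refreshOne, cache, refetch) and using a deferred conditional unlock rather than the explicit check at the end:

func refreshOne(mu *sync.Mutex, cache map[string]bool, key string, refetch func(string) error) error {
	mu.Lock()
	needsUnlock := true
	defer func() {
		if needsUnlock {
			mu.Unlock()
		}
	}()
	if _, ok := cache[key]; ok {
		delete(cache, key)
		// Drop the lock before the slow network call so other
		// goroutines aren't blocked behind it.
		mu.Unlock()
		needsUnlock = false
		return refetch(key)
	}
	return nil // the deferred func unlocks on this path
}
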
@@ -725,6 +730,59 @@ func (c *Client) importValues(field *Field,
return errors.Wrap(err, "importing values to nodes")
}

// Import imports data for a single shard using the regular import
// endpoint rather than import-roaring. This is good for e.g. mutex or
// bool fields where import-roaring is not supported.
func (c *Client) Import(index, field string, shard uint64, vals, ids []uint64, clear bool) error {
path, data, err := c.EncodeImport(index, field, shard, vals, ids, clear)
if err != nil {
return errors.Wrap(err, "encoding import request")
}
err = c.DoImport(index, shard, path, data)
return errors.Wrap(err, "doing import")
}
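
For illustration, a hedged sketch of calling the new Import for a mutex field from a caller's perspective (assuming the package is imported as pilosa; the index and field names are hypothetical, and all column IDs must belong to the given shard):

	client := pilosa.DefaultClient()
	rows := []uint64{0, 1, 0} // one row ID per record ("vals")
	cols := []uint64{1, 2, 3} // matching column IDs ("ids")
	// shard 0, clear=false: set the bits rather than clearing them
	if err := client.Import("myindex", "mymutex", 0, rows, cols, false); err != nil {
		log.Fatalf("import failed: %v", err)
	}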

// EncodeImport computes the HTTP path and payload for an import
// request. It is typically followed by a call to DoImport.
func (c *Client) EncodeImport(index, field string, shard uint64, vals, ids []uint64, clear bool) (path string, data []byte, err error) {
msg := &pbuf.ImportRequest{
Index: index,
Field: field,
Shard: shard,
RowIDs: vals,
ColumnIDs: ids,
}
data, err = proto.Marshal(msg)
if err != nil {
return "", nil, errors.Wrap(err, "marshaling Import to protobuf")
}
path = fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
return path, data, nil
}

// DoImport takes a path and data payload (normally from EncodeImport
// or EncodeImportValues), logs the import, finds all nodes which own
// this shard, and concurrently imports to those nodes.
func (c *Client) DoImport(index string, shard uint64, path string, data []byte) error {
// TODO: figure out if the forwarding logic in Pilosa is the same
// for importing values as it is for importing mutex/bool. We may
// need to change that...
c.logImport(index, path, shard, false, data)

uris, err := c.getURIsForShard(index, shard)
if err != nil {
return errors.Wrap(err, "getting uris")
}

eg := errgroup.Group{}
for _, uri := range uris {
uri := uri // shadow the loop variable so each goroutine gets its own copy
eg.Go(func() error {
return c.importData(uri, path, data)
})
}
return errors.Wrap(eg.Wait(), "importing to nodes")
}
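
One reason for the EncodeImport/DoImport split: the protobuf payload can be marshaled once and re-sent, e.g. for caller-managed retries. A sketch (the retry policy is hypothetical, not part of this client):

	path, data, err := client.EncodeImport("myindex", "mymutex", shard, rows, cols, false)
	if err != nil {
		return errors.Wrap(err, "encoding import")
	}
	// Re-send the same payload without re-encoding it.
	for attempt := 0; attempt < 3; attempt++ {
		if err = client.DoImport("myindex", shard, path, data); err == nil {
			break
		}
	}
	return err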

// ImportValues takes the given integer values and column ids (which
// must all be in the given shard) and imports them into the given
// index,field,shard on all nodes which should hold that shard. It
@@ -756,30 +814,15 @@ func (c *Client) EncodeImportValues(index, field string, shard uint64, vals []in
}
data, err = proto.Marshal(msg)
if err != nil {
return "", nil, errors.Wrap(err, "marshaling to protobuf")
return "", nil, errors.Wrap(err, "marshaling ImportValue to protobuf")
}
path = fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
return path, data, nil
}

// DoImportValues takes a path and data payload (normally from
// EncodeImportValues), logs the import, finds all nodes which own
// this shard, and concurrently imports to those nodes.
// DoImportValues is deprecated. Use DoImport.
func (c *Client) DoImportValues(index string, shard uint64, path string, data []byte) error {
c.logImport(index, path, shard, false, data)

uris, err := c.getURIsForShard(index, shard)
if err != nil {
return errors.Wrap(err, "getting uris")
}

eg := errgroup.Group{}
for _, uri := range uris {
eg.Go(func() error {
return c.importData(uri, path, data)
})
}
return errors.Wrap(eg.Wait(), "importing values to nodes")
return c.DoImport(index, shard, path, data)
}

func importPathData(field *Field, shard uint64, msg proto.Message, options *ImportOptions) (path string, data []byte, err error) {
@@ -999,9 +1042,9 @@ func (c *Client) httpRequest(method string, path string, data []byte, headers ma
var response *http.Response
var err error
for i := 0; i < maxHosts; i++ {
host, err := c.host(useCoordinator)
if err != nil {
return nil, nil, err
host, herr := c.host(useCoordinator)
if herr != nil {
return nil, nil, errors.Wrapf(herr, "getting host, previous err: %v", err)
}
response, err = c.doRequest(host, method, path, c.augmentHeaders(headers), data)
if err == nil {
@@ -1013,14 +1056,15 @@ func (c *Client) httpRequest(method string, path string, data []byte, headers ma
c.coordinatorURI = nil
c.coordinatorLock.Unlock()
} else {
c.logger.Printf("removing host due to '%v'\n", err)
c.cluster.RemoveHost(host)
}
}
// TODO: exponential backoff
time.Sleep(1 * time.Second)
}
if response == nil {
return nil, nil, ErrTriedMaxHosts
return nil, nil, errors.Wrap(err, ErrTriedMaxHosts.Error())
}
defer response.Body.Close()
warning := response.Header.Get("warning")
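
The httpRequest changes above are about not losing error context: the per-attempt err now survives into the host-selection failure message and into the final ErrTriedMaxHosts wrap. The shape of that pattern, roughly (nextHost and tryRequest are hypothetical stand-ins):

	var lastErr error
	for i := 0; i < maxHosts; i++ {
		host, herr := nextHost()
		if herr != nil {
			// report both the selection failure and the error
			// that put us on the retry path in the first place
			return fmt.Errorf("getting host: %v (previous err: %v)", herr, lastErr)
		}
		if lastErr = tryRequest(host); lastErr == nil {
			return nil
		}
		time.Sleep(time.Second) // TODO: exponential backoff
	}
	return fmt.Errorf("tried max hosts, last err: %v", lastErr)
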
@@ -1189,18 +1233,21 @@ func (c *Client) TranslateRowKeys(field *Field, keys []string) ([]uint64, error)
Field: field.name,
Keys: keys,
}
return c.translateKeys(req, keys)
return c.translateKeys(req)
}

func (c *Client) TranslateColumnKeys(index *Index, keys []string) ([]uint64, error) {
req := &pbuf.TranslateKeysRequest{
Index: index.name,
Keys: keys,
}
return c.translateKeys(req, keys)
return c.translateKeys(req)
}

func (c *Client) translateKeys(req *pbuf.TranslateKeysRequest, keys []string) ([]uint64, error) {
func (c *Client) translateKeys(req *pbuf.TranslateKeysRequest) ([]uint64, error) {
if len(req.Keys) == 0 {
return []uint64{}, nil
}
reqData, err := proto.Marshal(req)
if err != nil {
return nil, errors.Wrap(err, "marshaling translate keys request")
@@ -1647,10 +1694,10 @@ func (co *ClientOptions) withDefaults() (updated *ClientOptions) {
updated.ConnectTimeout = time.Second * 60
}
if updated.PoolSizePerRoute <= 0 {
updated.PoolSizePerRoute = 10
updated.PoolSizePerRoute = 50
}
if updated.TotalPoolSize <= 0 {
updated.TotalPoolSize = 100
updated.TotalPoolSize = 500
}
if updated.TLSConfig == nil {
updated.TLSConfig = &tls.Config{}
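
The pool defaults grow from 10 to 50 connections per route and from 100 to 500 total, which gives the new concurrent import fan-out more headroom. Callers can still size the pools explicitly; a sketch, assuming go-pilosa's functional options OptClientPoolSizePerRoute and OptClientTotalPoolSize:

	client, err := pilosa.NewClient("localhost:10101",
		pilosa.OptClientPoolSizePerRoute(50),
		pilosa.OptClientTotalPoolSize(500),
	)
	if err != nil {
		log.Fatal(err)
	}
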
@@ -1940,6 +1987,7 @@ type SchemaOptions struct {
TimeQuantum string `json:"timeQuantum"`
Min int64 `json:"min"`
Max int64 `json:"max"`
Scale int64 `json:"scale"`
Keys bool `json:"keys"`
NoStandardView bool `json:"noStandardView"`
TrackExistence bool `json:"trackExistence"`
@@ -1962,6 +2010,7 @@ func (so SchemaOptions) asFieldOptions() *FieldOptions {
timeQuantum: TimeQuantum(so.TimeQuantum),
min: so.Min,
max: so.Max,
scale: so.Scale,
keys: so.Keys,
noStandardView: so.NoStandardView,
}
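
Scale is the new passthrough that lets decimal fields round-trip through the schema endpoint. For a decimal field, scale is the number of digits kept after the decimal point, so with scale 3 the value 1.234 is effectively stored as the integer 1234. Declaring one, mirroring the test below (the schema setup is hypothetical):

	schema := pilosa.NewSchema()
	index := schema.Index("go-testindex")
	// scale 3: three digits after the decimal point
	decField := index.Field("a-decimal", pilosa.OptFieldTypeDecimal(3))
	_ = decField
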
20 changes: 20 additions & 0 deletions client_it_test.go
@@ -237,6 +237,26 @@ func TestOrmCount(t *testing.T) {
}
}

func TestDecimalField(t *testing.T) {
client := getClient()
defer client.Close()
decField := index.Field("a-decimal", OptFieldTypeDecimal(3))
err := client.EnsureField(decField)
if err != nil {
t.Fatal(err)
}

sch, err := client.Schema()
if err != nil {
t.Fatalf("getting schema: %v", err)
}
idx := sch.indexes["go-testindex"]
if opts := idx.Field("a-decimal").Options(); opts.scale != 3 {
t.Fatalf("scale should be 3, but: %v", opts)
}

}

func TestIntersectReturns(t *testing.T) {
client := getClient()
defer client.Close()
60 changes: 60 additions & 0 deletions egpool/egpool.go
@@ -0,0 +1,60 @@
package egpool

import (
"sync"

"golang.org/x/sync/errgroup"
)

// Group is a limited-concurrency variant of errgroup.Group: jobs
// submitted via Go are executed by at most PoolSize worker
// goroutines rather than one goroutine per job.
type Group struct {
PoolSize int

jobs chan func() error

errMu sync.Mutex
firstErr error
poolEG errgroup.Group
}

// Go submits a job to the pool, lazily starting the PoolSize worker
// goroutines on first use. PoolSize defaults to 1 if unset. Go blocks
// once the job buffer (PoolSize*2) is full.
func (eg *Group) Go(f func() error) {
if eg.PoolSize == 0 {
eg.PoolSize = 1
}

if eg.jobs == nil {
eg.jobs = make(chan func() error, eg.PoolSize*2)
eg.startProcessJobsPool()
}

eg.jobs <- f
}

func (eg *Group) startProcessJobsPool() {
eg.poolEG = errgroup.Group{}
for i := 0; i < eg.PoolSize; i++ {
eg.poolEG.Go(eg.processJobs)
}
}

// processJobs is run by each worker: it drains the jobs channel,
// recording the first error any job returns. It always returns nil
// so that one failing job doesn't stop the other workers.
func (eg *Group) processJobs() error {
for jobFn := range eg.jobs {
err := jobFn()
if err != nil {
eg.errMu.Lock()
if eg.firstErr == nil {
eg.firstErr = err
}
eg.errMu.Unlock()
}
}
return nil
}

// Wait closes the jobs channel, waits for the workers to drain it,
// and returns the first error returned by any job, if any. Go must
// not be called after Wait.
func (eg *Group) Wait() error {
if eg.jobs == nil {
return nil
}
close(eg.jobs)
_ = eg.poolEG.Wait() // never returns err
return eg.firstErr
}
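
egpool keeps errgroup's Go/Wait surface but caps concurrency at PoolSize workers, and Wait reports only the first error while still letting every job run. A usage sketch (urls and fetch are hypothetical):

	eg := egpool.Group{PoolSize: 4} // at most 4 jobs run concurrently
	for _, u := range urls {
		u := u // capture the loop variable for the closure
		eg.Go(func() error {
			return fetch(u)
		})
	}
	if err := eg.Wait(); err != nil {
		log.Printf("first failure: %v", err)
	}
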
36 changes: 36 additions & 0 deletions egpool/egpool_test.go
@@ -0,0 +1,36 @@
package egpool_test

import (
"errors"
"testing"

"github.com/pilosa/go-pilosa/egpool"
)

func TestEGPool(t *testing.T) {
eg := egpool.Group{}

a := make([]int, 10)

for i := 0; i < 10; i++ {
i := i
eg.Go(func() error {
a[i] = i
if i == 7 {
return errors.New("blah")
}
return nil
})
}

err := eg.Wait()
if err == nil || err.Error() != "blah" {
t.Errorf("expected err blah, got: %v", err)
}

for i := 0; i < 10; i++ {
if a[i] != i {
t.Errorf("expected a[%d] to be %d, but is %d", i, i, a[i])
}
}
}