Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ CHANGES:
* Add configurable pause before retrying r.Route in Router.Call method.
* Add ability to set custom dialer in InstaceInfo.
* Router.Call: retry on VShardErrNameTransferIsInProgress error as in the `vshard` module (#75).
* Work with the same routeMap and nameToReplicasetRef within one method (resolve issue #14).

BUG FIXES:
* Router.bucketSearchBatched: do not flush out routeMap (#79).
Expand Down
20 changes: 12 additions & 8 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
// poolMode, vshardMode = pool.PreferRO, ReadMode
// since go-tarantool always use balance=true politic,
// we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400
return VshardRouterCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet")
return VshardRouterCallResp{}, fmt.Errorf("mode CallModeRE is not supported yet")
case CallModeBRO:
poolMode, vshardMode = pool.ANY, ReadMode
case CallModeBRE:
poolMode, vshardMode = pool.PreferRO, ReadMode
default:
return VshardRouterCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode)
return VshardRouterCallResp{}, fmt.Errorf("unknown CallMode(%d)", mode)
}

timeout := callTimeoutDefault
Expand All @@ -277,6 +277,9 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,

requestStartTime := time.Now()

nameToReplicasetRef := r.getNameToReplicaset()
routerMap := r.getRouteMap()

var err error

for {
Expand All @@ -293,7 +296,7 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,

var rs *Replicaset

rs, err = r.Route(ctx, bucketID)
rs, err = r.route(ctx, nameToReplicasetRef, routerMap, bucketID)
if err != nil {
r.metrics().RetryOnCall("bucket_resolve_error")

Expand Down Expand Up @@ -330,14 +333,12 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
switch vshardError.Name {
case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked, VShardErrNameTransferIsInProgress:
// We reproduce here behavior in https://github.com/tarantool/vshard/blob/0.1.34/vshard/router/init.lua#L667
r.BucketReset(bucketID)
r.bucketReset(routerMap, bucketID)

destination := vshardError.Destination
if destination != "" {
var loggedOnce bool
for {
nameToReplicasetRef := r.getNameToReplicaset()

// In some cases destination contains UUID (prior to tnt 3.x), in some cases it contains replicaset name.
// So, at this point we don't know what destination is: a name or an UUID.
// But we need a name to access values in nameToReplicasetRef map, so let's find it out.
Expand All @@ -359,11 +360,11 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
}

if destinationExists {
_, err := r.BucketSet(bucketID, destinationName)
_, err := r.bucketSet(nameToReplicasetRef, routerMap, bucketID, destinationName)
if err == nil {
break // breaks loop
}
r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationName, err)
r.log().Warnf(ctx, "Failed set bucket %d to %v (this should not happen): %v", bucketID, destinationName, err)
}

if !loggedOnce {
Expand All @@ -378,6 +379,9 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
if spent := time.Since(requestStartTime); spent > timeout {
return VshardRouterCallResp{}, vshardError
}

// update nameToReplicasetRef explicitly before next try, the topology might changed.
nameToReplicasetRef = r.getNameToReplicaset()
}
}

Expand Down
28 changes: 16 additions & 12 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,26 @@ func (r *Router) Route(ctx context.Context, bucketID uint64) (*Replicaset, error
}

routeMap := r.getRouteMap()
nameToReplicasetRef := r.getNameToReplicaset()

return r.route(ctx, nameToReplicasetRef, routeMap, bucketID)
}

func (r *Router) route(ctx context.Context, nameToReplicasetRef map[string]*Replicaset,
routeMap routeMap, bucketID uint64) (*Replicaset, error) {

rs := routeMap[bucketID].Load()
if rs != nil {
nameToReplicasetRef := r.getNameToReplicaset()

actualRs := nameToReplicasetRef[rs.info.Name]
switch {
case actualRs == nil:
// rs is outdated, can't use it -- let's discover bucket again
r.BucketReset(bucketID)
r.bucketReset(routeMap, bucketID)
case actualRs == rs:
return rs, nil
default: // actualRs != rs
// update rs -> actualRs for this bucket
_, _ = r.BucketSet(bucketID, actualRs.info.Name)
_, _ = r.bucketSet(nameToReplicasetRef, routeMap, bucketID, actualRs.info.Name)
return actualRs, nil
}
}
Expand All @@ -75,14 +80,14 @@ func (r *Router) Route(ctx context.Context, bucketID uint64) (*Replicaset, error
r.log().Infof(ctx, "Discovering bucket %d", bucketID)

if r.cfg.BucketsSearchMode == BucketsSearchLegacy {
return r.bucketSearchLegacy(ctx, bucketID)
return r.bucketSearchLegacy(ctx, nameToReplicasetRef, routeMap, bucketID)
}

return r.bucketSearchBatched(ctx, bucketID)
return r.bucketSearchBatched(ctx, nameToReplicasetRef, routeMap, bucketID)
}

func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Replicaset, error) {
nameToReplicasetRef := r.getNameToReplicaset()
func (r *Router) bucketSearchLegacy(ctx context.Context,
nameToReplicasetRef map[string]*Replicaset, routeMap routeMap, bucketID uint64) (*Replicaset, error) {

type rsFuture struct {
rsName string
Expand All @@ -109,7 +114,7 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
}

// It's ok if several replicasets return ok to bucket_stat command for the same bucketID, just pick any of them.
rs, err := r.BucketSet(bucketID, rsFuture.rsName)
rs, err := r.bucketSet(nameToReplicasetRef, routeMap, bucketID, rsFuture.rsName)
if err != nil {
r.log().Errorf(ctx, "bucketSearchLegacy: can't set rsID %v for bucketID %d: %v", rsFuture.rsName, bucketID, err)
return nil, newVShardErrorNoRouteToBucket(bucketID)
Expand All @@ -133,9 +138,8 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
// P.S. 1000 is a batch size in response of buckets_discovery, see:
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L1700
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/consts.lua#L37
func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64) (*Replicaset, error) {
nameToReplicasetRef := r.getNameToReplicaset()
routeMap := r.getRouteMap()
func (r *Router) bucketSearchBatched(ctx context.Context,
nameToReplicasetRef map[string]*Replicaset, routeMap routeMap, bucketIDToFind uint64) (*Replicaset, error) {

type rsFuture struct {
rs *Replicaset
Expand Down
16 changes: 14 additions & 2 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,25 +238,37 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {

// BucketSet Set a bucket to a replicaset.
func (r *Router) BucketSet(bucketID uint64, rsName string) (*Replicaset, error) {
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

nameToReplicasetRef := r.getNameToReplicaset()
routeMap := r.getRouteMap()

return r.bucketSet(nameToReplicasetRef, routeMap, bucketID, rsName)
}

func (r *Router) bucketSet(nameToReplicasetRef map[string]*Replicaset, routeMap routeMap, bucketID uint64, rsName string) (*Replicaset, error) {
rs := nameToReplicasetRef[rsName]
if rs == nil {
return nil, newVShardErrorNoRouteToBucket(bucketID)
}

routeMap := r.getRouteMap()
routeMap[bucketID].Store(rs)

return rs, nil
}

func (r *Router) BucketReset(bucketID uint64) {
if bucketID > r.cfg.TotalBucketCount {
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return
}

routeMap := r.getRouteMap()
r.bucketReset(routeMap, bucketID)
}

func (r *Router) bucketReset(routeMap routeMap, bucketID uint64) {
routeMap[bucketID].Store(nil)
}

Expand Down
Loading