Skip to content

Commit

Permalink
Add a way to know if DemotePrimary is blocked and send it in the heal…
Browse files Browse the repository at this point in the history
…th stream (#17289)

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Dec 26, 2024
1 parent 83f2aab commit bd02b45
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 14 deletions.
3 changes: 3 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ var (
VT09028 = errorWithState("VT09028", vtrpcpb.Code_FAILED_PRECONDITION, CTERecursiveForbiddenJoinOrder, "In recursive query block of Recursive Common Table Expression '%s', the recursive table must neither be in the right argument of a LEFT JOIN, nor be forced to be non-first with join order hints", "")
VT09029 = errorWithState("VT09029", vtrpcpb.Code_FAILED_PRECONDITION, CTERecursiveRequiresSingleReference, "In recursive query block of Recursive Common Table Expression %s, the recursive table must be referenced only once, and not in any subquery", "")
VT09030 = errorWithState("VT09030", vtrpcpb.Code_FAILED_PRECONDITION, CTEMaxRecursionDepth, "Recursive query aborted after 1000 iterations.", "")
VT09031 = errorWithoutState("VT09031", vtrpcpb.Code_FAILED_PRECONDITION, "Primary demotion is stalled", "")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")
VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. A rollback decision is taken.")
Expand Down Expand Up @@ -192,6 +193,8 @@ var (
VT09027,
VT09028,
VT09029,
VT09030,
VT09031,
VT10001,
VT10002,
VT12001,
Expand Down
19 changes: 19 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tabletmanager
import (
"context"
"fmt"
"runtime"
"strings"
"time"

Expand All @@ -29,6 +30,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
Expand Down Expand Up @@ -524,6 +526,23 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure
}
defer tm.unlock()

finishCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-finishCtx.Done():
// Finished running DemotePrimary. Nothing to do.
case <-time.After(10 * topo.RemoteOperationTimeout):
// We have waited for more than 10 times the remote operation timeout, but DemotePrimary is still not done.
// Collect more information and signal that DemotePrimary appears to be stalled indefinitely.
log.Errorf("DemotePrimary seems to be stalled. Collecting more information.")
tm.QueryServiceControl.SetDemotePrimaryStalled()
buf := make([]byte, 1<<16) // 64 KB buffer size
stackSize := runtime.Stack(buf, true)
log.Errorf("Stack trace:\n%s", string(buf[:stackSize]))
}
}()

tablet := tm.Tablet()
wasPrimary := tablet.Type == topodatapb.TabletType_PRIMARY
wasServing := tm.QueryServiceControl.IsServing()
Expand Down
51 changes: 51 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ package tabletmanager

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
)

// TestWaitForGrantsToHaveApplied tests that waitForGrantsToHaveApplied only succeeds after waitForDBAGrants has been called.
Expand All @@ -42,3 +47,49 @@ func TestWaitForGrantsToHaveApplied(t *testing.T) {
err = tm.waitForGrantsToHaveApplied(secondContext)
require.NoError(t, err)
}

// demotePrimaryStallQS is a fake query service controller used by TestDemotePrimaryStalled.
// It embeds tabletserver.Controller and overrides SetDemotePrimaryStalled and IsServing;
// any other method is forwarded to the embedded interface value.
type demotePrimaryStallQS struct {
tabletserver.Controller
// waitTime is how long IsServing sleeps before returning.
waitTime time.Duration
// primaryStalled is set to true by SetDemotePrimaryStalled.
primaryStalled atomic.Bool
}

// SetDemotePrimaryStalled records that it was called by atomically setting primaryStalled to true.
func (d *demotePrimaryStallQS) SetDemotePrimaryStalled() {
d.primaryStalled.Store(true)
}

// IsServing blocks for waitTime and then always reports false.
// The sleep is what lets a test make the caller run long enough to look stalled.
func (d *demotePrimaryStallQS) IsServing() bool {
time.Sleep(d.waitTime)
return false
}

// TestDemotePrimaryStalled verifies that a demotePrimary call which outlasts
// 10 * topo.RemoteOperationTimeout causes SetDemotePrimaryStalled to be invoked
// on the query service controller.
func TestDemotePrimaryStalled(t *testing.T) {
	// Shrink the remote operation timeout so the stall threshold is reached quickly,
	// and restore the previous value once the test is over.
	previousTimeout := topo.RemoteOperationTimeout
	topo.RemoteOperationTimeout = 100 * time.Millisecond
	defer func() { topo.RemoteOperationTimeout = previousTimeout }()

	// Fake query service control that intercepts the calls made by demotePrimary.
	// Its IsServing implementation sleeps for waitTime before answering.
	stallingQS := &demotePrimaryStallQS{waitTime: 2 * time.Second}

	// Tablet manager wired to the fake controller; the tablet state is attached
	// afterwards and uses a tablet built by newTestTablet.
	manager := &TabletManager{
		actionSema:          semaphore.NewWeighted(1),
		MysqlDaemon:         newTestMysqlDaemon(t, 1),
		QueryServiceControl: stallingQS,
	}
	manager.tmState = &tmState{
		displayState: displayState{
			tablet: newTestTablet(t, 100, "ks", "-", map[string]string{}),
		},
	}

	// IsServing sleeps for 2 seconds, which exceeds 10 * remote operation timeout (1 second),
	// so the stall should have been reported by the time demotePrimary returns.
	manager.demotePrimary(context.Background(), false)
	require.True(t, stallingQS.primaryStalled.Load())
}
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ type Controller interface {

// WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved.
WaitForPreparedTwoPCTransactions(ctx context.Context) error

// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager.
SetDemotePrimaryStalled()
}

// Ensure TabletServer satisfies Controller interface.
Expand Down
33 changes: 19 additions & 14 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,19 @@ type stateManager struct {
//
// If a transition fails, we set retrying to true and launch
// retryTransition which loops until the state converges.
mu sync.Mutex
wantState servingState
wantTabletType topodatapb.TabletType
state servingState
target *querypb.Target
ptsTimestamp time.Time
retrying bool
replHealthy bool
lameduck bool
alsoAllow []topodatapb.TabletType
reason string
transitionErr error
mu sync.Mutex
wantState servingState
wantTabletType topodatapb.TabletType
state servingState
target *querypb.Target
ptsTimestamp time.Time
retrying bool
replHealthy bool
demotePrimaryStalled bool
lameduck bool
alsoAllow []topodatapb.TabletType
reason string
transitionErr error

rw *requestsWaiter

Expand Down Expand Up @@ -387,7 +388,7 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target
sm.mu.Lock()
defer sm.mu.Unlock()

if sm.state != StateServing || !sm.replHealthy {
if sm.state != StateServing || !sm.replHealthy || sm.demotePrimaryStalled {
// This specific error string needs to be returned for vtgate buffering to work.
return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.NotServing)
}
Expand Down Expand Up @@ -715,6 +716,10 @@ func (sm *stateManager) Broadcast() {
defer sm.mu.Unlock()

lag, err := sm.refreshReplHealthLocked()
if sm.demotePrimaryStalled {
// If we are stalled while demoting primary, we should send an error for it.
err = vterrors.VT09031()
}
sm.hs.ChangeState(sm.target.TabletType, sm.ptsTimestamp, lag, err, sm.isServingLocked())
}

Expand Down Expand Up @@ -772,7 +777,7 @@ func (sm *stateManager) IsServing() bool {
}

// isServingLocked reports whether the tablet should be advertised as serving.
// All of the following must hold: the current and wanted states are both
// StateServing, replication is healthy, primary demotion is not stalled, and the
// tablet is not in lameduck mode.
//
// It reads fields guarded by sm.mu without locking; Broadcast calls it while
// holding sm.mu, and other callers are expected to do the same.
func (sm *stateManager) isServingLocked() bool {
	// Only one return is kept: the earlier variant without the demotePrimaryStalled
	// check made this one unreachable and let a stalled demotion report as serving.
	return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck
}

func (sm *stateManager) AppendDetails(details []*kv) []*kv {
Expand Down
39 changes: 39 additions & 0 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,45 @@ func TestStateManagerNotify(t *testing.T) {
sm.StopService()
}

// TestDemotePrimaryStalled verifies that once demotePrimaryStalled is set on the
// state manager, health broadcasts carry the VT09031 error and StartRequest
// rejects new requests with the not-serving error.
func TestDemotePrimaryStalled(t *testing.T) {
	sm := newTestStateManager()
	defer sm.StopService()
	err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "")
	require.NoError(t, err)
	// Stopping the ticker so that we don't get unexpected health streams.
	sm.hcticks.Stop()

	ch := make(chan *querypb.StreamHealthResponse, 5)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := sm.hs.Stream(context.Background(), func(shr *querypb.StreamHealthResponse) error {
			ch <- shr
			return nil
		})
		// ErrorContains fails the test when err is nil. Calling err.Error() on a nil
		// error would instead panic in this goroutine and take down the test binary.
		assert.ErrorContains(t, err, "tabletserver is shutdown")
	}()
	// Runs before the deferred StopService above, so the explicit StopService at the
	// end of the test is what lets the streaming goroutine finish.
	defer wg.Wait()

	// Send a broadcast message and check we have no error there.
	sm.Broadcast()
	gotshr := <-ch
	require.Empty(t, gotshr.RealtimeStats.HealthError)

	// If demote primary is stalled, then we should get an error.
	// The flag is guarded by sm.mu, so set it under the lock like the production setter does.
	sm.mu.Lock()
	sm.demotePrimaryStalled = true
	sm.mu.Unlock()
	sm.Broadcast()
	gotshr = <-ch
	require.EqualValues(t, "VT09031: Primary demotion is stalled", gotshr.RealtimeStats.HealthError)
	// Verify that we can't start a new request once we have a demote primary stalled.
	err = sm.StartRequest(context.Background(), &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}, false)
	require.ErrorContains(t, err, "operation not allowed in state NOT_SERVING")

	// Stop the state manager so the health stream terminates.
	sm.StopService()
}

func TestRefreshReplHealthLocked(t *testing.T) {
sm := newTestStateManager()
defer sm.StopService()
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,14 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e
}
}

// SetDemotePrimaryStalled flags the state manager as having a stalled primary
// demotion and then broadcasts the tablet's health.
func (tsv *TabletServer) SetDemotePrimaryStalled() {
	markStalled := func() {
		tsv.sm.mu.Lock()
		defer tsv.sm.mu.Unlock()
		tsv.sm.demotePrimaryStalled = true
	}
	markStalled()

	// The state manager lock is released before broadcasting.
	// NOTE(review): presumably BroadcastHealth reaches stateManager.Broadcast, which
	// takes the same lock — confirm before changing the lock scope.
	tsv.BroadcastHealth()
}

// CreateTransaction creates the metadata for a 2PC transaction.
func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) {
return tsv.execRequest(
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error
return nil
}

// SetDemotePrimaryStalled implements tabletserver.Controller.
// The mock only notes the invocation in MethodCalled under the method's own name.
func (tqsc *Controller) SetDemotePrimaryStalled() {
	const method = "SetDemotePrimaryStalled"
	tqsc.MethodCalled[method] = true
}

// EnterLameduck implements tabletserver.Controller.
func (tqsc *Controller) EnterLameduck() {
tqsc.mu.Lock()
Expand Down

0 comments on commit bd02b45

Please sign in to comment.