
Commit fe8b67b

kvserver: unskip lease preferences during outage
Previously, `TestLeasePreferencesDuringOutage` would force replicate queue processing of the test range, then assert that the range up-replicated and the lease transferred to a preferred locality. This test was skipped, and two of the assumptions it relied on to pass no longer hold.

After #85219, the replicate queue no longer re-processes replicas. Instead, the queue requeues replicas after processing, at the appropriate priority. This broke the test because the replicate queue is disabled in the test, making the re-queue a no-op.

After #94023, the replicate queue no longer looks for lease transfers after processing a replication action. Combined with #85219, the queue is now guaranteed not to process both up-replication and a lease transfer from a single enqueue.

Update the test so that it no longer requires a manual process call. Instead, use a queue range filter, which allows tests that disable automatic replication to still process filtered ranges via the various replica queues. Also, ensure that the non-stopped stores are considered live targets after simulating an outage (bumping manual clocks, stopping servers), so that the expected up-replication and subsequent lease transfer can proceed.

Fixes: #88769

Release note: None
1 parent 7bac4be commit fe8b67b
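
As a usage illustration (not part of the commit), here is a minimal sketch of how a test that runs with replication queues disabled might wire up the `BaseQueueDisabledBypassFilter` knob added below. The helper name `testServerArgsWithQueueBypass` and the surrounding scaffolding are assumed for the example; only the knob field and its signature come from the diff.

package kvserver_test

import (
	"sync/atomic"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// testServerArgsWithQueueBypass (hypothetical helper) builds server args whose
// base queues stay disabled for every range except the one whose ID is stored
// in *targetRangeID. Storing -1 keeps the bypass inert until the test flips it.
func testServerArgsWithQueueBypass(targetRangeID *int64) base.TestServerArgs {
	return base.TestServerArgs{
		Knobs: base.TestingKnobs{
			Store: &kvserver.StoreTestingKnobs{
				// Consulted by a disabled base queue; returning true lets this
				// range be enqueued and processed anyway.
				BaseQueueDisabledBypassFilter: func(rangeID roachpb.RangeID) bool {
					return rangeID == roachpb.RangeID(atomic.LoadInt64(targetRangeID))
				},
			},
		},
	}
}

The test updated below follows the same pattern, additionally holding a mutex inside the filter so the range is not processed until the simulated outage and clock jumps have settled.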

File tree

3 files changed: +155 -105 lines changed


pkg/kv/kvserver/client_lease_test.go

Lines changed: 130 additions & 101 deletions
@@ -16,7 +16,6 @@ import (
 	"math"
 	"runtime"
 	"strconv"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -909,15 +908,24 @@ func gossipLiveness(t *testing.T, tc *testcluster.TestCluster) {
 }
 
 // This test replicates the behavior observed in
-// https://github.com/cockroachdb/cockroach/issues/62485. We verify that
-// when a dc with the leaseholder is lost, a node in a dc that does not have the
-// lease preference can steal the lease, upreplicate the range and then give up the
-// lease in a single cycle of the replicate_queue.
+// https://github.com/cockroachdb/cockroach/issues/62485. We verify that when a
+// dc with the leaseholder is lost, a node in a dc that does not have the lease
+// preference, can steal the lease, upreplicate the range and then give up the
+// lease in a short period of time. Previously, the replicate queue would
+// reprocess, instead of requeue replicas. This behavior changed in #85219, to
+// prevent queue priority inversion. Subsequently, this test only asserts that
+// the lease preferences are satisfied quickly, rather than in a single
+// replicate queue process() call.
 func TestLeasePreferencesDuringOutage(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	skip.WithIssue(t, 88769, "flaky test")
 	defer log.Scope(t).Close(t)
 
+	// This is a hefty test, so we skip it under short.
+	skip.UnderShort(t)
+	// The test has 5 nodes. Its possible in stress-race for nodes to be starved
+	// out heartbeating their liveness.
+	skip.UnderStressRace(t)
+
 	stickyRegistry := server.NewStickyVFSRegistry()
 	ctx := context.Background()
 	manualClock := hlc.NewHybridManualClock()
@@ -947,15 +955,34 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
 		locality("us", "mi"),
 		locality("us", "mi"),
 	}
-	// Disable expiration based lease transfers. It is possible that a (pseudo)
-	// dead node acquires the lease and we are forced to wait out the expiration
-	// timer, if this were not set.
+
+	// This test disables the replicate queue. We wish to enable the replicate
+	// queue only for range we are testing, after marking some servers as dead.
+	// We also wait until the expected live stores are considered live from n1,
+	// if we didn't do this it would be possible for the range to process and be
+	// seen as unavailable due to manual clock jumps.
+	var testRangeID int64
+	var clockJumpMu syncutil.Mutex
+	atomic.StoreInt64(&testRangeID, -1)
+	disabledQueueBypassFn := func(rangeID roachpb.RangeID) bool {
+		if rangeID == roachpb.RangeID(atomic.LoadInt64(&testRangeID)) {
+			clockJumpMu.Lock()
+			defer clockJumpMu.Unlock()
+			return true
+		}
+		return false
+	}
 	settings := cluster.MakeTestingClusterSettings()
 	sv := &settings.SV
-	kvserver.TransferExpirationLeasesFirstEnabled.Override(ctx, sv, false)
-	kvserver.ExpirationLeasesOnly.Override(ctx, sv, false)
+	// The remaining live stores (n1,n4,n5) may become suspect due to manual
+	// clock jumps. Disable the suspect timer to prevent them becoming suspect
+	// when we bump the clocks.
+	liveness.TimeAfterNodeSuspect.Override(ctx, sv, 0)
+	timeUntilNodeDead := liveness.TimeUntilNodeDead.Get(sv)
+
 	for i := 0; i < numNodes; i++ {
 		serverArgs[i] = base.TestServerArgs{
+			Settings: settings,
 			Locality: localities[i],
 			Knobs: base.TestingKnobs{
 				Server: &server.TestingKnobs{
@@ -967,6 +994,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
 					// The Raft leadership may not end up on the eu node, but it needs to
 					// be able to acquire the lease anyway.
 					AllowLeaseRequestProposalsWhenNotLeader: true,
+					BaseQueueDisabledBypassFilter:           disabledQueueBypassFn,
 				},
 			},
 			StoreSpecs: []base.StoreSpec{
@@ -992,105 +1020,106 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
 	require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 3)...))
 	tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(1))
 
-	// Shutdown the sf datacenter, which is going to kill the node with the lease.
-	tc.StopServer(1)
-	tc.StopServer(2)
-
-	wait := func(duration time.Duration) {
-		manualClock.Increment(duration.Nanoseconds())
-		// Gossip and heartbeat all the live stores, we do this manually otherwise the
-		// allocator on server 0 may see everyone as temporarily dead due to the
-		// clock move above.
-		for _, i := range []int{0, 3, 4} {
-			require.NoError(t, tc.Servers[i].HeartbeatNodeLiveness())
-			require.NoError(t, tc.GetFirstStoreFromServer(t, i).GossipStore(ctx, true))
+	func() {
+		// Lock the clockJumpMu, in order to prevent processing the test range before
+		// the intended stores are considered live from n1. If we didn't do this, it
+		// is possible for n1 to process the test range and find it unavailable
+		// (unactionable).
+		clockJumpMu.Lock()
+		defer clockJumpMu.Unlock()
+
+		// Enable queue processing of the test range, right before we stop the sf
+		// datacenter. We expect the test range to be enqueued into the replicate
+		// queue shortly after.
+		rangeID := repl.GetRangeID()
+		atomic.StoreInt64(&testRangeID, int64(rangeID))
+
+		// Shutdown the sf datacenter, which is going to kill the node with the lease.
+		tc.StopServer(1)
+		tc.StopServer(2)
+
+		wait := func(duration time.Duration) {
+			manualClock.Increment(duration.Nanoseconds())
+			// Gossip and heartbeat all the live stores, we do this manually otherwise the
+			// allocator on server 0 may see everyone as temporarily dead due to the
			// clock move above.
+			for _, i := range []int{0, 3, 4} {
+				require.NoError(t, tc.Servers[i].HeartbeatNodeLiveness())
+				require.NoError(t, tc.GetFirstStoreFromServer(t, i).GossipStore(ctx, true))
+			}
 		}
-	}
-	// We need to wait until 2 and 3 are considered to be dead.
-	timeUntilNodeDead := liveness.TimeUntilNodeDead.Get(&tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().Settings.SV)
-	wait(timeUntilNodeDead)
-
-	checkDead := func(store *kvserver.Store, storeIdx int) error {
-		if dead, timetoDie, err := store.GetStoreConfig().StorePool.IsDead(
-			tc.GetFirstStoreFromServer(t, storeIdx).StoreID()); err != nil || !dead {
-			// Sometimes a gossip update arrives right after server shutdown and
-			// after we manually moved the time, so move it again.
-			if err == nil {
-				wait(timetoDie)
+		wait(timeUntilNodeDead)
+
+		checkDead := func(store *kvserver.Store, storeIdx int) error {
+			if dead, timetoDie, err := store.GetStoreConfig().StorePool.IsDead(
+				tc.GetFirstStoreFromServer(t, storeIdx).StoreID()); err != nil || !dead {
+				// Sometimes a gossip update arrives right after server shutdown and
+				// after we manually moved the time, so move it again.
+				if err == nil {
+					wait(timetoDie)
+				}
+				// NB: errors.Wrapf(nil, ...) returns nil.
+				// nolint:errwrap
+				return errors.Errorf("expected server %d to be dead, instead err=%v, dead=%v", storeIdx, err, dead)
 			}
-			// NB: errors.Wrapf(nil, ...) returns nil.
-			// nolint:errwrap
-			return errors.Errorf("expected server %d to be dead, instead err=%v, dead=%v", storeIdx, err, dead)
+			return nil
 		}
-		return nil
-	}
+
+		testutils.SucceedsSoon(t, func() error {
+			store := tc.GetFirstStoreFromServer(t, 0)
+			sl, available, _ := store.GetStoreConfig().StorePool.TestingGetStoreList()
+			if available != 3 {
+				return errors.Errorf(
+					"expected all 3 remaining stores to be live, but only got %d, stores=%v",
+					available, sl)
+			}
+			if err := checkDead(store, 1); err != nil {
+				return err
+			}
+			if err := checkDead(store, 2); err != nil {
+				return err
+			}
+			return nil
+		})
+	}()
+
+	// Send a request to force lease acquisition on _some_ remaining live node.
+	// Note, we expect this to be n1 (server 0).
+	ba := &kvpb.BatchRequest{}
+	ba.Add(getArgs(key))
+	_, pErr := tc.Servers[0].DistSenderI().(kv.Sender).Send(ctx, ba)
+	require.Nil(t, pErr)
 
 	testutils.SucceedsSoon(t, func() error {
-		store := tc.GetFirstStoreFromServer(t, 0)
-		sl, _, _ := store.GetStoreConfig().StorePool.TestingGetStoreList()
-		if len(sl.TestingStores()) != 3 {
-			return errors.Errorf("expected all 3 remaining stores to be live, but only got %v",
-				sl.TestingStores())
-		}
-		if err := checkDead(store, 1); err != nil {
-			return err
+		// Validate that we upreplicated outside of SF. NB: This will occur prior
+		// to the lease preference being satisfied.
+		require.Equal(t, 3, len(repl.Desc().Replicas().Voters().VoterDescriptors()))
+		for _, replDesc := range repl.Desc().Replicas().Voters().VoterDescriptors() {
+			serv, err := tc.FindMemberServer(replDesc.StoreID)
+			require.NoError(t, err)
+			servLocality := serv.Locality()
+			dc, ok := servLocality.Find("dc")
+			require.True(t, ok)
+			if dc == "sf" {
+				return errors.Errorf(
+					"expected no replicas in dc=sf, but found replica in "+
+						"dc=%s node_id=%v desc=%v",
+					dc, replDesc.NodeID, repl.Desc())
+			}
 		}
-		if err := checkDead(store, 2); err != nil {
-			return err
+		// Validate that the lease also transferred to a preferred locality. n4
+		// (us) and n5 (us) are the only valid stores to be leaseholders during the
+		// outage. n1 is the original leaseholder, expect it to not be the
+		// leaseholder now.
+		if !repl.OwnsValidLease(ctx, tc.Servers[0].Clock().NowAsClockTimestamp()) {
+			return nil
 		}
-		return nil
-	})
-	_, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
-		Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)
 
-	require.NoError(t, enqueueError, "failed to enqueue replica for replication")
-
-	var newLeaseHolder roachpb.ReplicationTarget
-	testutils.SucceedsSoon(t, func() error {
-		var err error
-		newLeaseHolder, err = tc.FindRangeLeaseHolder(*repl.Desc(), nil)
-		return err
+		return errors.Errorf(
+			"expected no leaseholder in region=us, but found %v",
+			repl.CurrentLeaseStatus(ctx),
+		)
 	})
-
-	srv, err := tc.FindMemberServer(newLeaseHolder.StoreID)
-	require.NoError(t, err)
-	loc := srv.Locality()
-	region, ok := loc.Find("region")
-	require.True(t, ok)
-	require.Equal(t, "us", region)
-	require.Equal(t, 3, len(repl.Desc().Replicas().Voters().VoterDescriptors()))
-	// Validate that we upreplicated outside of SF.
-	for _, replDesc := range repl.Desc().Replicas().Voters().VoterDescriptors() {
-		serv, err := tc.FindMemberServer(replDesc.StoreID)
-		require.NoError(t, err)
-		memberLoc := serv.Locality()
-		dc, ok := memberLoc.Find("dc")
-		require.True(t, ok)
-		require.NotEqual(t, "sf", dc)
-	}
-	history := repl.GetLeaseHistory()
-	// Make sure we see the eu node as a lease holder in the second to last
-	// leaseholder change.
-	// Since we can have expiration and epoch based leases at the tail of the
-	// history, we need to ignore them together if they originate from the same
-	// leaseholder.
-	nextNodeID := history[len(history)-1].Replica.NodeID
-	lastMove := len(history) - 2
-	for ; lastMove >= 0; lastMove-- {
-		if history[lastMove].Replica.NodeID != nextNodeID {
-			break
-		}
-	}
-	lastMove++
-	var leasesMsg []string
-	for _, h := range history {
-		leasesMsg = append(leasesMsg, h.String())
-	}
-	leaseHistory := strings.Join(leasesMsg, ", ")
-	require.Greater(t, lastMove, 0,
-		"must have at least one leaseholder change in history (lease history: %s)", leaseHistory)
-	require.Equal(t, tc.Target(0).NodeID, history[lastMove-1].Replica.NodeID,
-		"node id prior to last lease move (lease history: %s)", leaseHistory)
 }
 
 // This test verifies that when a node starts flapping its liveness, all leases

pkg/kv/kvserver/queue.go

Lines changed: 21 additions & 4 deletions
@@ -655,13 +655,24 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
 	}
 
 	bq.mu.Lock()
-	stopped := bq.mu.stopped || bq.mu.disabled
+	stopped := bq.mu.stopped
+	disabled := bq.mu.disabled
 	bq.mu.Unlock()
 
 	if stopped {
 		return
 	}
 
+	if disabled {
+		// The disabled queue bypass is used in tests which enable manual
+		// replication, however still require specific range(s) to be processed
+		// through the queue.
+		bypassDisabled := bq.store.TestingKnobs().BaseQueueDisabledBypassFilter
+		if bypassDisabled == nil || !bypassDisabled(repl.GetRangeID()) {
+			return
+		}
+	}
+
 	if !repl.IsInitialized() {
 		return
 	}
@@ -729,10 +740,16 @@ func (bq *baseQueue) addInternal(
 	}
 
 	if bq.mu.disabled {
-		if log.V(3) {
-			log.Infof(ctx, "queue disabled")
+		// The disabled queue bypass is used in tests which enable manual
+		// replication, however still require specific range(s) to be processed
+		// through the queue.
+		bypassDisabled := bq.store.TestingKnobs().BaseQueueDisabledBypassFilter
+		if bypassDisabled == nil || !bypassDisabled(desc.RangeID) {
+			if log.V(3) {
+				log.Infof(ctx, "queue disabled")
+			}
+			return false, errQueueDisabled
 		}
-		return false, errQueueDisabled
 	}
 
 	// If the replica is currently in purgatory, don't re-add it.

pkg/kv/kvserver/testing_knobs.go

Lines changed: 4 additions & 0 deletions
@@ -493,6 +493,10 @@ type StoreTestingKnobs struct {
 	// required.
 	BaseQueueInterceptor func(ctx context.Context, bq *baseQueue)
 
+	// BaseQueueDisabledBypassFilter checks whether the replica for the given
+	// rangeID should ignore the queue being disabled, and be processed anyway.
+	BaseQueueDisabledBypassFilter func(rangeID roachpb.RangeID) bool
+
 	// InjectReproposalError injects an error in tryReproposeWithNewLeaseIndex.
 	// If nil is returned, reproposal will be attempted.
 	InjectReproposalError func(p *ProposalData) error
