server: fault tolerance metrics #148759

Draft · wants to merge 1 commit into master
9 changes: 9 additions & 0 deletions docs/generated/metrics/metrics.yaml
@@ -10201,6 +10201,15 @@ layers:
essential: true
- name: REPLICATION
metrics:
- name: fault_tolerance.nodes
exported_name: fault_tolerance_nodes
description: "Number of nodes that can fail before we lose quorum on any range, \n if the labeled failure domain goes down. 0 indicates that we can tolerate a fault\n in the failure domain, but only just. Negative values indicate that the failure \n domain is critical to continued availability for at least one range."
y_axis_label: Nodes
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
how_to_use: Take the minimum, faceted across all labels
- name: leases.transfers.success
exported_name: leases_transfers_success
description: Number of successful lease transfers
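The how_to_use guidance for the new fault_tolerance.nodes gauge (take the minimum, faceted across all labels) can be sketched in Go. This is an illustrative consumer-side helper, not part of this PR; the map is assumed to hold already-scraped per-domain values of the exported fault_tolerance_nodes gauge.

// clusterFaultTolerance reduces per-failure-domain margins to a single
// worst-case number, following the metric's how_to_use guidance: take the
// minimum across all label values. ok is false when the map is empty.
func clusterFaultTolerance(marginsByDomain map[string]int) (worst int, ok bool) {
	for _, margin := range marginsByDomain {
		if !ok || margin < worst {
			worst, ok = margin, true
		}
	}
	return worst, ok
}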
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
@@ -351,6 +351,7 @@ ALL_TESTS = [
"//pkg/server/diagnostics:diagnostics_disallowed_imports_test",
"//pkg/server/diagnostics:diagnostics_test",
"//pkg/server/dumpstore:dumpstore_test",
"//pkg/server/faulttolerance:faulttolerance_test",
"//pkg/server/goroutinedumper:goroutinedumper_test",
"//pkg/server/license:license_test",
"//pkg/server/pgurl:pgurl_test",
@@ -1745,6 +1746,8 @@ GO_TARGETS = [
"//pkg/server/diagnostics:diagnostics_test",
"//pkg/server/dumpstore:dumpstore",
"//pkg/server/dumpstore:dumpstore_test",
"//pkg/server/faulttolerance:faulttolerance",
"//pkg/server/faulttolerance:faulttolerance_test",
"//pkg/server/goroutinedumper:goroutinedumper",
"//pkg/server/goroutinedumper:goroutinedumper_test",
"//pkg/server/license:license",
73 changes: 47 additions & 26 deletions pkg/roachpb/metadata_replicas.go
@@ -393,66 +393,87 @@ type RangeStatusReport struct {
UnderReplicatedNonVoters, OverReplicatedNonVoters bool
}

// countLive will first filter the ReplicaSet using filterPred, then
// count the live descriptors using livePred. It returns the size of
// both sets.
func (d ReplicaSet) countLive(
filterPred func(rDesc ReplicaDescriptor) bool, livePred func(rDesc ReplicaDescriptor) bool,
) (total, live int) {
filtered := d.FilterToDescriptors(filterPred)
wrappedFiltered := ReplicaSet{wrapped: filtered}
liveDesc := wrappedFiltered.FilterToDescriptors(livePred)

return len(filtered), len(liveDesc)
}

// ReplicationStatus returns availability and over/under-replication
// determinations for the range.
//
// neededVoters is the replica's desired replication for purposes of determining
// over/under-replication of voters. If the caller is only interested in
// over/under-replication of voters. If the caller is only interested in the
// availability of voting replicas, 0 can be passed in. neededNonVoters is the
// counterpart for non-voting replicas but with -1 as the sentinel value (unlike
// voters, it's possible to expect 0 non-voters).
func (d ReplicaSet) ReplicationStatus(
liveFunc func(descriptor ReplicaDescriptor) bool, neededVoters int, neededNonVoters int,
) RangeStatusReport {
var res RangeStatusReport
// isBoth takes two replica predicates and returns their conjunction.
isBoth := func(
pred1 func(rDesc ReplicaDescriptor) bool,
pred2 func(rDesc ReplicaDescriptor) bool) func(ReplicaDescriptor) bool {
return func(rDesc ReplicaDescriptor) bool {
return pred1(rDesc) && pred2(rDesc)
}
}

// This function handles regular or joint-consensus replica groups. In the
// joint-consensus case, we'll independently consider the health of the
// outgoing group ("old") and the incoming group ("new"). In the regular case,
// the two groups will be identical.

votersOldGroup := d.FilterToDescriptors(ReplicaDescriptor.IsVoterOldConfig)
liveVotersOldGroup := d.FilterToDescriptors(isBoth(ReplicaDescriptor.IsVoterOldConfig, liveFunc))
oldGroup, oldLive := d.countLive(ReplicaDescriptor.IsVoterOldConfig, liveFunc)
newGroup, newLive := d.countLive(ReplicaDescriptor.IsVoterNewConfig, liveFunc)

n := len(votersOldGroup)
// Empty groups succeed by default, to match the Raft implementation.
availableOutgoingGroup := (n == 0) || (len(liveVotersOldGroup) >= n/2+1)
availableOutgoingGroup := (oldGroup == 0) || (oldLive >= oldGroup/2+1)

votersNewGroup := d.FilterToDescriptors(ReplicaDescriptor.IsVoterNewConfig)
liveVotersNewGroup := d.FilterToDescriptors(isBoth(ReplicaDescriptor.IsVoterNewConfig, liveFunc))

n = len(votersNewGroup)
availableIncomingGroup := len(liveVotersNewGroup) >= n/2+1
availableIncomingGroup := newLive >= newGroup/2+1

res.Available = availableIncomingGroup && availableOutgoingGroup

// Determine over/under-replication of voting replicas. Note that learners
// don't matter.
underReplicatedOldGroup := len(liveVotersOldGroup) < neededVoters
underReplicatedNewGroup := len(liveVotersNewGroup) < neededVoters
overReplicatedOldGroup := len(votersOldGroup) > neededVoters
overReplicatedNewGroup := len(votersNewGroup) > neededVoters
underReplicatedOldGroup := oldLive < neededVoters
underReplicatedNewGroup := newLive < neededVoters
overReplicatedOldGroup := oldGroup > neededVoters
overReplicatedNewGroup := newGroup > neededVoters
res.UnderReplicated = underReplicatedOldGroup || underReplicatedNewGroup
res.OverReplicated = overReplicatedOldGroup || overReplicatedNewGroup
if neededNonVoters == -1 {
return res
}

nonVoters := d.FilterToDescriptors(ReplicaDescriptor.IsNonVoter)
liveNonVoters := d.FilterToDescriptors(isBoth(ReplicaDescriptor.IsNonVoter, liveFunc))
res.UnderReplicatedNonVoters = len(liveNonVoters) < neededNonVoters
res.OverReplicatedNonVoters = len(nonVoters) > neededNonVoters
nonVoters, liveNonVoters := d.countLive(ReplicaDescriptor.IsNonVoter, liveFunc)

res.UnderReplicatedNonVoters = liveNonVoters < neededNonVoters
res.OverReplicatedNonVoters = nonVoters > neededNonVoters

return res
}

// ReplicationMargin computes the amount by which live voters exceed quorum.
// A value >= 0 indicates an available range, and the value is the number of
// additional replicas that can fail before quorum is lost.
func (d ReplicaSet) ReplicationMargin(liveFunc func(descriptor ReplicaDescriptor) bool) int {
// This function handles regular or joint-consensus replica groups. In the
// joint-consensus case, we'll independently consider the health of the
// outgoing group ("old") and the incoming group ("new"). In the regular case,
// the two groups will be identical.

oldGroup, oldLive := d.countLive(ReplicaDescriptor.IsVoterOldConfig, liveFunc)
newGroup, newLive := d.countLive(ReplicaDescriptor.IsVoterNewConfig, liveFunc)

marginIncomingGroup := newLive - (newGroup/2 + 1)
marginOutgoingGroup := marginIncomingGroup
// Empty groups succeed by default, to match the Raft implementation.
if oldGroup > 0 {
marginOutgoingGroup = oldLive - (oldGroup/2 + 1)
}
return min(marginIncomingGroup, marginOutgoingGroup)
}
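To make the margin arithmetic concrete, here is a standalone sketch of the live - (n/2 + 1) formula with plain integers; it is not the package API, just the same computation ReplicationMargin performs for a single voter group.

// quorumMargin is the number of additional voters that can fail before a
// single voter group loses quorum.
//   5 voters, 4 live: 4 - (5/2 + 1) = 1  (one more failure is survivable)
//   3 voters, 2 live: 2 - (3/2 + 1) = 0  (available, but no headroom)
//   3 voters, 1 live: 1 - (3/2 + 1) = -1 (quorum already lost)
func quorumMargin(voters, live int) int {
	return live - (voters/2 + 1)
}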

// Empty returns true if `target` is an empty replication target.
func Empty(target ReplicationTarget) bool {
return target == ReplicationTarget{}
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -179,6 +179,7 @@ go_library(
"//pkg/server/decommissioning",
"//pkg/server/diagnostics",
"//pkg/server/diagnostics/diagnosticspb",
"//pkg/server/faulttolerance",
"//pkg/server/goroutinedumper",
"//pkg/server/license",
"//pkg/server/pgurl",
27 changes: 27 additions & 0 deletions pkg/server/faulttolerance/BUILD.bazel
@@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "faulttolerance",
srcs = ["locality.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/server/faulttolerance",
visibility = ["//visibility:public"],
deps = [
"//pkg/gossip",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "faulttolerance_test",
srcs = ["locality_test.go"],
embed = [":faulttolerance"],
deps = [
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"@com_github_stretchr_testify//require",
],
)
213 changes: 213 additions & 0 deletions pkg/server/faulttolerance/locality.go
@@ -0,0 +1,213 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

// Package faulttolerance provides visibility into the cluster's ability
// to tolerate failures across different failure domains.
package faulttolerance

import (
"context"
"iter"
"maps"
"slices"
"strings"

"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)

// LocalityMapsFromGossip constructs a map from each NodeID to its
// Locality, by deserializing the node descriptors from Gossip.
func LocalityMapsFromGossip(g *gossip.Gossip) (map[roachpb.NodeID]roachpb.Locality, error) {
nodeLocality := make(map[roachpb.NodeID]roachpb.Locality)
if err := g.IterateInfos(gossip.KeyNodeDescPrefix, func(key string, info gossip.Info) error {
bs, err := info.Value.GetBytes()
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
"failed to extract bytes for key %q", key)
}

var d roachpb.NodeDescriptor
if err := protoutil.Unmarshal(bs, &d); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
"failed to parse value for key %q", key)
}

// Don't use node descriptors with NodeID 0, because that's meant to
// indicate that the node has been removed from the cluster.
if d.NodeID != 0 {
nodeLocality[d.NodeID] = d.Locality
}
return nil
}); err != nil {
return nil, err
}
return nodeLocality, nil
}

// ComputeFaultTolerance computes the fault tolerance margin for each
// failure domain. A failure domain is a set of nodes that map to a
// single locality in the nodeLocality map, or a merged set of nodes
// that share a locality prefix. The fault tolerance margin is the
// number of additional nodes (beyond the nodes in the failure domain)
// that can fail before any range experiences unavailability.
//
// Margins can be negative (indicating that the failure domain is
// critical; its failure will cause unavailability), 0 (the failure
// domain is not critical, but no additional node failures can be
// tolerated), or positive.
//
// The keys in the returned map are locality strings, as returned by
// `roachpb.Locality.String()`. They include "parent localities" as
// well as the actual leaf localities to which nodes are assigned.
//
// The fault tolerance computation occurs in the context of the
// livenessMap (to determine any existing node failures), and is
// evaluated for only the replicas that appear in the replicas Seq.
//
// The evaluation for each replica is independent: it is valid to merge
// the results of this computation for different sets of replicas by
// taking the `min` of the margin values.
func ComputeFaultTolerance(
ctx context.Context,
livenessMap livenesspb.NodeVitalityMap,
nodeLocality map[roachpb.NodeID]roachpb.Locality,
replicas iter.Seq[*kvserver.Replica],
) (map[string]int, error) {
return computeFaultToleranceImpl(ctx, livenessMap, nodeLocality, replicas)
}
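As noted in the comment above, results computed over different sets of replicas can be combined by taking the per-domain minimum. A minimal sketch of that merge, assuming only the returned map[string]int type (the helper name is illustrative, not part of this PR):

// mergeMargins combines two per-domain margin maps by keeping the smaller
// margin for every failure domain present in either map. This is valid
// because each replica is evaluated independently.
func mergeMargins(a, b map[string]int) map[string]int {
	out := make(map[string]int, len(a)+len(b))
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b {
		if existing, ok := out[k]; !ok || v < existing {
			out[k] = v
		}
	}
	return out
}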

type replicaDescriber interface {
Desc() *roachpb.RangeDescriptor
}

func computeFaultToleranceImpl[RD replicaDescriber](
ctx context.Context,
livenessMap livenesspb.NodeVitalityMap,
nodeLocality map[roachpb.NodeID]roachpb.Locality,
replicas iter.Seq[RD],
) (map[string]int, error) {
for n, l := range nodeLocality {
if len(l.Tiers) == 0 {
return nil, errors.AssertionFailedf("node %d missing locality", n)
}
}
failureDomainMap := makeFailureDomains(nodeLocality)

domainMargins := make(map[string]int)
for replica := range replicas {
for domainKey, fd := range failureDomainMap {
if err := ctx.Err(); err != nil {
return nil, err
}

margin := replica.Desc().Replicas().ReplicationMargin(func(rd roachpb.ReplicaDescriptor) bool {
if _, ok := fd.nodes[rd.NodeID]; ok {
return false
}
return livenessMap[rd.NodeID].IsLive(livenesspb.SpanConfigConformance)
})

if oldMargin, ok := domainMargins[domainKey]; ok {
domainMargins[domainKey] = min(oldMargin, margin)
} else {
domainMargins[domainKey] = margin
}
}
}
return domainMargins, nil
}

func makeFailureDomains(
nodeLocality map[roachpb.NodeID]roachpb.Locality,
) map[string]*failureDomain {
domainMap := make(map[string]*failureDomain)
var unresolved []string
// First, construct the leaf domains.
for _, l := range nodeLocality {
k := l.String()
if _, ok := domainMap[k]; !ok {
domainMap[k] = newFailureDomain(l, nodeLocality)
}
unresolved = append(unresolved, k)
}

// Sort the domain keys by descending depth (comma count). If the depth of
// the locality tree varies, we want to handle the deeper localities first.
slices.SortStableFunc(unresolved, func(a, b string) int {
aComma := strings.Count(a, ",")
bComma := strings.Count(b, ",")
return bComma - aComma
})

// Merge existing domains into parent domains.
for len(unresolved) > 0 {
fd := domainMap[unresolved[0]]
unresolved = unresolved[1:]

pdKey := fd.parentKey()
if pdKey == "" {
continue
}
if parentFailureDomain, ok := domainMap[pdKey]; !ok {
// new unresolved parent domain
pd := fd.parent()
domainMap[pdKey] = pd
unresolved = append(unresolved, pdKey)
} else {
// merge child into parent domain
parentFailureDomain.merge(fd)
}
}
return domainMap
}
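For illustration, here is a sketch of an in-package test exercising makeFailureDomains; the node IDs and localities are invented, and imports would follow the existing locality_test.go deps. The expected keys follow from roachpb.Locality.String(): two zone-level leaf domains plus their merged region-level parent.

func TestMakeFailureDomainsSketch(t *testing.T) {
	// Four hypothetical nodes, two per zone, all in one region.
	tier := func(k, v string) roachpb.Tier { return roachpb.Tier{Key: k, Value: v} }
	nodeLocality := map[roachpb.NodeID]roachpb.Locality{
		1: {Tiers: []roachpb.Tier{tier("region", "us-east1"), tier("zone", "a")}},
		2: {Tiers: []roachpb.Tier{tier("region", "us-east1"), tier("zone", "a")}},
		3: {Tiers: []roachpb.Tier{tier("region", "us-east1"), tier("zone", "b")}},
		4: {Tiers: []roachpb.Tier{tier("region", "us-east1"), tier("zone", "b")}},
	}
	domains := makeFailureDomains(nodeLocality)
	// Expect the two leaf domains plus the merged parent domain:
	//   "region=us-east1,zone=a" -> nodes {1, 2}
	//   "region=us-east1,zone=b" -> nodes {3, 4}
	//   "region=us-east1"        -> nodes {1, 2, 3, 4}
	require.Len(t, domains, 3)
	require.Len(t, domains["region=us-east1,zone=a"].nodes, 2)
	require.Len(t, domains["region=us-east1"].nodes, 4)
}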

type failureDomain struct {
domain roachpb.Locality
nodes map[roachpb.NodeID]struct{}
}

func (fd *failureDomain) merge(rhs *failureDomain) {
if match, _ := rhs.domain.Matches(fd.domain); !match {
panic("cannot merge failure domain")
}

for n := range rhs.nodes {
fd.nodes[n] = struct{}{}
}
}

func (fd *failureDomain) parentKey() string {
return roachpb.Locality{Tiers: fd.domain.Tiers[0 : len(fd.domain.Tiers)-1]}.String()
}

func newFailureDomain(
domain roachpb.Locality, nodeLocality map[roachpb.NodeID]roachpb.Locality,
) *failureDomain {
faultScenario := make(map[roachpb.NodeID]struct{})
for node := range nodeLocality {
if match, _ := nodeLocality[node].Matches(domain); match {
faultScenario[node] = struct{}{}
}
}
return &failureDomain{
domain: domain,
nodes: faultScenario,
}
}

func (fd *failureDomain) parent() *failureDomain {
pd := roachpb.Locality{Tiers: fd.domain.Tiers[0 : len(fd.domain.Tiers)-1]}
nodes := make(map[roachpb.NodeID]struct{}, len(fd.nodes))
maps.Copy(nodes, fd.nodes)
return &failureDomain{
domain: pd,
nodes: nodes,
}
}