From 75a487f7894ce832c05510a0dffe2a603791ac3b Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 24 Jun 2025 18:02:11 +0100 Subject: [PATCH 1/3] loqrecovery: rm unused argument Epic: none Release note: none --- pkg/kv/kvserver/loqrecovery/collect.go | 3 +-- pkg/kv/kvserver/loqrecovery/server.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go index 0cb4a0bb5f8d..8db8d2ea3a39 100644 --- a/pkg/kv/kvserver/loqrecovery/collect.go +++ b/pkg/kv/kvserver/loqrecovery/collect.go @@ -158,7 +158,7 @@ func CollectStoresReplicaInfo( return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.New("can't collect info from stored that belong to different clusters") } nodes[ident.NodeID] = struct{}{} - if err := visitStoreReplicas(ctx, reader, ident.StoreID, ident.NodeID, version, + if err := visitStoreReplicas(ctx, reader, ident.StoreID, ident.NodeID, func(info loqrecoverypb.ReplicaInfo) error { replicas = append(replicas, info) return nil @@ -181,7 +181,6 @@ func visitStoreReplicas( reader storage.Reader, storeID roachpb.StoreID, nodeID roachpb.NodeID, - targetVersion clusterversion.ClusterVersion, send func(info loqrecoverypb.ReplicaInfo) error, ) error { if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, reader, func(desc roachpb.RangeDescriptor) error { diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go index 344035469ee8..17e2afe2b5e7 100644 --- a/pkg/kv/kvserver/loqrecovery/server.go +++ b/pkg/kv/kvserver/loqrecovery/server.go @@ -121,7 +121,6 @@ func (s Server) ServeLocalReplicas( _ *serverpb.RecoveryCollectLocalReplicaInfoRequest, stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer, ) error { - v := s.settings.Version.ActiveVersion(ctx) var stores []*kvserver.Store if err := s.stores.VisitStores(func(s *kvserver.Store) error { stores = append(stores, s) @@ -137,7 +136,7 @@ func (s Server) ServeLocalReplicas( g.Go(func() error { reader := s.TODOEngine().NewSnapshot() defer reader.Close() - return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), v, + return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), func(info loqrecoverypb.ReplicaInfo) error { return syncStream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info}) }) From 7dbbb8c474b768089ff4fc19136d16ebdae45bdb Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 24 Jun 2025 18:13:04 +0100 Subject: [PATCH 2/3] loqrecovery: use separate readers when collecting Epic: none Release note: none --- pkg/kv/kvserver/loqrecovery/collect.go | 15 +++++++++------ pkg/kv/kvserver/loqrecovery/server.go | 6 +++++- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go index 8db8d2ea3a39..1cc48cfa9262 100644 --- a/pkg/kv/kvserver/loqrecovery/collect.go +++ b/pkg/kv/kvserver/loqrecovery/collect.go @@ -158,7 +158,10 @@ func CollectStoresReplicaInfo( return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.New("can't collect info from stored that belong to different clusters") } nodes[ident.NodeID] = struct{}{} - if err := visitStoreReplicas(ctx, reader, ident.StoreID, ident.NodeID, + // TODO(sep-raft-log): use different readers when the raft and state machine + // engines are separate. Since the engines are immutable in this path, there + // is no question whether to and in which order to grab engine snapshots. + if err := visitStoreReplicas(ctx, reader, reader, ident.StoreID, ident.NodeID, func(info loqrecoverypb.ReplicaInfo) error { replicas = append(replicas, info) return nil @@ -178,18 +181,18 @@ func CollectStoresReplicaInfo( func visitStoreReplicas( ctx context.Context, - reader storage.Reader, + state, raft storage.Reader, storeID roachpb.StoreID, nodeID roachpb.NodeID, send func(info loqrecoverypb.ReplicaInfo) error, ) error { - if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, reader, func(desc roachpb.RangeDescriptor) error { + if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, state, func(desc roachpb.RangeDescriptor) error { rsl := stateloader.Make(desc.RangeID) - rstate, err := rsl.Load(ctx, reader, &desc) + rstate, err := rsl.Load(ctx, state, &desc) if err != nil { return err } - hstate, err := rsl.LoadHardState(ctx, reader) + hstate, err := rsl.LoadHardState(ctx, raft) if err != nil { return err } @@ -199,7 +202,7 @@ func visitStoreReplicas( // outcome, and they will become committed as soon as the replica is // designated as a survivor. rangeUpdates, err := GetDescriptorChangesFromRaftLog( - ctx, desc.RangeID, rstate.RaftAppliedIndex+1, math.MaxInt64, reader) + ctx, desc.RangeID, rstate.RaftAppliedIndex+1, math.MaxInt64, raft) if err != nil { return err } diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go index 17e2afe2b5e7..358001a33762 100644 --- a/pkg/kv/kvserver/loqrecovery/server.go +++ b/pkg/kv/kvserver/loqrecovery/server.go @@ -134,9 +134,13 @@ func (s Server) ServeLocalReplicas( for _, s := range stores { s := s // copy for closure g.Go(func() error { + // TODO(sep-raft-log): when raft and state machine engines are separate, + // we need two snapshots here. This path is online, so we should make sure + // these snapshots are consistent. In particular, the LogID must match + // across the two. reader := s.TODOEngine().NewSnapshot() defer reader.Close() - return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), + return visitStoreReplicas(ctx, reader, reader, s.StoreID(), s.NodeID(), func(info loqrecoverypb.ReplicaInfo) error { return syncStream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info}) }) From 97f32dfa3d19d69f95513ff7ee440c2c2cf20f41 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 24 Jun 2025 18:17:55 +0100 Subject: [PATCH 3/3] loqrecovery: TODO Epic: none Release note: none --- pkg/kv/kvserver/loqrecovery/apply.go | 2 ++ pkg/kv/kvserver/loqrecovery/collect.go | 7 +++++++ pkg/kv/kvserver/loqrecovery/server.go | 5 +++-- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/loqrecovery/apply.go b/pkg/kv/kvserver/loqrecovery/apply.go index a9601ec2edfe..2e99060a34fd 100644 --- a/pkg/kv/kvserver/loqrecovery/apply.go +++ b/pkg/kv/kvserver/loqrecovery/apply.go @@ -317,6 +317,8 @@ func applyReplicaUpdate( hs.LeadEpoch = 0 + // TODO(sep-raft-log): when raft and state machine engines are separated, this + // update must be written to the raft engine. if err := sl.SetHardState(ctx, readWriter, hs); err != nil { return PrepareReplicaReport{}, errors.Wrap(err, "setting HardState") } diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go index 1cc48cfa9262..95561bab45a6 100644 --- a/pkg/kv/kvserver/loqrecovery/collect.go +++ b/pkg/kv/kvserver/loqrecovery/collect.go @@ -192,6 +192,9 @@ func visitStoreReplicas( if err != nil { return err } + // TODO(pav-kv): the LoQ recovery flow uses only the applied index, and the + // HardState.Commit loaded here is unused. Consider removing. Make sure this + // doesn't break compatibility for ReplicaInfo unmarshalling. hstate, err := rsl.LoadHardState(ctx, raft) if err != nil { return err @@ -201,6 +204,10 @@ func visitStoreReplicas( // at potentially uncommitted entries as we have no way to determine their // outcome, and they will become committed as soon as the replica is // designated as a survivor. + // TODO(sep-raft-log): decide which LogID to read from. If the raft and + // state machine readers are slightly out of sync, the LogIDs may mismatch. + // For the heuristics here, it would probably make sense to read from all + // LogIDs with unapplied entries. rangeUpdates, err := GetDescriptorChangesFromRaftLog( ctx, desc.RangeID, rstate.RaftAppliedIndex+1, math.MaxInt64, raft) if err != nil { diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go index 358001a33762..03e000f921fb 100644 --- a/pkg/kv/kvserver/loqrecovery/server.go +++ b/pkg/kv/kvserver/loqrecovery/server.go @@ -136,8 +136,9 @@ func (s Server) ServeLocalReplicas( g.Go(func() error { // TODO(sep-raft-log): when raft and state machine engines are separate, // we need two snapshots here. This path is online, so we should make sure - // these snapshots are consistent. In particular, the LogID must match - // across the two. + // these snapshots are consistent (in particular, the LogID must match + // across the two), or make sure that LogID mismatch is handled in + // visitStoreReplicas. reader := s.TODOEngine().NewSnapshot() defer reader.Close() return visitStoreReplicas(ctx, reader, reader, s.StoreID(), s.NodeID(),