Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/loqrecovery/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ func applyReplicaUpdate(

hs.LeadEpoch = 0

// TODO(sep-raft-log): when raft and state machine engines are separated, this
// update must be written to the raft engine.
if err := sl.SetHardState(ctx, readWriter, hs); err != nil {
return PrepareReplicaReport{}, errors.Wrap(err, "setting HardState")
}
Expand Down
23 changes: 16 additions & 7 deletions pkg/kv/kvserver/loqrecovery/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ func CollectStoresReplicaInfo(
return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.New("can't collect info from stored that belong to different clusters")
}
nodes[ident.NodeID] = struct{}{}
if err := visitStoreReplicas(ctx, reader, ident.StoreID, ident.NodeID, version,
// TODO(sep-raft-log): use different readers when the raft and state machine
// engines are separate. Since the engines are immutable in this path, there
// is no question whether to and in which order to grab engine snapshots.
if err := visitStoreReplicas(ctx, reader, reader, ident.StoreID, ident.NodeID,
func(info loqrecoverypb.ReplicaInfo) error {
replicas = append(replicas, info)
return nil
Expand All @@ -178,19 +181,21 @@ func CollectStoresReplicaInfo(

func visitStoreReplicas(
ctx context.Context,
reader storage.Reader,
state, raft storage.Reader,
storeID roachpb.StoreID,
nodeID roachpb.NodeID,
targetVersion clusterversion.ClusterVersion,
send func(info loqrecoverypb.ReplicaInfo) error,
) error {
if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, reader, func(desc roachpb.RangeDescriptor) error {
if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, state, func(desc roachpb.RangeDescriptor) error {
rsl := stateloader.Make(desc.RangeID)
rstate, err := rsl.Load(ctx, reader, &desc)
rstate, err := rsl.Load(ctx, state, &desc)
if err != nil {
return err
}
hstate, err := rsl.LoadHardState(ctx, reader)
// TODO(pav-kv): the LoQ recovery flow uses only the applied index, and the
// HardState.Commit loaded here is unused. Consider removing. Make sure this
// doesn't break compatibility for ReplicaInfo unmarshalling.
hstate, err := rsl.LoadHardState(ctx, raft)
if err != nil {
return err
}
Expand All @@ -199,8 +204,12 @@ func visitStoreReplicas(
// at potentially uncommitted entries as we have no way to determine their
// outcome, and they will become committed as soon as the replica is
// designated as a survivor.
// TODO(sep-raft-log): decide which LogID to read from. If the raft and
// state machine readers are slightly out of sync, the LogIDs may mismatch.
// For the heuristics here, it would probably make sense to read from all
// LogIDs with unapplied entries.
rangeUpdates, err := GetDescriptorChangesFromRaftLog(
ctx, desc.RangeID, rstate.RaftAppliedIndex+1, math.MaxInt64, reader)
ctx, desc.RangeID, rstate.RaftAppliedIndex+1, math.MaxInt64, raft)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/loqrecovery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func (s Server) ServeLocalReplicas(
_ *serverpb.RecoveryCollectLocalReplicaInfoRequest,
stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
) error {
v := s.settings.Version.ActiveVersion(ctx)
var stores []*kvserver.Store
if err := s.stores.VisitStores(func(s *kvserver.Store) error {
stores = append(stores, s)
Expand All @@ -135,9 +134,14 @@ func (s Server) ServeLocalReplicas(
for _, s := range stores {
s := s // copy for closure
g.Go(func() error {
// TODO(sep-raft-log): when raft and state machine engines are separate,
// we need two snapshots here. This path is online, so we should make sure
// these snapshots are consistent (in particular, the LogID must match
// across the two), or make sure that LogID mismatch is handled in
// visitStoreReplicas.
reader := s.TODOEngine().NewSnapshot()
defer reader.Close()
return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), v,
return visitStoreReplicas(ctx, reader, reader, s.StoreID(), s.NodeID(),
func(info loqrecoverypb.ReplicaInfo) error {
return syncStream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info})
})
Expand Down