Skip to content

Snapshots support multi-project #130000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -138,8 +139,9 @@ public void testDeleteSnapshottingDataStream() {
List.of(new Tuple<>(dataStreamName, 2), new Tuple<>(dataStreamName2, 2)),
otherIndices
);
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(dataStreamName, "repo1", false))
.withAddedEntry(createEntry(dataStreamName2, "repo2", true));
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY.withAddedEntry(
createEntry(dataStreamName, projectId, "repo1", false)
).withAddedEntry(createEntry(dataStreamName2, projectId, "repo2", true));
ClusterState snapshotCs = ClusterState.builder(cs).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build();

DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName });
Expand All @@ -157,9 +159,9 @@ public void testDeleteSnapshottingDataStream() {
);
}

private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo, boolean partial) {
private SnapshotsInProgress.Entry createEntry(String dataStreamName, ProjectId projectId, String repo, boolean partial) {
return SnapshotsInProgress.Entry.snapshot(
new Snapshot(repo, new SnapshotId("", "")),
new Snapshot(projectId, repo, new SnapshotId("", "")),
false,
partial,
SnapshotsInProgress.State.SUCCESS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) {
final ListenableFuture<Void> metadataDone = new ListenableFuture<>();
wrappedFinalizeContext = new FinalizeSnapshotContext(
finalizeSnapshotContext.serializeProjectMetadata(),
finalizeSnapshotContext.updatedShardGenerations(),
finalizeSnapshotContext.repositoryStateId(),
finalizeSnapshotContext.clusterMetadata(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ private static BlobStoreIndexShardSnapshots readShardGeneration(
ShardGeneration generation
) throws IOException {
return BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(
repository.getMetadata().name(),
repository.getProjectRepo(),
repository.shardContainer(repositoryShardId.index(), repositoryShardId.shardId()),
generation.getGenerationUUID(),
NamedXContentRegistry.EMPTY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -820,15 +820,16 @@ public void testSnapshotWithMissingShardLevelIndexFile() throws Exception {
"fallback message",
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
Level.ERROR,
"index [test-idx-1/*] shard generation [*] in [test-repo][*] not found - falling back to reading all shard snapshots"
"index [test-idx-1/*] shard generation [*] in [default][test-repo][*] not found "
+ "- falling back to reading all shard snapshots"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"shard blobs list",
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
Level.ERROR,
"read shard snapshots [*] due to missing shard generation [*] for index [test-idx-1/*] in [test-repo][*]"
"read shard snapshots [*] due to missing shard generation [*] for index [test-idx-1/*] in [default][test-repo][*]"
)
);
if (repairWithDelete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ public CountingMockRepository(
}

@Override
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
globalMetadata.computeIfAbsent(snapshotId.getName(), (s) -> new AtomicInteger(0)).incrementAndGet();
return super.getSnapshotGlobalMetadata(snapshotId);
return super.getSnapshotGlobalMetadata(snapshotId, fromProjectMetadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void testWarningSpeedOverRecovery() throws Exception {
"snapshot speed over recovery speed",
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
Level.WARN,
"repository [test-repo] has a rate limit [max_snapshot_bytes_per_sec=1gb] per second which is above "
"repository [default][test-repo] has a rate limit [max_snapshot_bytes_per_sec=1gb] per second which is above "
+ "the effective recovery rate limit [indices.recovery.max_bytes_per_sec=100mb] per second, thus the repository "
+ "rate limit will be superseded by the recovery rate limit"
);
Expand All @@ -152,7 +152,7 @@ public void testWarningSpeedOverRecovery() throws Exception {
"snapshot restore speed over recovery speed",
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
Level.WARN,
"repository [test-repo] has a rate limit [max_restore_bytes_per_sec=2gb] per second which is above "
"repository [default][test-repo] has a rate limit [max_restore_bytes_per_sec=2gb] per second which is above "
+ "the effective recovery rate limit [indices.recovery.max_bytes_per_sec=100mb] per second, thus the repository "
+ "rate limit will be superseded by the recovery rate limit"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.mockstore.MockRepository;
Expand Down Expand Up @@ -43,7 +44,7 @@ public void testDeletingSnapshotsIsLoggedAfterClusterStateIsProcessed() throws E
"[does-not-exist]",
SnapshotsService.class.getName(),
Level.INFO,
"deleting snapshots [does-not-exist] from repository [test-repo]"
"deleting snapshots [does-not-exist] from repository [default][test-repo]"
)
);

Expand All @@ -52,7 +53,7 @@ public void testDeletingSnapshotsIsLoggedAfterClusterStateIsProcessed() throws E
"[deleting test-snapshot]",
SnapshotsService.class.getName(),
Level.INFO,
"deleting snapshots [test-snapshot] from repository [test-repo]"
"deleting snapshots [test-snapshot] from repository [default][test-repo]"
)
);

Expand All @@ -61,7 +62,7 @@ public void testDeletingSnapshotsIsLoggedAfterClusterStateIsProcessed() throws E
"[test-snapshot deleted]",
SnapshotsService.class.getName(),
Level.INFO,
"snapshots [test-snapshot/*] deleted"
"snapshots [test-snapshot/*] deleted in repository [default][test-repo]"
)
);

Expand Down Expand Up @@ -90,7 +91,7 @@ public void testSnapshotDeletionFailureShouldBeLogged() throws Exception {
"[test-snapshot]",
SnapshotsService.class.getName(),
Level.WARN,
"failed to complete snapshot deletion for [test-snapshot] from repository [test-repo]"
"failed to complete snapshot deletion for [test-snapshot] from repository [default][test-repo]"
)
);

Expand Down Expand Up @@ -176,10 +177,10 @@ private SubscribableListener<Void> createSnapshotDeletionListener(String reposit
return false;
}
if (deleteHasStarted.get() == false) {
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(ProjectId.DEFAULT, repositoryName));
return false;
} else {
return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
return deletionsInProgress.hasExecutingDeletion(ProjectId.DEFAULT, repositoryName) == false;
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.injection.guice.Inject;
Expand Down Expand Up @@ -66,14 +67,16 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
private static final Logger logger = LogManager.getLogger(TransportCleanupRepositoryAction.class);

private final RepositoriesService repositoriesService;
private final ProjectResolver projectResolver;

@Inject
public TransportCleanupRepositoryAction(
TransportService transportService,
ClusterService clusterService,
RepositoriesService repositoriesService,
ThreadPool threadPool,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
Expand All @@ -86,6 +89,7 @@ public TransportCleanupRepositoryAction(
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.repositoriesService = repositoriesService;
this.projectResolver = projectResolver;
// We add a state applier that will remove any dangling repository cleanup actions on master failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
Expand Down Expand Up @@ -134,22 +138,23 @@ protected void masterOperation(
ClusterState state,
ActionListener<CleanupRepositoryResponse> listener
) {
cleanupRepo(request.name(), listener.map(CleanupRepositoryResponse::new));
cleanupRepo(projectResolver.getProjectId(), request.name(), listener.map(CleanupRepositoryResponse::new));
}

@Override
protected ClusterBlockException checkBlock(CleanupRepositoryRequest request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}

/**
* Runs cleanup operations on the given repository.
* @param projectId Project for the repository
* @param repositoryName Repository to clean up
* @param listener Listener for cleanup result
*/
private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanupResult> listener) {
final Repository repository = repositoriesService.repository(repositoryName);
private void cleanupRepo(ProjectId projectId, String repositoryName, ActionListener<RepositoryCleanupResult> listener) {
final Repository repository = repositoriesService.repository(projectId, repositoryName);
if (repository instanceof BlobStoreRepository == false) {
listener.onFailure(new IllegalArgumentException("Repository [" + repositoryName + "] does not support repository cleanup"));
return;
Expand All @@ -172,8 +177,10 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup

@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsService.ensureRepositoryExists(repositoryName, currentState);
SnapshotsService.ensureNotReadOnly(currentState, repositoryName);
final ProjectMetadata projectMetadata = currentState.metadata().getProject(projectId);
SnapshotsService.ensureRepositoryExists(repositoryName, projectMetadata);
SnapshotsService.ensureNotReadOnly(projectMetadata, repositoryName);
// Repository cleanup is intentionally cluster wide exclusive
final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState);
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new IllegalStateException(
Expand All @@ -200,8 +207,6 @@ public ClusterState execute(ClusterState currentState) {
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]"
);
}
@FixForMultiProject
final var projectId = ProjectId.DEFAULT;
return ClusterState.builder(currentState)
.putCustom(
RepositoryCleanupInProgress.TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
Expand All @@ -32,14 +33,16 @@ public final class TransportCloneSnapshotAction extends AcknowledgedTransportMas

public static final ActionType<AcknowledgedResponse> TYPE = new ActionType<>("cluster:admin/snapshot/clone");
private final SnapshotsService snapshotsService;
private final ProjectResolver projectResolver;

@Inject
public TransportCloneSnapshotAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
Expand All @@ -51,12 +54,13 @@ public TransportCloneSnapshotAction(
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.snapshotsService = snapshotsService;
this.projectResolver = projectResolver;
}

@Override
protected ClusterBlockException checkBlock(CloneSnapshotRequest request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}

@Override
Expand All @@ -66,6 +70,6 @@ protected void masterOperation(
ClusterState state,
final ActionListener<AcknowledgedResponse> listener
) {
snapshotsService.cloneSnapshot(request, listener.map(v -> AcknowledgedResponse.TRUE));
snapshotsService.cloneSnapshot(projectResolver.getProjectId(), request, listener.map(v -> AcknowledgedResponse.TRUE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
Expand All @@ -31,14 +32,16 @@
public class TransportCreateSnapshotAction extends TransportMasterNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
public static final ActionType<CreateSnapshotResponse> TYPE = new ActionType<>("cluster:admin/snapshot/create");
private final SnapshotsService snapshotsService;
private final ProjectResolver projectResolver;

@Inject
public TransportCreateSnapshotAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
Expand All @@ -51,12 +54,13 @@ public TransportCreateSnapshotAction(
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.snapshotsService = snapshotsService;
this.projectResolver = projectResolver;
}

@Override
protected ClusterBlockException checkBlock(CreateSnapshotRequest request, ClusterState state) {
// We only check metadata block, as we want to snapshot closed indices (which have a read block)
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}

@Override
Expand All @@ -67,9 +71,13 @@ protected void masterOperation(
final ActionListener<CreateSnapshotResponse> listener
) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, listener.map(CreateSnapshotResponse::new));
snapshotsService.executeSnapshot(projectResolver.getProjectId(), request, listener.map(CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, listener.map(snapshot -> new CreateSnapshotResponse((SnapshotInfo) null)));
snapshotsService.createSnapshot(
projectResolver.getProjectId(),
request,
listener.map(snapshot -> new CreateSnapshotResponse((SnapshotInfo) null))
);
}
}
}
Loading