for
return null;
}
- /**
- * Computes a map of repository shard id to set of shard generations, containing all shard generations that became obsolete and may be
- * deleted from the repository as the cluster state moves from the given old value of {@link SnapshotsInProgress} to this instance.
- *
- * An unique shard generation is created for every in-progress shard snapshot. The shard generation file contains information about all
- * the files needed by pre-existing and any new shard snapshots that were in-progress. When a shard snapshot is finalized, its file list
- * is promoted to the official shard snapshot list for the index shard. This final list will contain metadata about any other
- * in-progress shard snapshots that were not yet finalized when it began. All these other in-progress shard snapshot lists are scheduled
- * for deletion now.
- */
- @FixForMultiProject
- @Deprecated(forRemoval = true)
- public Map> obsoleteGenerations(
- String repository,
- SnapshotsInProgress oldClusterStateSnapshots
- ) {
- return obsoleteGenerations(Metadata.DEFAULT_PROJECT_ID, repository, oldClusterStateSnapshots);
- }
-
/**
* Computes a map of repository shard id to set of shard generations, containing all shard generations that became obsolete and may be
* deleted from the repository as the cluster state moves from the given old value of {@link SnapshotsInProgress} to this instance.
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java
index 4a10964b1be57..4b574b8313c28 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java
@@ -2089,6 +2089,9 @@ static boolean assertDataStreams(Map indices, DataStreamM
public static ProjectMetadata fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
+ if (token == null) {
+ token = parser.nextToken();
+ }
String currentFieldName = null;
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java
index 2a76d36466c80..03ae7c6f1d32c 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java
@@ -14,6 +14,7 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.core.FixForMultiProject;
import java.util.Objects;
@@ -73,7 +74,9 @@ private static Decision canMove(ShardRouting shardRouting, RoutingAllocation all
return YES_NOT_SNAPSHOTTED;
}
- for (final var entriesByRepo : snapshotsInProgress.entriesByRepo()) {
+ @FixForMultiProject(description = "replace with entriesByRepo(ProjectId), see also ES-12195")
+ final var entriesByRepoIterable = snapshotsInProgress.entriesByRepo();
+ for (final var entriesByRepo : entriesByRepoIterable) {
for (final var entry : entriesByRepo) {
if (entry.isClone()) {
// clones do not run on data nodes
diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java
index dd134549ab014..48ad9efa4510b 100644
--- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java
+++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java
@@ -1115,7 +1115,8 @@ public Map queryFields() {
repositoriesService,
transportService,
actionModule.getActionFilters(),
- systemIndices
+ systemIndices,
+ projectResolver.supportsMultipleProjects()
);
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
settings,
diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
index 34803a12d66cd..b0a55d1f66411 100644
--- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
@@ -67,8 +67,8 @@ public void getSnapshotInfo(
}
@Override
- public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
- return in.getSnapshotGlobalMetadata(snapshotId);
+ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
+ return in.getSnapshotGlobalMetadata(snapshotId, fromProjectMetadata);
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
index b508238406373..c9d8a703bbfdd 100644
--- a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
+++ b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
@@ -27,6 +27,7 @@
*/
public final class FinalizeSnapshotContext extends DelegatingActionListener {
+ private final boolean serializeProjectMetadata;
private final UpdatedShardGenerations updatedShardGenerations;
/**
@@ -46,6 +47,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() {
+ @Override
+ public void writeKey(ProjectRepo key, StreamOutput out) throws IOException {
+ key.writeTo(out);
+ }
+
+ @Override
+ public ProjectRepo readKey(StreamInput in) throws IOException {
+ return new ProjectRepo(in);
+ }
+ };
+
+ public ProjectRepo(StreamInput in) throws IOException {
+ this(ProjectId.readFrom(in), in.readString());
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ projectId.writeTo(out);
+ out.writeString(name);
+ }
+
+ @Override
+ public String toString() {
+ return projectRepoString(projectId, name);
+ }
+
+ public static String projectRepoString(ProjectId projectId, String repositoryName) {
+ return "[" + projectId + "/" + repositoryName + "]";
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
index 6248e74bee109..f11232771d48f 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
@@ -77,7 +77,7 @@
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.core.Strings.format;
-import static org.elasticsearch.repositories.RepositoryOperation.projectRepoString;
+import static org.elasticsearch.repositories.ProjectRepo.projectRepoString;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY;
@@ -455,8 +455,8 @@ public static void updateRepositoryUuidInMetadata(
logger.info(
Strings.format(
- "Registering repository [%s] with repository UUID [%s] and generation [%d]",
- repositoryName,
+ "Registering repository %s with repository UUID [%s] and generation [%d]",
+ projectRepoString(projectId, repositoryName),
repositoryData.getUuid(),
repositoryData.getGenId()
)
diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java
index 74c5f3e0e9dc0..e98c3ffb16532 100644
--- a/server/src/main/java/org/elasticsearch/repositories/Repository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java
@@ -87,6 +87,13 @@ default Repository create(
@Nullable
ProjectId getProjectId();
+ /**
+ * Get the project qualified repository
+ */
+ default ProjectRepo getProjectRepo() {
+ return new ProjectRepo(getProjectId(), getMetadata().name());
+ }
+
/**
* Returns metadata about this repository.
*/
@@ -138,10 +145,11 @@ public void onFailure(Exception e) {
/**
* Returns global metadata associated with the snapshot.
*
- * @param snapshotId the snapshot id to load the global metadata from
+ * @param snapshotId the snapshot id to load the global metadata from
+ * @param fromProjectMetadata The metadata may need to be constructed by first reading the project metadata
* @return the global metadata about the snapshot
*/
- Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId);
+ Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata);
/**
* Returns the index metadata associated with the snapshot.
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java
index e6bdf5bc5efc8..a0660758d2b80 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java
@@ -8,13 +8,7 @@
*/
package org.elasticsearch.repositories;
-import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.ProjectId;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Writeable;
-
-import java.io.IOException;
/**
* Coordinates of an operation that modifies a repository, assuming that repository at a specific generation.
@@ -36,46 +30,4 @@ public interface RepositoryOperation {
*/
long repositoryStateId();
- /**
- * A project qualified repository
- * @param projectId The project that the repository belongs to
- * @param name Name of the repository
- */
- record ProjectRepo(ProjectId projectId, String name) implements Writeable {
-
- public ProjectRepo(StreamInput in) throws IOException {
- this(ProjectId.readFrom(in), in.readString());
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- projectId.writeTo(out);
- out.writeString(name);
- }
-
- @Override
- public String toString() {
- return projectRepoString(projectId, name);
- }
- }
-
- static ProjectRepo projectRepo(ProjectId projectId, String repositoryName) {
- return new ProjectRepo(projectId, repositoryName);
- }
-
- static String projectRepoString(ProjectId projectId, String repositoryName) {
- return "[" + projectId + "][" + repositoryName + "]";
- }
-
- DiffableUtils.KeySerializer PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() {
- @Override
- public void writeKey(ProjectRepo key, StreamOutput out) throws IOException {
- key.writeTo(out);
- }
-
- @Override
- public ProjectRepo readKey(StreamInput in) throws IOException {
- return new ProjectRepo(in);
- }
- };
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/UnknownTypeRepository.java b/server/src/main/java/org/elasticsearch/repositories/UnknownTypeRepository.java
index 13d81d2d0d484..9750666c8c8a9 100644
--- a/server/src/main/java/org/elasticsearch/repositories/UnknownTypeRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/UnknownTypeRepository.java
@@ -74,7 +74,7 @@ public void getSnapshotInfo(
}
@Override
- public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
+ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
throw createUnknownTypeException();
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 996c9cda4deab..dea02908d215f 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -179,6 +179,7 @@
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
+import static org.elasticsearch.repositories.ProjectRepo.projectRepoString;
/**
* BlobStore - based implementation of Snapshot Repository
@@ -371,6 +372,15 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
ChunkedToXContent::wrapAsToXContent
);
+ public static final ChecksumBlobStoreFormat PROJECT_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
+ "project-metadata",
+ METADATA_NAME_FORMAT,
+ (repoName, parser) -> ProjectMetadata.Builder.fromXContent(parser),
+ projectMetadata -> ChunkedToXContent.wrapAsToXContent(
+ params -> Iterators.concat(Iterators.single((builder, ignored) -> builder.field("id", projectMetadata.id())))
+ )
+ );
+
public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
@@ -710,7 +720,7 @@ public boolean canUpdateInPlace(Settings updatedSettings, Set ignoredSet
@Override
public void updateState(ClusterState state) {
final Settings previousSettings = metadata.settings();
- metadata = getRepoMetadata(state);
+ metadata = getRepoMetadata(state.metadata().getProject(getProjectId()));
final Settings updatedSettings = metadata.settings();
if (updatedSettings.equals(previousSettings) == false) {
snapshotRateLimiter = getSnapshotRateLimiter();
@@ -725,7 +735,7 @@ public void updateState(ClusterState state) {
return;
}
if (bestEffortConsistency) {
- long bestGenerationFromCS = bestGeneration(SnapshotsInProgress.get(state).forRepo(this.metadata.name()));
+ long bestGenerationFromCS = bestGeneration(SnapshotsInProgress.get(state).forRepo(getProjectRepo()));
// Don't use generation from the delete task if we already found a generation for an in progress snapshot.
// In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet
// exist
@@ -1282,7 +1292,7 @@ private void determineShardCount(ActionListener listener) {
private void getOneShardCount(String indexMetaGeneration) {
try {
updateShardCount(
- INDEX_METADATA_FORMAT.read(metadata.name(), indexContainer, indexMetaGeneration, namedXContentRegistry)
+ INDEX_METADATA_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry)
.getNumberOfShards()
);
} catch (Exception ex) {
@@ -1513,8 +1523,8 @@ private void cleanupUnlinkedRootAndIndicesBlobs(RepositoryData newRepositoryData
} catch (Exception e) {
logger.warn(
() -> format(
- "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them",
- metadata.name(),
+ "%s The following blobs are no longer part of any snapshot [%s] but failed to remove them",
+ toStringShort(),
staleRootBlobs
),
e
@@ -1542,8 +1552,8 @@ private void cleanupUnlinkedRootAndIndicesBlobs(RepositoryData newRepositoryData
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexId);
} catch (IOException e) {
logger.warn(() -> format("""
- [%s] index %s is no longer part of any snapshot in the repository, \
- but failed to clean up its index folder""", metadata.name(), indexId), e);
+ %s index %s is no longer part of any snapshot in the repository, \
+ but failed to clean up its index folder""", toStringShort(), indexId), e);
}
}));
}
@@ -1616,7 +1626,7 @@ private void logStaleRootLevelBlobs(
.collect(Collectors.toSet());
final List blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList();
if (blobsToLog.isEmpty() == false) {
- logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog);
+ logger.info("{} Found stale root level blobs {}. Cleaning them up", toStringShort(), blobsToLog);
}
}
}
@@ -1749,6 +1759,8 @@ int sizeInBytes() {
@Override
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
+ assert finalizeSnapshotContext.snapshotInfo().projectId().equals(getProjectId())
+ : "project-id mismatch: " + finalizeSnapshotContext.snapshotInfo().projectId() + " != " + getProjectId();
final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN
@@ -1813,17 +1825,19 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new
// Write global metadata
final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
- executor.execute(
- ActionRunnable.run(
- allMetaListeners.acquire(),
- () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
- )
- );
+ final var projectMetadata = clusterMetadata.getProject(getProjectId());
+ executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> {
+ if (finalizeSnapshotContext.serializeProjectMetadata()) {
+ PROJECT_METADATA_FORMAT.write(projectMetadata, blobContainer(), snapshotId.getUUID(), compress);
+ } else {
+ GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress);
+ }
+ }));
// Write the index metadata for each index in the snapshot
for (IndexId index : indices) {
executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> {
- final IndexMetadata indexMetaData = clusterMetadata.getProject().index(index.getName());
+ final IndexMetadata indexMetaData = projectMetadata.index(index.getName());
if (writeIndexGens) {
final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
@@ -1836,7 +1850,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new
metadataWriteResult.indexMetas().put(index, identifiers);
} else {
INDEX_METADATA_FORMAT.write(
- clusterMetadata.getProject().index(index.getName()),
+ clusterMetadata.getProject(getProjectId()).index(index.getName()),
indexContainer(index),
snapshotId.getUUID(),
compress
@@ -2014,7 +2028,7 @@ private void getOneSnapshotInfo(BlockingQueue queue, GetSnapshotInfo
Exception failure = null;
SnapshotInfo snapshotInfo = null;
try {
- snapshotInfo = SNAPSHOT_FORMAT.read(metadata.name(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
+ snapshotInfo = SNAPSHOT_FORMAT.read(getProjectRepo(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
failure = new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException | NotXContentException ex) {
@@ -2038,9 +2052,19 @@ private void getOneSnapshotInfo(BlockingQueue queue, GetSnapshotInfo
}
@Override
- public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
+ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId, boolean fromProjectMetadata) {
try {
- return GLOBAL_METADATA_FORMAT.read(metadata.name(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
+ if (fromProjectMetadata) {
+ final var projectMetadata = PROJECT_METADATA_FORMAT.read(
+ getProjectRepo(),
+ blobContainer(),
+ snapshotId.getUUID(),
+ namedXContentRegistry
+ );
+ return Metadata.builder().put(projectMetadata).build();
+ } else {
+ return GLOBAL_METADATA_FORMAT.read(getProjectRepo(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
+ }
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@@ -2052,7 +2076,7 @@ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
try {
return INDEX_METADATA_FORMAT.read(
- metadata.name(),
+ getProjectRepo(),
indexContainer(index),
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index),
namedXContentRegistry
@@ -2130,9 +2154,9 @@ private RateLimiter getRateLimiter(
if (warnIfOverRecovery && effectiveRecoverySpeed.getBytes() > 0) {
if (maxConfiguredBytesPerSec.getBytes() > effectiveRecoverySpeed.getBytes()) {
logger.warn(
- "repository [{}] has a rate limit [{}={}] per second which is above the effective recovery rate limit "
+ "repository {} has a rate limit [{}={}] per second which is above the effective recovery rate limit "
+ "[{}={}] per second, thus the repository rate limit will be superseded by the recovery rate limit",
- metadata.name(),
+ toStringShort(),
settingKey,
maxConfiguredBytesPerSec,
INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(),
@@ -2339,7 +2363,7 @@ public void onResponse(RepositoryData repositoryData) {
@Override
public void onFailure(Exception e) {
logger.warn(
- () -> format("[%s] Exception when initializing repository generation in cluster state", metadata.name()),
+ () -> format("%s Exception when initializing repository generation in cluster state", toStringShort()),
e
);
acquireAndClearRepoDataInitialized().onFailure(e);
@@ -2403,11 +2427,11 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
private ClusterState getClusterStateWithUpdatedRepositoryGeneration(ClusterState currentState, RepositoryData repoData) {
// In theory we might have failed over to a different master which initialized the repo and then failed back to this node, so we
// must check the repository generation in the cluster state is still unknown here.
- final RepositoryMetadata repoMetadata = getRepoMetadata(currentState);
+ final var project = currentState.metadata().getProject(getProjectId());
+ final RepositoryMetadata repoMetadata = getRepoMetadata(project);
if (repoMetadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(repoMetadata.name(), "Found unexpected initialized repo metadata [" + repoMetadata + "]");
}
- final var project = currentState.metadata().getProject(getProjectId());
return ClusterState.builder(currentState)
.putProjectMetadata(
ProjectMetadata.builder(project)
@@ -2588,56 +2612,53 @@ private static String previousWriterMessage(@Nullable Tuple previo
private void markRepoCorrupted(long corruptedGeneration, Exception originalException, ActionListener listener) {
assert corruptedGeneration != RepositoryData.UNKNOWN_REPO_GEN;
assert bestEffortConsistency == false;
- logger.warn(() -> "Marking repository [" + metadata.name() + "] as corrupted", originalException);
- submitUnbatchedTask(
- "mark repository corrupted [" + metadata.name() + "][" + corruptedGeneration + "]",
- new ClusterStateUpdateTask() {
- @Override
- public ClusterState execute(ClusterState currentState) {
- final var project = currentState.metadata().getDefaultProject();
- final RepositoriesMetadata state = RepositoriesMetadata.get(project);
- final RepositoryMetadata repoState = state.repository(metadata.name());
- if (repoState.generation() != corruptedGeneration) {
- throw new IllegalStateException(
- "Tried to mark repo generation ["
- + corruptedGeneration
- + "] as corrupted but its state concurrently changed to ["
- + repoState
- + "]"
- );
- }
- return ClusterState.builder(currentState)
- .putProjectMetadata(
- ProjectMetadata.builder(project)
- .putCustom(
- RepositoriesMetadata.TYPE,
- state.withUpdatedGeneration(
- metadata.name(),
- RepositoryData.CORRUPTED_REPO_GEN,
- repoState.pendingGeneration()
- )
- )
- )
- .build();
- }
-
- @Override
- public void onFailure(Exception e) {
- listener.onFailure(
- new RepositoryException(
- metadata.name(),
- "Failed marking repository state as corrupted",
- ExceptionsHelper.useOrSuppress(e, originalException)
- )
+ logger.warn(() -> "Marking repository " + toStringShort() + " as corrupted", originalException);
+ submitUnbatchedTask("mark repository corrupted " + toStringShort() + "[" + corruptedGeneration + "]", new ClusterStateUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ final var project = currentState.metadata().getProject(projectId);
+ final RepositoriesMetadata state = RepositoriesMetadata.get(project);
+ final RepositoryMetadata repoState = state.repository(metadata.name());
+ if (repoState.generation() != corruptedGeneration) {
+ throw new IllegalStateException(
+ "Tried to mark repo generation ["
+ + corruptedGeneration
+ + "] as corrupted but its state concurrently changed to ["
+ + repoState
+ + "]"
);
}
+ return ClusterState.builder(currentState)
+ .putProjectMetadata(
+ ProjectMetadata.builder(project)
+ .putCustom(
+ RepositoriesMetadata.TYPE,
+ state.withUpdatedGeneration(
+ metadata.name(),
+ RepositoryData.CORRUPTED_REPO_GEN,
+ repoState.pendingGeneration()
+ )
+ )
+ )
+ .build();
+ }
- @Override
- public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
- listener.onResponse(null);
- }
+ @Override
+ public void onFailure(Exception e) {
+ listener.onFailure(
+ new RepositoryException(
+ metadata.name(),
+ "Failed marking repository state as corrupted",
+ ExceptionsHelper.useOrSuppress(e, originalException)
+ )
+ );
}
- );
+
+ @Override
+ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
+ listener.onResponse(null);
+ }
+ });
}
private RepositoryData getRepositoryData(long indexGen) {
@@ -2748,7 +2769,8 @@ protected void writeIndexGen(
@Override
public ClusterState execute(ClusterState currentState) {
- final RepositoryMetadata meta = getRepoMetadata(currentState);
+ final var project = currentState.metadata().getProject(projectId);
+ final RepositoryMetadata meta = getRepoMetadata(project);
final String repoName = metadata.name();
if (RepositoriesService.isReadOnly(meta.settings())) {
@@ -2762,9 +2784,9 @@ public ClusterState execute(ClusterState currentState) {
final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN || bestEffortConsistency;
if (uninitializedMeta == false && meta.pendingGeneration() != genInState) {
logger.info(
- "Trying to write new repository data over unfinished write, repo [{}] is at "
+ "Trying to write new repository data over unfinished write, repo {} is at "
+ "safe generation [{}] and pending generation [{}]",
- meta.name(),
+ toStringShort(),
genInState,
meta.pendingGeneration()
);
@@ -2788,7 +2810,6 @@ public ClusterState execute(ClusterState currentState) {
+ "] must be larger than latest known generation ["
+ latestKnownRepoGen.get()
+ "]";
- final var project = currentState.metadata().getDefaultProject();
return ClusterState.builder(currentState)
.putProjectMetadata(
ProjectMetadata.builder(project)
@@ -2897,9 +2918,9 @@ public void onFailure(Exception e) {
assert newRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) == false;
logger.info(
Strings.format(
- "Generated new repository UUID [%s] for repository [%s] in generation [%d]",
+ "Generated new repository UUID [%s] for repository %s in generation [%d]",
newRepositoryData.getUuid(),
- metadata.name(),
+ toStringShort(),
newGen
)
);
@@ -2914,7 +2935,8 @@ public void onFailure(Exception e) {
submitUnbatchedTask(setSafeGenerationSource, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
- final RepositoryMetadata meta = getRepoMetadata(currentState);
+ final var project = currentState.metadata().getProject(projectId);
+ final RepositoryMetadata meta = getRepoMetadata(project);
if (meta.generation() != expectedGen) {
throw new IllegalStateException(
"Tried to update repo generation to [" + newGen + "] but saw unexpected generation in state [" + meta + "]"
@@ -2929,7 +2951,6 @@ public ClusterState execute(ClusterState currentState) {
+ "]"
);
}
- final var project = currentState.metadata().getDefaultProject();
final RepositoriesMetadata withGenerations = RepositoriesMetadata.get(project)
.withUpdatedGeneration(metadata.name(), newGen, newGen);
final RepositoriesMetadata withUuid = meta.uuid().equals(newRepositoryData.getUuid())
@@ -3089,7 +3110,7 @@ private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state,
boolean changedSnapshots = false;
final List snapshotEntries = new ArrayList<>();
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
- for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repoName)) {
+ for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(getProjectId(), repoName)) {
if (entry.repositoryStateId() == oldGen) {
snapshotEntries.add(entry.withRepoGen(newGen));
changedSnapshots = true;
@@ -3098,13 +3119,13 @@ private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state,
}
}
updatedSnapshotsInProgress = changedSnapshots
- ? snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(repoName, snapshotEntries)
+ ? snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(getProjectId(), repoName, snapshotEntries)
: null;
final SnapshotDeletionsInProgress updatedDeletionsInProgress;
boolean changedDeletions = false;
final List deletionEntries = new ArrayList<>();
for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(state).getEntries()) {
- if (entry.repository().equals(repoName) && entry.repositoryStateId() == oldGen) {
+ if (entry.projectId().equals(getProjectId()) && entry.repository().equals(repoName) && entry.repositoryStateId() == oldGen) {
deletionEntries.add(entry.withRepoGen(newGen));
changedDeletions = true;
} else {
@@ -3115,9 +3136,8 @@ private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state,
return SnapshotsService.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress);
}
- private RepositoryMetadata getRepoMetadata(ClusterState state) {
- final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(state.getMetadata().getProject(getProjectId()))
- .repository(metadata.name());
+ private RepositoryMetadata getRepoMetadata(ProjectMetadata projectMetadata) {
+ final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(projectMetadata).repository(metadata.name());
assert repositoryMetadata != null || lifecycle.stoppedOrClosed()
: "did not find metadata for repo [" + metadata.name() + "] in state [" + lifecycleState() + "]";
return repositoryMetadata;
@@ -3189,7 +3209,7 @@ private long latestGeneration(Collection rootBlobs) {
} catch (NumberFormatException nfe) {
// the index- blob wasn't of the format index-N where N is a number,
// no idea what this blob is but it doesn't belong in the repository!
- logger.warn("[{}] Unknown blob in the repository: {}", metadata.name(), blobName);
+ logger.warn("[{}] Unknown blob in the repository: {}", toStringShort(), blobName);
}
}
return latest;
@@ -3868,7 +3888,12 @@ public void verify(String seed, DiscoveryNode localNode) {
@Override
public String toString() {
- return "BlobStoreRepository[" + "[" + metadata.name() + "], [" + blobStore.get() + ']' + ']';
+ return "BlobStoreRepository[" + toStringShort() + ", [" + blobStore.get() + ']' + ']';
+ }
+
+ // Package private for testing
+ String toStringShort() {
+ return projectRepoString(projectId, metadata.name());
}
/**
@@ -3898,7 +3923,7 @@ private void writeShardIndexBlobAtomic(
*/
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
try {
- return INDEX_SHARD_SNAPSHOT_FORMAT.read(metadata.name(), shardContainer, snapshotId.getUUID(), namedXContentRegistry);
+ return INDEX_SHARD_SNAPSHOT_FORMAT.read(getProjectRepo(), shardContainer, snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@@ -3953,7 +3978,7 @@ private Tuple buildBlobStoreIndex
try {
return new Tuple<>(
INDEX_SHARD_SNAPSHOTS_FORMAT.read(
- metadata.name(),
+ getProjectRepo(),
shardContainer,
generation.getGenerationUUID(),
namedXContentRegistry
@@ -3989,10 +4014,10 @@ private Tuple buildBlobStoreIndex
// keeping hold of its data blobs.
try {
final var message = Strings.format(
- "index %s shard generation [%s] in [%s][%s] not found - falling back to reading all shard snapshots",
+ "index %s shard generation [%s] in %s[%s] not found - falling back to reading all shard snapshots",
indexId,
generation,
- metadata.name(),
+ toStringShort(),
shardContainer.path()
);
logger.error(message, noSuchFileException);
@@ -4009,7 +4034,7 @@ private Tuple buildBlobStoreIndex
&& shardSnapshotBlobName.endsWith(METADATA_BLOB_NAME_SUFFIX)
&& shardSnapshotBlobName.length() == shardSnapshotBlobNameLength) {
final var shardSnapshot = INDEX_SHARD_SNAPSHOT_FORMAT.read(
- metadata.name(),
+ getProjectRepo(),
shardContainer,
shardSnapshotBlobName.substring(SNAPSHOT_PREFIX.length(), shardSnapshotBlobNameLengthBeforeExt),
namedXContentRegistry
@@ -4033,17 +4058,17 @@ private Tuple buildBlobStoreIndex
}
}
logger.error(
- "read shard snapshots [{}] due to missing shard generation [{}] for index {} in [{}][{}]",
+ "read shard snapshots [{}] due to missing shard generation [{}] for index {} in {}[{}]",
messageBuilder,
generation,
indexId,
- metadata.name(),
+ toStringShort(),
shardContainer.path()
);
return new Tuple<>(blobStoreIndexShardSnapshots, generation);
} catch (Exception fallbackException) {
logger.error(
- Strings.format("failed while reading all shard snapshots from [%s][%s]", metadata.name(), shardContainer.path()),
+ Strings.format("failed while reading all shard snapshots from %s[%s]", toStringShort(), shardContainer.path()),
fallbackException
);
noSuchFileException.addSuppressed(fallbackException);
@@ -4067,7 +4092,7 @@ private Tuple buildBlobStoreIndexShardSnapsh
long latest = latestGeneration(blobs);
if (latest >= 0) {
final BlobStoreIndexShardSnapshots shardSnapshots = INDEX_SHARD_SNAPSHOTS_FORMAT.read(
- metadata.name(),
+ getProjectRepo(),
shardContainer,
Long.toString(latest),
namedXContentRegistry
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
index bae5ae0fdadf0..60a5f6c09d3e4 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
@@ -30,6 +30,7 @@
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.gateway.CorruptStateException;
+import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
@@ -69,9 +70,9 @@ public final class ChecksumBlobStoreFormat {
private final String blobNameFormat;
- private final CheckedBiFunction reader;
+ private final CheckedBiFunction reader;
- private final CheckedBiFunction fallbackReader;
+ private final CheckedBiFunction fallbackReader;
private final Function writer;
@@ -85,8 +86,8 @@ public final class ChecksumBlobStoreFormat {
public ChecksumBlobStoreFormat(
String codec,
String blobNameFormat,
- CheckedBiFunction reader,
- @Nullable CheckedBiFunction fallbackReader,
+ CheckedBiFunction reader,
+ @Nullable CheckedBiFunction fallbackReader,
Function writer
) {
this.reader = reader;
@@ -105,7 +106,7 @@ public ChecksumBlobStoreFormat(
public ChecksumBlobStoreFormat(
String codec,
String blobNameFormat,
- CheckedBiFunction reader,
+ CheckedBiFunction reader,
Function writer
) {
this(codec, blobNameFormat, reader, null, writer);
@@ -118,11 +119,11 @@ public ChecksumBlobStoreFormat(
* @param name name to be translated into
* @return parsed blob object
*/
- public T read(String repoName, BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry)
+ public T read(ProjectRepo projectRepo, BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry)
throws IOException {
String blobName = blobName(name);
try (InputStream in = blobContainer.readBlob(OperationPurpose.SNAPSHOT_METADATA, blobName)) {
- return deserialize(repoName, namedXContentRegistry, in);
+ return deserialize(projectRepo, namedXContentRegistry, in);
}
}
@@ -130,7 +131,7 @@ public String blobName(String name) {
return String.format(Locale.ROOT, blobNameFormat, name);
}
- public T deserialize(String repoName, NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException {
+ public T deserialize(ProjectRepo projectRepo, NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException {
final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input);
try {
CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION);
@@ -154,7 +155,7 @@ public T deserialize(String repoName, NamedXContentRegistry namedXContentRegistr
XContentType.SMILE
)
) {
- result = reader.apply(repoName, parser);
+ result = reader.apply(projectRepo, parser);
XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser);
} catch (Exception e) {
try (
@@ -165,7 +166,7 @@ public T deserialize(String repoName, NamedXContentRegistry namedXContentRegistr
XContentType.SMILE
)
) {
- result = fallbackReader.apply(repoName, parser);
+ result = fallbackReader.apply(projectRepo, parser);
XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser);
}
}
@@ -174,7 +175,7 @@ public T deserialize(String repoName, NamedXContentRegistry namedXContentRegistr
XContentParser parser = XContentType.SMILE.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)
) {
- result = reader.apply(repoName, parser);
+ result = reader.apply(projectRepo, parser);
XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser);
}
deserializeMetaBlobInputStream.verifyFooter();
diff --git a/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java b/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java
index 89ebf13b58c95..94fc6dfad06d8 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java
@@ -12,7 +12,7 @@
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo;
+import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.ShardGeneration;
import org.elasticsearch.repositories.ShardGenerations;
diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
index f57db79d4625d..1680bfa3a59a0 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -359,7 +359,7 @@ private void startRestore(
Metadata globalMetadata = null;
final Metadata.Builder metadataBuilder;
if (request.includeGlobalState()) {
- globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId);
+ globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId, false);
metadataBuilder = Metadata.builder(globalMetadata);
} else {
metadataBuilder = Metadata.builder();
@@ -652,7 +652,7 @@ private static Tuple
*
* @param snapshotsInProgress snapshots in progress in the cluster state
+ * @param projectId project to look for the repository
* @param repository repository id
* @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
* @return list of metadata for currently running snapshots
*/
public static List currentSnapshots(
@Nullable SnapshotsInProgress snapshotsInProgress,
+ ProjectId projectId,
String repository,
List snapshots
) {
@@ -843,13 +875,13 @@ public static List currentSnapshots(
return Collections.emptyList();
}
if ("_all".equals(repository)) {
- return snapshotsInProgress.asStream().toList();
+ return snapshotsInProgress.asStream(projectId).toList();
}
if (snapshots.isEmpty()) {
- return snapshotsInProgress.forRepo(repository);
+ return snapshotsInProgress.forRepo(projectId, repository);
}
List builder = new ArrayList<>();
- for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repository)) {
+ for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(projectId, repository)) {
for (String snapshot : snapshots) {
if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) {
builder.add(entry);
@@ -956,19 +988,19 @@ private boolean assertConsistentWithClusterState(ClusterState state) {
private static boolean assertNoDanglingSnapshots(ClusterState state) {
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
final SnapshotDeletionsInProgress snapshotDeletionsInProgress = SnapshotDeletionsInProgress.get(state);
- final Set reposWithRunningDelete = snapshotDeletionsInProgress.getEntries()
+ final Set reposWithRunningDelete = snapshotDeletionsInProgress.getEntries()
.stream()
.filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.STARTED)
- .map(SnapshotDeletionsInProgress.Entry::repository)
+ .map(entry -> new ProjectRepo(entry.projectId(), entry.repository()))
.collect(Collectors.toSet());
for (List repoEntry : snapshotsInProgress.entriesByRepo()) {
final SnapshotsInProgress.Entry entry = repoEntry.get(0);
for (ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) {
if (value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)) {
- assert reposWithRunningDelete.contains(entry.repository())
+ assert reposWithRunningDelete.contains(new ProjectRepo(entry.projectId(), entry.repository()))
: "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete";
} else if (value.isActive()) {
- assert reposWithRunningDelete.contains(entry.repository()) == false
+ assert reposWithRunningDelete.contains(new ProjectRepo(entry.projectId(), entry.repository())) == false
: "Found shard snapshot actively executing in ["
+ entry
+ "] when it should be blocked by a running delete ["
@@ -1007,7 +1039,6 @@ private void processExternalChanges(boolean changedNodes, boolean changedShards)
@Override
public ClusterState execute(ClusterState currentState) {
- RoutingTable routingTable = currentState.routingTable();
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
final SnapshotDeletionsInProgress deletesInProgress = SnapshotDeletionsInProgress.get(currentState);
DiscoveryNodes nodes = currentState.nodes();
@@ -1035,6 +1066,7 @@ public ClusterState execute(ClusterState currentState) {
boolean changed = false;
final List updatedEntriesForRepo = new ArrayList<>();
final Map knownFailures = new HashMap<>();
+ final ProjectId projectId = snapshotsInRepo.get(0).projectId();
final String repositoryName = snapshotsInRepo.get(0).repository();
for (SnapshotsInProgress.Entry snapshotEntry : snapshotsInRepo) {
if (statesToUpdate.contains(snapshotEntry.state())) {
@@ -1049,7 +1081,7 @@ public ClusterState execute(ClusterState currentState) {
}
} else {
// see if any clones may have had a shard become available for execution because of failures
- if (deletesInProgress.hasExecutingDeletion(repositoryName)) {
+ if (deletesInProgress.hasExecutingDeletion(projectId, repositoryName)) {
// Currently executing a delete for this repo, no need to try and update any clone operations.
// The logic for finishing the delete will update running clones with the latest changes.
updatedEntriesForRepo.add(snapshotEntry);
@@ -1099,7 +1131,7 @@ public ClusterState execute(ClusterState currentState) {
ImmutableOpenMap shards = processWaitingShardsAndRemovedNodes(
snapshotEntry,
- routingTable,
+ currentState.routingTable(projectId),
nodes,
snapshotsInProgress::isNodeIdForRemoval,
knownFailures
@@ -1131,13 +1163,18 @@ public ClusterState execute(ClusterState currentState) {
}
}
if (changed) {
- updatedSnapshots = updatedSnapshots.createCopyWithUpdatedEntriesForRepo(repositoryName, updatedEntriesForRepo);
+ updatedSnapshots = updatedSnapshots.createCopyWithUpdatedEntriesForRepo(
+ projectId,
+ repositoryName,
+ updatedEntriesForRepo
+ );
}
}
final ClusterState res = readyDeletions(
updatedSnapshots != snapshotsInProgress
? ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build()
- : currentState
+ : currentState,
+ null
).v1();
for (SnapshotDeletionsInProgress.Entry delete : SnapshotDeletionsInProgress.get(res).getEntries()) {
if (delete.state() == SnapshotDeletionsInProgress.State.STARTED) {
@@ -1173,10 +1210,10 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
}
}
}
- startExecutableClones(SnapshotsInProgress.get(newState), null);
+ startExecutableClones(SnapshotsInProgress.get(newState));
// run newly ready deletes
for (SnapshotDeletionsInProgress.Entry entry : deletionsToExecute) {
- if (tryEnterRepoLoop(entry.repository())) {
+ if (tryEnterRepoLoop(entry.projectId(), entry.repository())) {
deleteSnapshotsFromRepository(entry, newState.nodes().getMaxDataNodeCompatibleIndexVersion());
}
}
@@ -1306,6 +1343,7 @@ private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snap
for (List entries : snapshotsInProgress.entriesByRepo()) {
for (SnapshotsInProgress.Entry entry : entries) {
if (entry.state() == SnapshotsInProgress.State.STARTED && entry.isClone() == false) {
+ final ProjectId projectId = entry.projectId();
for (Map.Entry shardStatus : entry.shardSnapshotStatusByRepoShardId()
.entrySet()) {
final ShardState state = shardStatus.getValue().state();
@@ -1315,7 +1353,7 @@ private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snap
final RepositoryShardId shardId = shardStatus.getKey();
final Index index = entry.indexByName(shardId.indexName());
if (event.indexRoutingTableChanged(index)) {
- IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index);
+ IndexRoutingTable indexShardRoutingTable = event.state().routingTable(projectId).index(index);
if (indexShardRoutingTable == null) {
// index got removed concurrently and we have to fail WAITING, QUEUED and PAUSED_FOR_REMOVAL state shards
return true;
@@ -1374,10 +1412,11 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu
}
return;
}
+ final ProjectId projectId = snapshot.getProjectId();
final String repoName = snapshot.getRepository();
- if (tryEnterRepoLoop(repoName)) {
+ if (tryEnterRepoLoop(projectId, repoName)) {
if (repositoryData == null) {
- repositoriesService.repository(snapshot.getProjectId(), repoName)
+ repositoriesService.repository(projectId, repoName)
.getRepositoryData(
EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO contemplate threading here, do we need to fork, see #101445?
new ActionListener<>() {
@@ -1386,13 +1425,16 @@ public void onResponse(RepositoryData repositoryData) {
if (newFinalization) {
finalizeSnapshotEntry(snapshot, metadata, repositoryData);
} else {
- runNextQueuedOperation(repositoryData, repoName, false);
+ runNextQueuedOperation(repositoryData, projectId, repoName, false);
}
}
@Override
public void onFailure(Exception e) {
- submitUnbatchedTask("fail repo tasks for [" + repoName + "]", new FailPendingRepoTasksTask(repoName, e));
+ submitUnbatchedTask(
+ "fail repo tasks for [" + repoName + "]",
+ new FailPendingRepoTasksTask(projectId, repoName, e)
+ );
}
}
);
@@ -1400,7 +1442,7 @@ public void onFailure(Exception e) {
if (newFinalization) {
finalizeSnapshotEntry(snapshot, metadata, repositoryData);
} else {
- runNextQueuedOperation(repositoryData, repoName, false);
+ runNextQueuedOperation(repositoryData, projectId, repoName, false);
}
}
} else {
@@ -1413,26 +1455,34 @@ public void onFailure(Exception e) {
/**
* Try starting to run a snapshot finalization or snapshot delete for the given repository. If this method returns
* {@code true} then snapshot finalizations and deletions for the repo may be executed. Once no more operations are
- * ready for the repository {@link #leaveRepoLoop(String)} should be invoked so that a subsequent state change that
+ * ready for the repository {@link #leaveRepoLoop(ProjectId, String)} should be invoked so that a subsequent state change that
* causes another operation to become ready can execute.
*
* @return true if a finalization or snapshot delete may be started at this point
*/
- private boolean tryEnterRepoLoop(String repository) {
- return currentlyFinalizing.add(repository);
+ private boolean tryEnterRepoLoop(ProjectId projectId, String repository) {
+ return currentlyFinalizing.add(new ProjectRepo(projectId, repository));
}
/**
* Stop polling for ready snapshot finalizations or deletes in state {@link SnapshotDeletionsInProgress.State#STARTED} to execute
* for the given repository.
*/
- private void leaveRepoLoop(String repository) {
- final boolean removed = currentlyFinalizing.remove(repository);
+ private void leaveRepoLoop(ProjectId projectId, String repository) {
+ final boolean removed = currentlyFinalizing.remove(new ProjectRepo(projectId, repository));
assert removed;
}
private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
- threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new SnapshotFinalization(snapshot, metadata, repositoryData));
+ final ProjectId projectId = snapshot.getProjectId();
+ final Metadata effectiveMetadata;
+ if (serializeProjectMetadata) {
+ // If we are serializing ProjectMetadata (i.e. multi-project enabled), capture only the ProjectMetadata with an empty Metadata
+ effectiveMetadata = Metadata.builder().put(metadata.getProject(projectId)).build();
+ } else {
+ effectiveMetadata = metadata;
+ }
+ threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new SnapshotFinalization(snapshot, effectiveMetadata, repositoryData));
}
/**
@@ -1457,7 +1507,7 @@ private class SnapshotFinalization extends AbstractRunnable {
@Override
protected void doRun() {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
- assert currentlyFinalizing.contains(snapshot.getRepository());
+ assert currentlyFinalizing.contains(new ProjectRepo(snapshot.getProjectId(), snapshot.getRepository()));
assert repositoryOperations.assertNotQueued(snapshot);
SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
@@ -1484,14 +1534,14 @@ protected void doRun() {
assert state == ShardState.SUCCESS;
}
}
+ final ProjectId projectId = snapshot.getProjectId();
final String repository = snapshot.getRepository();
final ListenableFuture metadataListener = new ListenableFuture<>();
- final Repository repo = repositoriesService.repository(snapshot.getProjectId(), snapshot.getRepository());
+ final Repository repo = repositoriesService.repository(projectId, repository);
if (entry.isClone()) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
- final Metadata existingMetadata = repo.getSnapshotGlobalMetadata(entry.source());
- @FixForMultiProject
- final ProjectMetadata existingProject = existingMetadata.getProject();
+ final Metadata existingMetadata = repo.getSnapshotGlobalMetadata(entry.source(), serializeProjectMetadata);
+ final ProjectMetadata existingProject = existingMetadata.getProject(projectId);
final ProjectMetadata.Builder projBuilder = ProjectMetadata.builder(existingProject);
final Set existingIndices = new HashSet<>();
for (IndexId index : entry.indices().values()) {
@@ -1518,7 +1568,7 @@ protected void doRun() {
}
metadataListener.addListener(ActionListener.wrap(meta -> {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
- final Metadata metaForSnapshot = metadataForSnapshot(entry, meta);
+ final Metadata metaForSnapshot = metadataForSnapshot(entry, meta, projectId);
final Map indexSnapshotDetails = Maps.newMapWithExpectedSize(
finalIndices.size()
@@ -1558,7 +1608,7 @@ protected void doRun() {
final SnapshotInfo snapshotInfo = new SnapshotInfo(
snapshot,
finalIndices,
- entry.dataStreams().stream().filter(metaForSnapshot.getProject().dataStreams()::containsKey).toList(),
+ entry.dataStreams().stream().filter(metaForSnapshot.getProject(projectId).dataStreams()::containsKey).toList(),
entry.partial() ? onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(),
failure,
threadPool.absoluteTimeInMillis(),
@@ -1572,6 +1622,7 @@ protected void doRun() {
final ListenableFuture>> snapshotListeners = new ListenableFuture<>();
repo.finalizeSnapshot(
new FinalizeSnapshotContext(
+ serializeProjectMetadata,
updatedShardGenerations,
repositoryData.getGenId(),
metaForSnapshot,
@@ -1582,7 +1633,7 @@ protected void doRun() {
// by the snapshot info callback below and won't be failed needlessly if #runNextQueuedOperation runs into
// a fatal like e.g. this node stopped being the master node
snapshotListeners.onResponse(endAndGetListenersToResolve(snapshot));
- runNextQueuedOperation(updatedRepositoryData, repository, true);
+ runNextQueuedOperation(updatedRepositoryData, projectId, repository, true);
},
e -> handleFinalizationFailure(
e,
@@ -1711,18 +1762,19 @@ private void handleFinalizationFailure(
* Run the next queued up repository operation for the given repository name.
*
* @param repositoryData current repository data
+ * @param projectId project for the repository
* @param repository repository name
* @param attemptDelete whether to try and run delete operations that are ready in the cluster state if no
* snapshot create operations remain to execute
*/
- private void runNextQueuedOperation(RepositoryData repositoryData, String repository, boolean attemptDelete) {
- assert currentlyFinalizing.contains(repository);
- final Tuple nextFinalization = repositoryOperations.pollFinalization(repository);
+ private void runNextQueuedOperation(RepositoryData repositoryData, ProjectId projectId, String repository, boolean attemptDelete) {
+ assert currentlyFinalizing.contains(new ProjectRepo(projectId, repository));
+ final Tuple nextFinalization = repositoryOperations.pollFinalization(projectId, repository);
if (nextFinalization == null) {
if (attemptDelete) {
- runReadyDeletions(repositoryData, repository);
+ runReadyDeletions(repositoryData, projectId, repository);
} else {
- leaveRepoLoop(repository);
+ leaveRepoLoop(projectId, repository);
}
} else {
logger.trace("Moving on to finalizing next snapshot [{}]", nextFinalization);
@@ -1735,17 +1787,19 @@ private void runNextQueuedOperation(RepositoryData repositoryData, String reposi
*
* TODO: optimize this to execute in a single CS update together with finalizing the latest snapshot
*/
- private void runReadyDeletions(RepositoryData repositoryData, String repository) {
+ private void runReadyDeletions(RepositoryData repositoryData, ProjectId projectId, String repository) {
submitUnbatchedTask("Run ready deletions", new ClusterStateUpdateTask() {
private SnapshotDeletionsInProgress.Entry deletionToRun;
@Override
public ClusterState execute(ClusterState currentState) {
- assert readyDeletions(currentState).v1() == currentState
+ assert readyDeletions(currentState, projectId).v1() == currentState
: "Deletes should have been set to ready by finished snapshot deletes and finalizations";
for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(currentState).getEntries()) {
- if (entry.repository().equals(repository) && entry.state() == SnapshotDeletionsInProgress.State.STARTED) {
+ if (entry.projectId().equals(projectId)
+ && entry.repository().equals(repository)
+ && entry.state() == SnapshotDeletionsInProgress.State.STARTED) {
deletionToRun = entry;
break;
}
@@ -1762,7 +1816,7 @@ public void onFailure(Exception e) {
@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
if (deletionToRun == null) {
- runNextQueuedOperation(repositoryData, repository, false);
+ runNextQueuedOperation(repositoryData, projectId, repository, false);
} else {
deleteSnapshotsFromRepository(deletionToRun, repositoryData, newState.nodes().getMaxDataNodeCompatibleIndexVersion());
}
@@ -1777,25 +1831,34 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
* {@link SnapshotDeletionsInProgress.State#STARTED} or waiting entries in state {@link SnapshotDeletionsInProgress.State#WAITING}
* that were moved to {@link SnapshotDeletionsInProgress.State#STARTED} in the returned updated cluster state.
*
+ * @param projectId the project for repositories where deletions should be prepared. {@code null} means all projects
* @param currentState current cluster state
* @return tuple of an updated cluster state and currently executable snapshot delete operations
*/
- private static Tuple> readyDeletions(ClusterState currentState) {
+ private static Tuple> readyDeletions(
+ ClusterState currentState,
+ @Nullable ProjectId projectId
+ ) {
final SnapshotDeletionsInProgress deletions = SnapshotDeletionsInProgress.get(currentState);
- if (deletions.hasDeletionsInProgress() == false) {
+ if (deletions.hasDeletionsInProgress() == false || (projectId != null && deletions.hasDeletionsInProgress(projectId) == false)) {
return Tuple.tuple(currentState, List.of());
}
final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE);
assert snapshotsInProgress != null;
- final Set repositoriesSeen = new HashSet<>();
+ final Set repositoriesSeen = new HashSet<>();
boolean changed = false;
final ArrayList readyDeletions = new ArrayList<>();
final List newDeletes = new ArrayList<>();
for (SnapshotDeletionsInProgress.Entry entry : deletions.getEntries()) {
- final String repo = entry.repository();
- if (repositoriesSeen.add(entry.repository())
+ if (projectId != null && projectId.equals(entry.projectId()) == false) {
+ // Not the target project, keep the entry as is
+ newDeletes.add(entry);
+ continue;
+ }
+ final var projectRepo = new ProjectRepo(entry.projectId(), entry.repository());
+ if (repositoriesSeen.add(projectRepo)
&& entry.state() == SnapshotDeletionsInProgress.State.WAITING
- && snapshotsInProgress.forRepo(repo).stream().noneMatch(SnapshotsService::isWritingToRepository)) {
+ && snapshotsInProgress.forRepo(projectRepo).stream().noneMatch(SnapshotsService::isWritingToRepository)) {
changed = true;
final SnapshotDeletionsInProgress.Entry newEntry = entry.started();
readyDeletions.add(newEntry);
@@ -1836,7 +1899,9 @@ public static ClusterState stateWithoutSnapshot(
ClusterState result = state;
int indexOfEntry = -1;
// Find the in-progress snapshot entry that matches {@code snapshot}.
- final List entryList = inProgressSnapshots.forRepo(snapshot.getRepository());
+ final ProjectId projectId = snapshot.getProjectId();
+ final String repository = snapshot.getRepository();
+ final List entryList = inProgressSnapshots.forRepo(projectId, repository);
for (int i = 0; i < entryList.size(); i++) {
SnapshotsInProgress.Entry entry = entryList.get(i);
if (entry.snapshot().equals(snapshot)) {
@@ -1935,11 +2000,11 @@ public static ClusterState stateWithoutSnapshot(
result = ClusterState.builder(state)
.putCustom(
SnapshotsInProgress.TYPE,
- inProgressSnapshots.createCopyWithUpdatedEntriesForRepo(snapshot.getRepository(), updatedEntries)
+ inProgressSnapshots.createCopyWithUpdatedEntriesForRepo(projectId, repository, updatedEntries)
)
.build();
}
- return readyDeletions(result).v1();
+ return readyDeletions(result, projectId).v1();
}
private static void addSnapshotEntry(
@@ -2023,6 +2088,7 @@ public ClusterState execute(ClusterState currentState) {
deletionsWithoutSnapshots(
SnapshotDeletionsInProgress.get(updatedState),
Collections.singletonList(snapshot.getSnapshotId()),
+ snapshot.getProjectId(),
snapshot.getRepository()
)
);
@@ -2050,7 +2116,7 @@ public void onFailure(Exception e) {
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
failSnapshotCompletionListeners(snapshot, failure, Runnable::run);
if (repositoryData != null) {
- runNextQueuedOperation(repositoryData, snapshot.getRepository(), true);
+ runNextQueuedOperation(repositoryData, snapshot.getProjectId(), snapshot.getRepository(), true);
}
}
});
@@ -2064,6 +2130,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
*
* @param deletions snapshot deletions to update
* @param snapshotIds snapshot ids to remove
+ * @param projectId project for the repository
* @param repository repository that the snapshot ids belong to
* @return updated {@link SnapshotDeletionsInProgress} or {@code null} if unchanged
*/
@@ -2071,12 +2138,13 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
private static SnapshotDeletionsInProgress deletionsWithoutSnapshots(
SnapshotDeletionsInProgress deletions,
Collection snapshotIds,
+ ProjectId projectId,
String repository
) {
boolean changed = false;
List updatedEntries = new ArrayList<>(deletions.getEntries().size());
for (SnapshotDeletionsInProgress.Entry entry : deletions.getEntries()) {
- if (entry.repository().equals(repository)) {
+ if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) {
final List updatedSnapshotIds = new ArrayList<>(entry.snapshots());
if (updatedSnapshotIds.removeAll(snapshotIds)) {
changed = true;
@@ -2103,14 +2171,15 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e, Con
* When wait_for_completion
is set to true, the passed action listener will only complete when all
* matching snapshots are deleted, when it is false it will complete as soon as the deletes are scheduled
*
+ * @param projectId project to look for the snapshot
* @param request delete snapshot request
* @param listener listener a listener which will be resolved according to the wait_for_completion parameter
*/
- public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener listener) {
+ public void deleteSnapshots(final ProjectId projectId, final DeleteSnapshotRequest request, final ActionListener listener) {
final String repositoryName = request.repository();
final String[] snapshotNames = request.snapshots();
- final Repository repository = repositoriesService.repository(repositoryName);
+ final Repository repository = repositoriesService.repository(projectId, repositoryName);
executeConsistentStateUpdate(repository, repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {
private SnapshotDeletionsInProgress.Entry newDelete = null;
@@ -2127,12 +2196,13 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis
@Override
public ClusterState execute(ClusterState currentState) {
- ensureRepositoryExists(repositoryName, currentState);
+ final var projectMetadata = currentState.metadata().getProject(projectId);
+ ensureRepositoryExists(repositoryName, projectMetadata);
final Set snapshotIds = new HashSet<>();
// find in-progress snapshots to delete in cluster state
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
- for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repositoryName)) {
+ for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(projectId, repositoryName)) {
final SnapshotId snapshotId = entry.snapshot().getSnapshotId();
if (Regex.simpleMatch(snapshotNames, snapshotId.getName())) {
snapshotIds.add(snapshotId);
@@ -2168,14 +2238,14 @@ public ClusterState execute(ClusterState currentState) {
return currentState;
}
- final Set activeCloneSources = snapshotsInProgress.asStream()
+ final Set activeCloneSources = snapshotsInProgress.asStream(projectId)
.filter(SnapshotsInProgress.Entry::isClone)
.map(SnapshotsInProgress.Entry::source)
.collect(Collectors.toSet());
for (SnapshotId snapshotId : snapshotIds) {
if (activeCloneSources.contains(snapshotId)) {
throw new ConcurrentSnapshotExecutionException(
- new Snapshot(repositoryName, snapshotId),
+ new Snapshot(projectId, repositoryName, snapshotId),
"cannot delete snapshot while it is being cloned"
);
}
@@ -2188,7 +2258,7 @@ public ClusterState execute(ClusterState currentState) {
"delete snapshot"
);
- ensureNotReadOnly(currentState, repositoryName);
+ ensureNotReadOnly(projectMetadata, repositoryName);
final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState);
@@ -2200,7 +2270,7 @@ public ClusterState execute(ClusterState currentState) {
for (RestoreInProgress.Entry entry : restoreInProgress) {
if (repositoryName.equals(entry.snapshot().getRepository()) && snapshotIds.contains(entry.snapshot().getSnapshotId())) {
throw new ConcurrentSnapshotExecutionException(
- new Snapshot(repositoryName, snapshotIds.stream().findFirst().get()),
+ new Snapshot(projectId, repositoryName, snapshotIds.stream().findFirst().get()),
"cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]"
);
}
@@ -2208,8 +2278,9 @@ public ClusterState execute(ClusterState currentState) {
// Snapshot ids that will have to be physically deleted from the repository
final Set snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds);
final SnapshotsInProgress updatedSnapshots = snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(
+ projectId,
repositoryName,
- snapshotsInProgress.forRepo(repositoryName).stream().map(existing -> {
+ snapshotsInProgress.forRepo(projectId, repositoryName).stream().map(existing -> {
if (existing.state() == SnapshotsInProgress.State.STARTED
&& snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) {
// snapshot is started - mark every non completed shard as aborted
@@ -2238,7 +2309,7 @@ public ClusterState execute(ClusterState currentState) {
// add the snapshot deletion to the cluster state
final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries()
.stream()
- .filter(entry -> entry.repository().equals(repositoryName))
+ .filter(entry -> entry.projectId().equals(projectId) && entry.repository().equals(repositoryName))
.filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.WAITING)
.findFirst()
.orElse(null);
@@ -2246,7 +2317,8 @@ public ClusterState execute(ClusterState currentState) {
final Optional foundDuplicate = deletionsInProgress.getEntries()
.stream()
.filter(
- entry -> entry.repository().equals(repositoryName)
+ entry -> entry.projectId().equals(projectId)
+ && entry.repository().equals(repositoryName)
&& entry.state() == SnapshotDeletionsInProgress.State.STARTED
&& entry.snapshots().containsAll(snapshotIds)
)
@@ -2256,16 +2328,14 @@ public ClusterState execute(ClusterState currentState) {
reusedExistingDelete = true;
return currentState;
}
- @FixForMultiProject
- final var projectId = ProjectId.DEFAULT;
newDelete = new SnapshotDeletionsInProgress.Entry(
projectId,
repositoryName,
List.copyOf(snapshotIdsRequiringCleanup),
threadPool.absoluteTimeInMillis(),
repositoryData.getGenId(),
- updatedSnapshots.forRepo(repositoryName).stream().noneMatch(SnapshotsService::isWritingToRepository)
- && deletionsInProgress.hasExecutingDeletion(repositoryName) == false
+ updatedSnapshots.forRepo(projectId, repositoryName).stream().noneMatch(SnapshotsService::isWritingToRepository)
+ && deletionsInProgress.hasExecutingDeletion(projectId, repositoryName) == false
? SnapshotDeletionsInProgress.State.STARTED
: SnapshotDeletionsInProgress.State.WAITING
);
@@ -2289,7 +2359,11 @@ public void onFailure(Exception e) {
@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
logger.info(
- () -> format("deleting snapshots [%s] from repository [%s]", arrayToCommaDelimitedString(snapshotNames), repositoryName)
+ () -> format(
+ "deleting snapshots [%s] from repository %s",
+ arrayToCommaDelimitedString(snapshotNames),
+ projectRepoString(projectId, repositoryName)
+ )
);
if (completedNoCleanup.isEmpty() == false) {
@@ -2312,7 +2386,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
return;
}
if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) {
- if (tryEnterRepoLoop(repositoryName)) {
+ if (tryEnterRepoLoop(projectId, repositoryName)) {
deleteSnapshotsFromRepository(
newDelete,
repositoryData,
@@ -2457,7 +2531,7 @@ public void onResponse(RepositoryData repositoryData) {
public void onFailure(Exception e) {
submitUnbatchedTask(
"fail repo tasks for [" + deleteEntry.repository() + "]",
- new FailPendingRepoTasksTask(deleteEntry.repository(), e)
+ new FailPendingRepoTasksTask(deleteEntry.projectId(), deleteEntry.repository(), e)
);
}
});
@@ -2505,8 +2579,9 @@ public ClusterState execute(ClusterState currentState) throws Exception {
// updated repository data and state on the retry. We don't want to wait for the write to finish though
// because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
// to change in any form.
+ final ProjectMetadata projectMetadata = currentState.metadata().getProject(repository.getProjectId());
if (repositoryMetadataStart.equals(
- RepositoriesMetadata.get(currentState).repository(repository.getMetadata().name())
+ RepositoriesMetadata.get(projectMetadata).repository(repository.getMetadata().name())
)) {
executedTask = true;
return updateTask.execute(currentState);
@@ -2548,7 +2623,7 @@ private void deleteSnapshotsFromRepository(
IndexVersion minNodeVersion
) {
if (repositoryOperations.startDeletion(deleteEntry.uuid())) {
- assert currentlyFinalizing.contains(deleteEntry.repository());
+ assert currentlyFinalizing.contains(new ProjectRepo(deleteEntry.projectId(), deleteEntry.repository()));
final List snapshotIds = deleteEntry.snapshots();
assert deleteEntry.state() == SnapshotDeletionsInProgress.State.STARTED : "incorrect state for entry [" + deleteEntry + "]";
if (snapshotIds.isEmpty()) {
@@ -2595,7 +2670,7 @@ public void onFailure(Exception e) {
final var collector = new Strings.BoundedDelimitedStringCollector(sb, ",", 1024);
deleteEntry.snapshots().forEach(s -> collector.appendItem(s.getName()));
collector.finish();
- sb.append("] from repository [").append(deleteEntry.repository()).append("]");
+ sb.append("] from repository ").append(projectRepoString(deleteEntry.projectId(), deleteEntry.repository()));
return sb;
}, e);
submitUnbatchedTask(
@@ -2614,7 +2689,8 @@ protected void handleListeners(List> deleteListeners) {
final var collector = new Strings.BoundedDelimitedStringCollector(sb, ",", 1024);
snapshotIds.forEach(collector::appendItem);
collector.finish();
- sb.append("] deleted");
+ sb.append("] deleted in repository ");
+ sb.append(projectRepoString(deleteEntry.projectId(), deleteEntry.repository()));
return sb;
});
doneFuture.onResponse(null);
@@ -2645,6 +2721,7 @@ protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgres
final SnapshotDeletionsInProgress updatedDeletions = deletionsWithoutSnapshots(
deletions,
deleteEntry.snapshots(),
+ deleteEntry.projectId(),
deleteEntry.repository()
);
return updatedDeletions == null ? deletions : updatedDeletions;
@@ -2732,7 +2809,8 @@ public ClusterState execute(ClusterState currentState) {
}
final SnapshotDeletionsInProgress newDeletions = filterDeletions(updatedDeletions);
final Tuple> res = readyDeletions(
- updateWithSnapshots(currentState, updatedSnapshotsInProgress(currentState, newDeletions), newDeletions)
+ updateWithSnapshots(currentState, updatedSnapshotsInProgress(currentState, newDeletions), newDeletions),
+ deleteEntry.projectId()
);
readyDeletions = res.v2();
return res.v1();
@@ -2761,7 +2839,7 @@ public final void clusterStateProcessed(ClusterState oldState, ClusterState newS
readyToResolveListeners.forEach(Runnable::run);
if (newFinalizations.isEmpty()) {
if (readyDeletions.isEmpty()) {
- leaveRepoLoop(deleteEntry.repository());
+ leaveRepoLoop(deleteEntry.projectId(), deleteEntry.repository());
} else {
for (SnapshotDeletionsInProgress.Entry readyDeletion : readyDeletions) {
deleteSnapshotsFromRepository(
@@ -2772,7 +2850,7 @@ public final void clusterStateProcessed(ClusterState oldState, ClusterState newS
}
}
} else {
- leaveRepoLoop(deleteEntry.repository());
+ leaveRepoLoop(deleteEntry.projectId(), deleteEntry.repository());
assert readyDeletions.stream().noneMatch(entry -> entry.repository().equals(deleteEntry.repository()))
: "New finalizations " + newFinalizations + " added even though deletes " + readyDeletions + " are ready";
for (SnapshotsInProgress.Entry entry : newFinalizations) {
@@ -2781,7 +2859,7 @@ public final void clusterStateProcessed(ClusterState oldState, ClusterState newS
}
// TODO: be more efficient here, we could collect newly ready shard clones as we compute them and then directly start them
// instead of looping over all possible clones to execute
- startExecutableClones(SnapshotsInProgress.get(newState), null);
+ startExecutableClones(SnapshotsInProgress.get(newState), deleteEntry.projectId());
}
/**
@@ -2821,12 +2899,13 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState
boolean changed = false;
final String localNodeId = currentState.nodes().getLocalNodeId();
+ final ProjectId projectId = deleteEntry.projectId();
final String repoName = deleteEntry.repository();
InFlightShardSnapshotStates inFlightShardStates = null;
// Keep track of IndexId values that may have gone unreferenced due to the delete entry just executed.
// See org.elasticsearch.cluster.SnapshotsInProgress.Entry#withUpdatedIndexIds for details.
final Set newIndexIdsToRefresh = new HashSet<>();
- for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repoName)) {
+ for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(projectId, repoName)) {
if (entry.state().completed() == false) {
// TODO: dry up redundant computation and code between clone and non-clone case, in particular reuse
// `inFlightShardStates` across both clone and standard snapshot code
@@ -2842,12 +2921,14 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState
}
// TODO: the below logic is very similar to that in #startCloning and both could be dried up against each other
// also the code for standard snapshots could make use of this breakout as well
- if (canBeUpdated.isEmpty() || updatedDeletions.hasExecutingDeletion(repoName)) {
+ if (canBeUpdated.isEmpty() || updatedDeletions.hasExecutingDeletion(projectId, repoName)) {
// No shards can be updated in this snapshot so we just add it as is again
snapshotEntries.add(entry);
} else {
if (inFlightShardStates == null) {
- inFlightShardStates = InFlightShardSnapshotStates.forEntries(snapshotsInProgress.forRepo(repoName));
+ inFlightShardStates = InFlightShardSnapshotStates.forEntries(
+ snapshotsInProgress.forRepo(projectId, repoName)
+ );
}
final ImmutableOpenMap.Builder updatedAssignmentsBuilder =
ImmutableOpenMap.builder(entry.shardSnapshotStatusByRepoShardId());
@@ -2891,7 +2972,7 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState
final ImmutableOpenMap shardAssignments = shards(
snapshotsInProgress,
updatedDeletions,
- currentState,
+ currentState.projectState(projectId),
entry.indices().values(),
entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION),
repositoryData,
@@ -2904,7 +2985,7 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState
final ShardSnapshotStatus updated = shardAssignments.get(sid);
if (updated == null) {
// We don't have a new assignment for this shard because its index was concurrently deleted
- assert currentState.routingTable().hasIndex(sid.getIndex()) == false
+ assert currentState.routingTable(projectId).hasIndex(sid.getIndex()) == false
: "Missing assignment for [" + sid + "]";
updatedAssignmentsBuilder.put(sid, ShardSnapshotStatus.MISSING);
} else {
@@ -2936,7 +3017,7 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState
}
snapshotEntries.replaceAll(entry -> entry.withUpdatedIndexIds(updatedIndexIds));
}
- return changed ? snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(repoName, snapshotEntries) : null;
+ return changed ? snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(projectId, repoName, snapshotEntries) : null;
}
private static void markShardReassigned(RepositoryShardId shardId, Set reassignments) {
@@ -3010,7 +3091,7 @@ private static void completeListenersIgnoringException(@Nullable List shards(
SnapshotsInProgress snapshotsInProgress,
SnapshotDeletionsInProgress deletionsInProgress,
- ClusterState currentState,
+ ProjectState currentState,
Collection indices,
boolean useShardGenerations,
RepositoryData repositoryData,
@@ -3019,13 +3100,13 @@ private static ImmutableOpenMap builder = ImmutableOpenMap.builder();
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
final InFlightShardSnapshotStates inFlightShardStates = InFlightShardSnapshotStates.forEntries(
- snapshotsInProgress.forRepo(repoName)
+ snapshotsInProgress.forRepo(currentState.projectId(), repoName)
);
- final boolean readyToExecute = deletionsInProgress.hasExecutingDeletion(repoName) == false;
+ final boolean readyToExecute = deletionsInProgress.hasExecutingDeletion(currentState.projectId(), repoName) == false;
for (IndexId index : indices) {
final String indexName = index.getName();
final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false;
- IndexMetadata indexMetadata = currentState.metadata().getProject().index(indexName);
+ IndexMetadata indexMetadata = currentState.metadata().index(indexName);
if (indexMetadata == null) {
// The index was deleted before we managed to start the snapshot - mark it as missing.
builder.put(new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 0), ShardSnapshotStatus.MISSING);
@@ -3106,13 +3187,10 @@ private static ShardSnapshotStatus initShardSnapshotStatus(
* Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the
* indices-to-check set.
*/
- @FixForMultiProject
public static Set snapshottingDataStreams(final ProjectState projectState, final Set dataStreamsToCheck) {
- // TODO multi-project: this will behave incorrectly when there are data streams with equal names in different projects that are
- // being snapshotted at the same time.
Map dataStreams = projectState.metadata().dataStreams();
return SnapshotsInProgress.get(projectState.cluster())
- .asStream()
+ .asStream(projectState.projectId())
.filter(e -> e.partial() == false)
.flatMap(e -> e.dataStreams().stream())
.filter(ds -> dataStreams.containsKey(ds) && dataStreamsToCheck.contains(ds))
@@ -3124,7 +3202,8 @@ public static Set snapshottingDataStreams(final ProjectState projectStat
*/
public static Set snapshottingIndices(final ProjectState projectState, final Set indicesToCheck) {
final Set indices = new HashSet<>();
- for (List snapshotsInRepo : SnapshotsInProgress.get(projectState.cluster()).entriesByRepo()) {
+ for (List snapshotsInRepo : SnapshotsInProgress.get(projectState.cluster())
+ .entriesByRepo(projectState.projectId())) {
for (final SnapshotsInProgress.Entry entry : snapshotsInRepo) {
if (entry.partial() == false && entry.isClone() == false) {
for (String indexName : entry.indices().keySet()) {
@@ -3247,7 +3326,7 @@ static final class SnapshotShardsUpdateContext {
private final Predicate nodeIdRemovalPredicate;
/** Updates outstanding to be applied to existing snapshot entries. Maps repository name to shard snapshot updates. */
- private final Map> updatesByRepo;
+ private final Map> updatesByRepo;
/** Updates that were used to update an existing in-progress shard snapshot */
private final Set executedUpdates = new HashSet<>();
@@ -3270,11 +3349,14 @@ static final class SnapshotShardsUpdateContext {
this.nodeIdRemovalPredicate = SnapshotsInProgress.get(initialState)::isNodeIdForRemoval;
this.completionHandler = completionHandler;
- // SnapshotsInProgress is organized by repository name, so organize the shard snapshot updates similarly.
+ // SnapshotsInProgress is organized by ProjectRepo, so organize the shard snapshot updates similarly.
this.updatesByRepo = new HashMap<>();
for (final var taskContext : batchExecutionContext.taskContexts()) {
if (taskContext.getTask() instanceof ShardSnapshotUpdate task) {
- updatesByRepo.computeIfAbsent(task.snapshot.getRepository(), r -> new ArrayList<>()).add(task);
+ updatesByRepo.computeIfAbsent(
+ new ProjectRepo(task.snapshot.getProjectId(), task.snapshot.getRepository()),
+ r -> new ArrayList<>()
+ ).add(task);
}
}
}
@@ -3287,9 +3369,9 @@ static final class SnapshotShardsUpdateContext {
SnapshotsInProgress computeUpdatedState() {
final SnapshotsInProgress existing = SnapshotsInProgress.get(initialState);
SnapshotsInProgress updated = existing;
- for (Map.Entry> updates : updatesByRepo.entrySet()) {
- final String repoName = updates.getKey();
- final List oldEntries = existing.forRepo(repoName);
+ for (Map.Entry> updates : updatesByRepo.entrySet()) {
+ final var projectRepo = updates.getKey();
+ final List oldEntries = existing.forRepo(projectRepo);
if (oldEntries.isEmpty()) {
continue;
}
@@ -3303,7 +3385,7 @@ SnapshotsInProgress computeUpdatedState() {
newlyCompletedEntries.add(newEntry);
}
}
- updated = updated.createCopyWithUpdatedEntriesForRepo(repoName, newEntries);
+ updated = updated.createCopyWithUpdatedEntriesForRepo(projectRepo.projectId(), projectRepo.name(), newEntries);
}
if (changedCount > 0) {
@@ -3604,7 +3686,7 @@ private void startShardSnapshot(RepositoryShardId repoShardId, ShardGeneration g
+ "] because it's a normal snapshot but did not";
// work out the node to run the snapshot task on as it might have changed from the previous operation if it was a clone
// or there was a primary failover
- final IndexRoutingTable indexRouting = initialState.routingTable().index(index);
+ final IndexRoutingTable indexRouting = initialState.routingTable(entry.projectId()).index(index);
final ShardRouting shardRouting;
if (indexRouting == null) {
shardRouting = null;
@@ -3649,14 +3731,14 @@ interface ShardSnapshotUpdateCompletionHandler {
void handleCompletion(
ShardSnapshotUpdateResult shardSnapshotUpdateResult,
List newlyCompletedEntries,
- Set updatedRepositories
+ Set updatedRepositories
);
}
private void handleShardSnapshotUpdateCompletion(
ShardSnapshotUpdateResult shardSnapshotUpdateResult,
List newlyCompletedEntries,
- Set updatedRepositories
+ Set updatedRepositories
) {
// Maybe this state update completed one or more snapshots. If we are not already ending them because of some earlier update, end
// them now.
@@ -3767,16 +3849,31 @@ private void innerUpdateSnapshotState(
masterServiceTaskQueue.submitTask("update snapshot state", update, null);
}
- private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, @Nullable String repoName) {
- if (repoName == null) {
- for (List entries : snapshotsInProgress.entriesByRepo()) {
- startExecutableClones(entries);
- }
- } else {
- startExecutableClones(snapshotsInProgress.forRepo(repoName));
+ /**
+ * Maybe kick off new shard clone operations for all repositories from all projects
+ */
+ private void startExecutableClones(SnapshotsInProgress snapshotsInProgress) {
+ for (List entries : snapshotsInProgress.entriesByRepo()) {
+ startExecutableClones(entries);
+ }
+ }
+
+ /**
+ * Maybe kick off new shard clone operations for all repositories of the specified project
+ */
+ private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, ProjectId projectId) {
+ for (List entries : snapshotsInProgress.entriesByRepo(projectId)) {
+ startExecutableClones(entries);
}
}
+ /**
+ * Maybe kick off new shard clone operations for the single specified project repository
+ */
+ private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, ProjectRepo projectRepo) {
+ startExecutableClones(snapshotsInProgress.forRepo(Objects.requireNonNull(projectRepo)));
+ }
+
private void startExecutableClones(List entries) {
for (SnapshotsInProgress.Entry entry : entries) {
if (entry.isClone() && entry.state() == SnapshotsInProgress.State.STARTED) {
@@ -3856,9 +3953,11 @@ private final class FailPendingRepoTasksTask extends ClusterStateUpdateTask {
// Failure that caused the decision to fail all snapshots and deletes for a repo
private final Exception failure;
+ private final ProjectId projectId;
private final String repository;
- FailPendingRepoTasksTask(String repository, Exception failure) {
+ FailPendingRepoTasksTask(ProjectId projectId, String repository, Exception failure) {
+ this.projectId = projectId;
this.repository = repository;
this.failure = failure;
}
@@ -3870,7 +3969,7 @@ public ClusterState execute(ClusterState currentState) {
final List remainingEntries = deletionsInProgress.getEntries();
List updatedEntries = new ArrayList<>(remainingEntries.size());
for (SnapshotDeletionsInProgress.Entry entry : remainingEntries) {
- if (entry.repository().equals(repository)) {
+ if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) {
changed = true;
deletionsToFail.add(entry.uuid());
} else {
@@ -3880,38 +3979,44 @@ public ClusterState execute(ClusterState currentState) {
final SnapshotDeletionsInProgress updatedDeletions = changed ? SnapshotDeletionsInProgress.of(updatedEntries) : null;
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
boolean changedSnapshots = false;
- for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repository)) {
+ for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(projectId, repository)) {
// We failed to read repository data for this delete, it is not the job of SnapshotsService to
// retry these kinds of issues so we fail all the pending snapshots
snapshotsToFail.add(entry.snapshot());
changedSnapshots = true;
}
final SnapshotsInProgress updatedSnapshotsInProgress = changedSnapshots
- ? snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(repository, List.of())
+ ? snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(projectId, repository, List.of())
: null;
return updateWithSnapshots(currentState, updatedSnapshotsInProgress, updatedDeletions);
}
@Override
public void onFailure(Exception e) {
- logger.info(() -> "Failed to remove all snapshot tasks for repo [" + repository + "] from cluster state", e);
+ logger.info(
+ () -> "Failed to remove all snapshot tasks for repo [" + projectRepoString(projectId, repository) + "] from cluster state",
+ e
+ );
failAllListenersOnMasterFailOver(e);
}
@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
logger.warn(
- () -> format("Removed all snapshot tasks for repository [%s] from cluster state, now failing listeners", repository),
- failure
+ () -> format(
+ "Removed all snapshot tasks for repository %s from cluster state, now failing listeners",
+ projectRepoString(projectId, repository),
+ failure
+ )
);
final List readyToResolveListeners = new ArrayList<>();
synchronized (currentlyFinalizing) {
Tuple finalization;
- while ((finalization = repositoryOperations.pollFinalization(repository)) != null) {
+ while ((finalization = repositoryOperations.pollFinalization(projectId, repository)) != null) {
assert snapshotsToFail.contains(finalization.v1())
: "[" + finalization.v1() + "] not found in snapshots to fail " + snapshotsToFail;
}
- leaveRepoLoop(repository);
+ leaveRepoLoop(projectId, repository);
for (Snapshot snapshot : snapshotsToFail) {
failSnapshotCompletionListeners(snapshot, failure, readyToResolveListeners::add);
}
@@ -3932,7 +4037,7 @@ private static final class OngoingRepositoryOperations {
* Map of repository name to a deque of {@link Snapshot} that need to be finalized for the repository and the
* {@link Metadata to use when finalizing}.
*/
- private final Map> snapshotsToFinalize = new HashMap<>();
+ private final Map> snapshotsToFinalize = new HashMap<>();
/**
* Set of delete operations currently being executed against the repository. The values in this set are the delete UUIDs returned
@@ -3944,10 +4049,11 @@ private static final class OngoingRepositoryOperations {
private Metadata latestKnownMetaData;
@Nullable
- synchronized Tuple pollFinalization(String repository) {
+ synchronized Tuple pollFinalization(ProjectId projectId, String repository) {
assertConsistent();
+ final var projectRepo = new ProjectRepo(projectId, repository);
final Snapshot nextEntry;
- final Deque queued = snapshotsToFinalize.get(repository);
+ final Deque queued = snapshotsToFinalize.get(projectRepo);
if (queued == null) {
return null;
}
@@ -3955,7 +4061,7 @@ synchronized Tuple pollFinalization(String repository) {
assert nextEntry != null;
final Tuple res = Tuple.tuple(nextEntry, latestKnownMetaData);
if (queued.isEmpty()) {
- snapshotsToFinalize.remove(repository);
+ snapshotsToFinalize.remove(projectRepo);
}
if (snapshotsToFinalize.isEmpty()) {
latestKnownMetaData = null;
@@ -3973,7 +4079,8 @@ void finishDeletion(String deleteUUID) {
}
synchronized void addFinalization(Snapshot snapshot, Metadata metadata) {
- snapshotsToFinalize.computeIfAbsent(snapshot.getRepository(), k -> new LinkedList<>()).add(snapshot);
+ snapshotsToFinalize.computeIfAbsent(new ProjectRepo(snapshot.getProjectId(), snapshot.getRepository()), k -> new LinkedList<>())
+ .add(snapshot);
this.latestKnownMetaData = metadata;
assertConsistent();
}
@@ -3993,7 +4100,7 @@ synchronized boolean isEmpty() {
}
synchronized boolean assertNotQueued(Snapshot snapshot) {
- if (snapshotsToFinalize.getOrDefault(snapshot.getRepository(), new LinkedList<>())
+ if (snapshotsToFinalize.getOrDefault(new ProjectRepo(snapshot.getProjectId(), snapshot.getRepository()), new LinkedList<>())
.stream()
.anyMatch(entry -> entry.equals(snapshot))) {
@@ -4042,8 +4149,8 @@ private static void logSnapshotFailure(String operation, Snapshot snapshot, Exce
// suppress stack trace at INFO unless extra verbosity is configured
logger.info(
format(
- "[%s][%s] failed to %s snapshot: %s",
- snapshot.getRepository(),
+ "%s[%s] failed to %s snapshot: %s",
+ projectRepoString(snapshot.getProjectId(), snapshot.getRepository()),
snapshot.getSnapshotId().getName(),
operation,
e.getMessage()
@@ -4094,25 +4201,22 @@ public ClusterState execute(BatchExecutionContext batchExecutionCo
// Handle the tasks to apply the shard snapshot updates (ShardSnapshotUpdate tasks).
SnapshotsInProgress snapshotsInProgress = shardsUpdateContext.computeUpdatedState();
- final var project = state.metadata().getProject();
- final RegisteredPolicySnapshots.Builder registeredPolicySnapshots = project.custom(
- RegisteredPolicySnapshots.TYPE,
- RegisteredPolicySnapshots.EMPTY
- ).builder();
+ final Map registeredPolicySnapshotsBuilders = new HashMap<>();
// Handle the tasks to create new snapshots (CreateSnapshotTask tasks).
for (final var taskContext : batchExecutionContext.taskContexts()) {
if (taskContext.getTask() instanceof CreateSnapshotTask task) {
try {
- final var repoMeta = RepositoriesMetadata.get(state).repository(task.snapshot.getRepository());
+ final var projectMetadata = state.metadata().getProject(task.snapshot.getProjectId());
+ final var repoMeta = RepositoriesMetadata.get(projectMetadata).repository(task.snapshot.getRepository());
if (RepositoriesService.isReadOnly(repoMeta.settings())) {
taskContext.onFailure(new RepositoryException(repoMeta.name(), "repository is readonly"));
continue;
}
- registeredPolicySnapshots.addIfSnapshotIsSLMInitiated(
- task.createSnapshotRequest.userMetadata(),
- task.snapshot.getSnapshotId()
- );
+ registeredPolicySnapshotsBuilders.computeIfAbsent(
+ projectMetadata.id(),
+ ignored -> projectMetadata.custom(RegisteredPolicySnapshots.TYPE, RegisteredPolicySnapshots.EMPTY).builder()
+ ).addIfSnapshotIsSLMInitiated(task.createSnapshotRequest.userMetadata(), task.snapshot.getSnapshotId());
if (Objects.equals(task.initialRepositoryMetadata, repoMeta)) {
snapshotsInProgress = createSnapshot(task, taskContext, state, snapshotsInProgress);
} else {
@@ -4138,12 +4242,11 @@ public ClusterState execute(BatchExecutionContext batchExecutionCo
return state;
}
- return ClusterState.builder(state)
- .putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress)
- .putProjectMetadata(
- ProjectMetadata.builder(project).putCustom(RegisteredPolicySnapshots.TYPE, registeredPolicySnapshots.build())
- )
- .build();
+ final var metadataBuilder = Metadata.builder(state.metadata());
+ registeredPolicySnapshotsBuilders.forEach(
+ (projectId, builder) -> metadataBuilder.getProject(projectId).putCustom(RegisteredPolicySnapshots.TYPE, builder.build())
+ );
+ return ClusterState.builder(state).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).metadata(metadataBuilder).build();
}
private SnapshotsInProgress createSnapshot(
@@ -4156,11 +4259,12 @@ private SnapshotsInProgress createSnapshot(
final Snapshot snapshot = createSnapshotTask.snapshot;
final String repositoryName = snapshot.getRepository();
final String snapshotName = snapshot.getSnapshotId().getName();
- ensureRepositoryExists(repositoryName, currentState);
+ final var projectState = currentState.projectState(snapshot.getProjectId());
+ ensureRepositoryExists(repositoryName, projectState.metadata());
final Repository repository = createSnapshotTask.repository;
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
- ensureSnapshotNameNotRunning(snapshotsInProgress, repositoryName, snapshotName);
- validate(repositoryName, snapshotName, currentState);
+ ensureSnapshotNameNotRunning(snapshotsInProgress, snapshot.getProjectId(), repositoryName, snapshotName);
+ validate(repositoryName, snapshotName, projectState.metadata());
final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshotsInProgress, deletionsInProgress);
@@ -4168,7 +4272,7 @@ private SnapshotsInProgress createSnapshot(
final CreateSnapshotRequest request = createSnapshotTask.createSnapshotRequest;
// Store newSnapshot here to be processed in clusterStateProcessed
Map> requestedIndices = Arrays.stream(
- indexNameExpressionResolver.concreteIndexNames(currentState, request)
+ indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), request)
).collect(Collectors.partitioningBy(systemIndices::isSystemIndex));
List requestedSystemIndices = requestedIndices.get(true);
@@ -4228,17 +4332,17 @@ private SnapshotsInProgress createSnapshot(
Set featureSystemIndices = feature.getIndexDescriptors()
.stream()
- .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata().getProject()).stream())
+ .flatMap(descriptor -> descriptor.getMatchingIndices(projectState.metadata()).stream())
.collect(Collectors.toSet());
Set featureAssociatedIndices = feature.getAssociatedIndexDescriptors()
.stream()
- .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata().getProject()).stream())
+ .flatMap(descriptor -> descriptor.getMatchingIndices(projectState.metadata()).stream())
.collect(Collectors.toSet());
Set featureSystemDataStreams = new HashSet<>();
Set featureDataStreamBackingIndices = new HashSet<>();
for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) {
- List backingIndexNames = sdd.getBackingIndexNames(currentState.metadata());
+ List backingIndexNames = sdd.getBackingIndexNames(projectState.metadata());
if (backingIndexNames.size() > 0) {
featureDataStreamBackingIndices.addAll(backingIndexNames);
featureSystemDataStreams.add(sdd.getDataStreamName());
@@ -4267,7 +4371,7 @@ private SnapshotsInProgress createSnapshot(
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
final Map allIndices = new HashMap<>();
- for (SnapshotsInProgress.Entry runningSnapshot : snapshotsInProgress.forRepo(repositoryName)) {
+ for (SnapshotsInProgress.Entry runningSnapshot : snapshotsInProgress.forRepo(projectState.projectId(), repositoryName)) {
allIndices.putAll(runningSnapshot.indices());
}
final Map indexIds = repositoryData.resolveNewIndices(indices, allIndices);
@@ -4283,7 +4387,7 @@ private SnapshotsInProgress createSnapshot(
ImmutableOpenMap shards = shards(
snapshotsInProgress,
deletionsInProgress,
- currentState,
+ projectState,
indexIds.values(),
useShardGenerations(version),
repositoryData,
@@ -4310,7 +4414,7 @@ private SnapshotsInProgress createSnapshot(
request.partial(),
indexIds,
CollectionUtils.concatLists(
- indexNameExpressionResolver.dataStreamNames(currentState, request.indicesOptions(), request.indices()),
+ indexNameExpressionResolver.dataStreamNames(projectState.metadata(), request.indicesOptions(), request.indices()),
systemDataStreamNames
),
threadPool.absoluteTimeInMillis(),
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusActionTests.java
index adaad4eede0d6..09bf072c7225d 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusActionTests.java
@@ -82,7 +82,8 @@ public void initializeComponents() throws Exception {
threadPool,
repositoriesService,
nodeClient,
- new ActionFilters(Set.of())
+ new ActionFilters(Set.of()),
+ TestProjectResolvers.DEFAULT_PROJECT_ONLY
);
}
@@ -198,6 +199,7 @@ public void onFailure(Exception e) {
action.buildResponse(
SnapshotsInProgress.EMPTY,
+ ProjectId.DEFAULT,
new SnapshotsStatusRequest(TEST_REQUEST_TIMEOUT),
currentSnapshotEntries,
nodeSnapshotStatuses,
@@ -357,6 +359,7 @@ public void onFailure(Exception e) {
action.buildResponse(
SnapshotsInProgress.EMPTY,
+ ProjectId.DEFAULT,
new SnapshotsStatusRequest(TEST_REQUEST_TIMEOUT),
currentSnapshotEntries,
nodeSnapshotStatuses,
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java
index 936831f15baf5..7ac4399cc0fae 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java
@@ -559,7 +559,8 @@ public void testDeleteMissing() {
public void testDeleteSnapshotting() {
String dataStreamName = randomAlphaOfLength(5);
- Snapshot snapshot = new Snapshot("doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
+ var projectId = randomProjectIdOrDefault();
+ Snapshot snapshot = new Snapshot(projectId, "doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
SnapshotsInProgress snaps = SnapshotsInProgress.EMPTY.withAddedEntry(
SnapshotsInProgress.Entry.snapshot(
snapshot,
@@ -578,7 +579,6 @@ public void testDeleteSnapshotting() {
)
);
final DataStream dataStream = DataStreamTestHelper.randomInstance(dataStreamName);
- var projectId = randomProjectIdOrDefault();
ProjectState state = ClusterState.builder(ClusterName.DEFAULT)
.putCustom(SnapshotsInProgress.TYPE, snaps)
.putProjectMetadata(ProjectMetadata.builder(projectId).put(dataStream))
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java
index 2259ccb26621c..1020d9b2a14d7 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java
@@ -88,7 +88,8 @@ public void testDeleteMissing() {
public void testDeleteSnapshotting() {
String indexName = randomAlphaOfLength(5);
- Snapshot snapshot = new Snapshot("doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
+ final ProjectId projectId = randomProjectIdOrDefault();
+ Snapshot snapshot = new Snapshot(projectId, "doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
SnapshotsInProgress snaps = SnapshotsInProgress.EMPTY.withAddedEntry(
SnapshotsInProgress.Entry.snapshot(
snapshot,
@@ -107,7 +108,7 @@ public void testDeleteSnapshotting() {
)
);
final Index index = new Index(indexName, randomUUID());
- ClusterState state = ClusterState.builder(clusterState(index)).putCustom(SnapshotsInProgress.TYPE, snaps).build();
+ ClusterState state = ClusterState.builder(clusterState(projectId, index)).putCustom(SnapshotsInProgress.TYPE, snaps).build();
Exception e = expectThrows(
SnapshotInProgressException.class,
() -> MetadataDeleteIndexService.deleteIndices(state, Set.of(index), Settings.EMPTY)
@@ -125,9 +126,8 @@ public void testDeleteUnassigned() throws Exception {
// Create an unassigned index
String indexName = randomAlphaOfLength(5);
Index index = new Index(indexName, randomUUID());
- ClusterState before = clusterState(index);
-
- final var projectId = before.metadata().projectFor(index).id();
+ final ProjectId projectId = randomProjectIdOrDefault();
+ ClusterState before = clusterState(projectId, index);
// Mock the built reroute
when(allocationService.reroute(any(ClusterState.class), anyString(), any())).then(i -> i.getArguments()[0]);
@@ -433,11 +433,10 @@ public void testDeleteIndicesFromMultipleProjects() {
assertThat(after.metadata().projects(), aMapWithSize(numProjects));
}
- private ClusterState clusterState(Index index) {
+ private ClusterState clusterState(ProjectId projectId, Index index) {
final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName())
.settings(indexSettings(IndexVersionUtils.randomVersion(), index.getUUID(), 1, 1))
.build();
- final ProjectId projectId = randomProjectIdOrDefault();
final Metadata.Builder metadataBuilder = Metadata.builder().put(ProjectMetadata.builder(projectId).put(indexMetadata, false));
if (randomBoolean()) {
@@ -454,7 +453,7 @@ private ClusterState clusterState(Index index) {
return ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.routingTable(GlobalRoutingTableTestHelper.buildRoutingTable(metadata, RoutingTable.Builder::addAsNew))
- .blocks(ClusterBlocks.builder().addBlocks(indexMetadata))
+ .blocks(ClusterBlocks.builder().addBlocks(projectId, indexMetadata))
.build();
}
}
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java
index 88603fd3e6315..cf9dd323db6ac 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java
@@ -452,7 +452,11 @@ private static ClusterState addSnapshotIndex(
);
}
- final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)));
+ final Snapshot snapshot = new Snapshot(
+ projectId,
+ randomAlphaOfLength(10),
+ new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))
+ );
final SnapshotsInProgress.Entry entry = SnapshotsInProgress.Entry.snapshot(
snapshot,
randomBoolean(),
diff --git a/server/src/test/java/org/elasticsearch/repositories/InvalidRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/InvalidRepositoryTests.java
index 7a82ec02c956d..8135b0ce3b799 100644
--- a/server/src/test/java/org/elasticsearch/repositories/InvalidRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/InvalidRepositoryTests.java
@@ -32,7 +32,7 @@ public void testShouldThrowWhenGettingMetadata() {
assertThat(repository.getProjectId(), equalTo(projectId));
final var expectedException = expectThrows(
RepositoryException.class,
- () -> repository.getSnapshotGlobalMetadata(new SnapshotId("name", "uuid"))
+ () -> repository.getSnapshotGlobalMetadata(new SnapshotId("name", "uuid"), false)
);
assertThat(expectedException.getMessage(), equalTo("[name] repository type [type] failed to create on current node"));
assertThat(expectedException.getCause(), isA(RepositoryException.class));
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
index 96d601f9091ff..da6d6ff78a989 100644
--- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
@@ -670,7 +670,7 @@ public void getSnapshotInfo(
}
@Override
- public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
+ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
return null;
}
diff --git a/server/src/test/java/org/elasticsearch/repositories/UnknownTypeRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/UnknownTypeRepositoryTests.java
index b7225546fd82e..2a11cdba7099c 100644
--- a/server/src/test/java/org/elasticsearch/repositories/UnknownTypeRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/UnknownTypeRepositoryTests.java
@@ -24,7 +24,7 @@ public class UnknownTypeRepositoryTests extends ESTestCase {
public void testShouldThrowWhenGettingMetadata() {
assertThat(repository.getProjectId(), equalTo(projectId));
- expectThrows(RepositoryException.class, () -> repository.getSnapshotGlobalMetadata(new SnapshotId("name", "uuid")));
+ expectThrows(RepositoryException.class, () -> repository.getSnapshotGlobalMetadata(new SnapshotId("name", "uuid"), false));
}
public void testShouldNotThrowWhenApplyingLifecycleChanges() {
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
index 6c20831969738..12742fb9b56b7 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
@@ -180,6 +180,7 @@ public void testSnapshotWithConflictingName() throws Exception {
final RepositoryData ignoredRepositoryData = safeAwait(
listener -> repository.finalizeSnapshot(
new FinalizeSnapshotContext(
+ false,
snapshotShardGenerations,
RepositoryData.EMPTY_REPO_GEN,
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index eb0b545712982..802e5a86afa35 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -700,7 +700,7 @@ public void testUuidCreationLogging() {
"new repo uuid message",
BlobStoreRepository.class.getCanonicalName(),
Level.INFO,
- Strings.format("Generated new repository UUID [*] for repository [%s] in generation [*]", repoName)
+ Strings.format("Generated new repository UUID [*] for repository %s in generation [*]", repo.toStringShort())
)
);
@@ -729,7 +729,7 @@ public void testUuidCreationLogging() {
"existing repo uuid message",
RepositoriesService.class.getCanonicalName(),
Level.INFO,
- Strings.format("Registering repository [%s] with repository UUID *", repoName)
+ Strings.format("Registering repository %s with repository UUID *", repo.toStringShort())
)
);
@@ -785,7 +785,7 @@ public void testUuidCreationLogging() {
"existing repo uuid message",
RepositoriesService.class.getCanonicalName(),
Level.INFO,
- Strings.format("Registering repository [%s] with repository UUID *", repoName)
+ Strings.format("Registering repository %s with repository UUID *", repo.toStringShort())
)
);
},
diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java
index b779f626753c3..1ffa3e9365801 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java
@@ -11,6 +11,7 @@
import org.elasticsearch.ElasticsearchCorruptionException;
import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@@ -19,6 +20,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Streams;
+import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
@@ -95,8 +97,9 @@ public void testBlobStoreOperations() throws IOException {
checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true);
// Assert that all checksum blobs can be read
- assertEquals(normalText, checksumSMILE.read("repo", blobContainer, "check-smile", xContentRegistry()).getText());
- assertEquals(compressedText, checksumSMILE.read("repo", blobContainer, "check-smile-comp", xContentRegistry()).getText());
+ final var projectRepo = new ProjectRepo(ProjectId.DEFAULT, "repo");
+ assertEquals(normalText, checksumSMILE.read(projectRepo, blobContainer, "check-smile", xContentRegistry()).getText());
+ assertEquals(compressedText, checksumSMILE.read(projectRepo, blobContainer, "check-smile-comp", xContentRegistry()).getText());
}
public void testCompressionIsApplied() throws IOException {
@@ -133,10 +136,11 @@ public void testBlobCorruption() throws IOException {
Function.identity()
);
checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean());
- assertEquals(checksumFormat.read("repo", blobContainer, "test-path", xContentRegistry()).getText(), testString);
+ final var projectRepo = new ProjectRepo(ProjectId.DEFAULT, "repo");
+ assertEquals(checksumFormat.read(projectRepo, blobContainer, "test-path", xContentRegistry()).getText(), testString);
randomCorruption(blobContainer, "test-path");
try {
- checksumFormat.read("repo", blobContainer, "test-path", xContentRegistry());
+ checksumFormat.read(projectRepo, blobContainer, "test-path", xContentRegistry());
fail("Should have failed due to corruption");
} catch (ElasticsearchCorruptionException | EOFException ex) {
// expected exceptions from random byte corruption
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java
index c664e2a108a29..6dac8e49209c3 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java
@@ -11,6 +11,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.test.AbstractWireTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -34,7 +35,7 @@ protected SnapshotInfo copyInstance(SnapshotInfo instance, TransportVersion vers
final BytesStreamOutput out = new BytesStreamOutput();
BlobStoreRepository.SNAPSHOT_FORMAT.serialize(instance, "test", randomBoolean(), out);
return BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(
- instance.repository(),
+ new ProjectRepo(instance.projectId(), instance.repository()),
NamedXContentRegistry.EMPTY,
out.bytes().streamInput()
);
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index c005691b212b2..ee99195766111 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -2421,7 +2421,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
repositoriesService,
transportService,
actionFilters,
- EmptySystemIndices.INSTANCE
+ EmptySystemIndices.INSTANCE,
+ false
);
nodeEnv = new NodeEnvironment(settings, environment);
final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList());
@@ -2781,19 +2782,47 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
);
actions.put(
TransportCleanupRepositoryAction.TYPE,
- new TransportCleanupRepositoryAction(transportService, clusterService, repositoriesService, threadPool, actionFilters)
+ new TransportCleanupRepositoryAction(
+ transportService,
+ clusterService,
+ repositoriesService,
+ threadPool,
+ actionFilters,
+ TestProjectResolvers.DEFAULT_PROJECT_ONLY
+ )
);
actions.put(
TransportCreateSnapshotAction.TYPE,
- new TransportCreateSnapshotAction(transportService, clusterService, threadPool, snapshotsService, actionFilters)
+ new TransportCreateSnapshotAction(
+ transportService,
+ clusterService,
+ threadPool,
+ snapshotsService,
+ actionFilters,
+ TestProjectResolvers.DEFAULT_PROJECT_ONLY
+ )
);
actions.put(
TransportCloneSnapshotAction.TYPE,
- new TransportCloneSnapshotAction(transportService, clusterService, threadPool, snapshotsService, actionFilters)
+ new TransportCloneSnapshotAction(
+ transportService,
+ clusterService,
+ threadPool,
+ snapshotsService,
+ actionFilters,
+ TestProjectResolvers.DEFAULT_PROJECT_ONLY
+ )
);
actions.put(
TransportGetSnapshotsAction.TYPE,
- new TransportGetSnapshotsAction(transportService, clusterService, threadPool, repositoriesService, actionFilters)
+ new TransportGetSnapshotsAction(
+ transportService,
+ clusterService,
+ threadPool,
+ repositoriesService,
+ actionFilters,
+ TestProjectResolvers.DEFAULT_PROJECT_ONLY
+ )
);
actions.put(
TransportClusterRerouteAction.TYPE,
@@ -2843,7 +2872,14 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
);
actions.put(
TransportDeleteSnapshotAction.TYPE,
- new TransportDeleteSnapshotAction(transportService, clusterService, threadPool, snapshotsService, actionFilters)
+ new TransportDeleteSnapshotAction(
+ transportService,
+ clusterService,
+ threadPool,
+ snapshotsService,
+ actionFilters,
+ TestProjectResolvers.DEFAULT_PROJECT_ONLY
+ )
);
client.initialize(
actions,
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
index 4e4c5adf5d758..79d880c5ef5e1 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
@@ -35,7 +35,7 @@
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo;
+import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.ShardGeneration;
import org.elasticsearch.repositories.ShardSnapshotResult;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
index 67dd587258f23..a3e8f6e84600f 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
@@ -82,7 +82,7 @@ public void getSnapshotInfo(
}
@Override
- public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
+ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
return null;
}
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index 63e77cfdc4523..ead163eaf26a4 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -22,6 +22,7 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -39,6 +40,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
+import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
@@ -423,7 +425,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, IndexVe
.replace(IndexVersion.current().toString(), version.toString())
)
) {
- downgradedSnapshotInfo = SnapshotInfo.fromXContentInternal(repoName, parser);
+ downgradedSnapshotInfo = SnapshotInfo.fromXContentInternal(new ProjectRepo(ProjectId.DEFAULT, repoName), parser);
}
final BlobStoreRepository blobStoreRepository = getRepositoryOnMaster(repoName);
BlobStoreRepository.SNAPSHOT_FORMAT.write(
@@ -545,6 +547,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map listener) -> repo.finalizeSnapshot(
new FinalizeSnapshotContext(
+ false,
UpdatedShardGenerations.EMPTY,
getRepositoryData(repoName).getGenId(),
state.metadata(),
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
index bd2c925d52347..469f4c4d81998 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
@@ -281,7 +281,7 @@ private Respons
}
@Override
- public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
+ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
var remoteClient = getRemoteClusterClient();
ClusterStateResponse clusterState = executeRecoveryAction(
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java
index e6f3c3fa54277..c845df8501e45 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java
@@ -100,6 +100,7 @@ public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
// required engine, that the index is read-only and the mapping to a default mapping
super.finalizeSnapshot(
new FinalizeSnapshotContext(
+ finalizeSnapshotContext.serializeProjectMetadata(),
finalizeSnapshotContext.updatedShardGenerations(),
finalizeSnapshotContext.repositoryStateId(),
metadataToSnapshot(
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncRetryDuringSnapshotActionStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncRetryDuringSnapshotActionStep.java
index 94e44e7f9118f..702e3a88dc434 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncRetryDuringSnapshotActionStep.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncRetryDuringSnapshotActionStep.java
@@ -145,7 +145,7 @@ public void onTimeout(TimeValue timeout) {
// The index has since been deleted, mission accomplished!
return true;
}
- for (List snapshots : SnapshotsInProgress.get(state).entriesByRepo()) {
+ for (List snapshots : SnapshotsInProgress.get(state).entriesByRepo(projectId)) {
for (SnapshotsInProgress.Entry snapshot : snapshots) {
if (snapshot.indices().containsKey(indexName)) {
// There is a snapshot running with this index name
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java
index fbb7ddfb6024c..655547a3f26ea 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java
@@ -375,6 +375,7 @@ public void testRestoreMinimal() throws IOException {
.build();
repository.finalizeSnapshot(
new FinalizeSnapshotContext(
+ false,
new UpdatedShardGenerations(shardGenerations, ShardGenerations.EMPTY),
ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(),
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),
diff --git a/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMStatDisruptionIT.java b/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMStatDisruptionIT.java
index d3eff9eb8585e..5837354cd5bb2 100644
--- a/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMStatDisruptionIT.java
+++ b/x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMStatDisruptionIT.java
@@ -251,6 +251,7 @@ protected TestRestartBeforeListenersRepo(
@Override
public void finalizeSnapshot(FinalizeSnapshotContext fsc) {
var newFinalizeContext = new FinalizeSnapshotContext(
+ false,
fsc.updatedShardGenerations(),
fsc.repositoryStateId(),
fsc.clusterMetadata(),
diff --git a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java
index eb7c9c1b8794e..df124a0f7ba36 100644
--- a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java
+++ b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java
@@ -20,6 +20,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.scheduler.SchedulerEngine;
+import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.snapshots.RegisteredPolicySnapshots;
@@ -218,7 +219,9 @@ public static String exceptionToString(Exception ex) {
static Set currentlyRunningSnapshots(ClusterState clusterState) {
final SnapshotsInProgress snapshots = clusterState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final Set currentlyRunning = new HashSet<>();
- for (final List entriesForRepo : snapshots.entriesByRepo()) {
+ @FixForMultiProject(description = "replace with snapshots.entriesByRepo(ProjectId) when SLM is project aware")
+ final Iterable> entriesByRepo = snapshots.entriesByRepo();
+ for (final List entriesForRepo : entriesByRepo) {
for (SnapshotsInProgress.Entry entry : entriesForRepo) {
currentlyRunning.add(entry.snapshot().getSnapshotId());
}
diff --git a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/action/TransportGetSnapshotLifecycleAction.java b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/action/TransportGetSnapshotLifecycleAction.java
index 5392e182ed20f..7dd702238f755 100644
--- a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/action/TransportGetSnapshotLifecycleAction.java
+++ b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/action/TransportGetSnapshotLifecycleAction.java
@@ -86,7 +86,7 @@ protected void masterOperation(
}
} else {
final Map inProgress = new HashMap<>();
- for (List entriesForRepo : SnapshotsInProgress.get(state).entriesByRepo()) {
+ for (List entriesForRepo : SnapshotsInProgress.get(state).entriesByRepo(projectMetadata.id())) {
for (SnapshotsInProgress.Entry entry : entriesForRepo) {
Map meta = entry.userMetadata();
if (meta == null
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java
index 298dcd9ed94c5..e8c23d0741712 100644
--- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java
@@ -14,6 +14,7 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
+import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -27,6 +28,7 @@
import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
@@ -468,7 +470,11 @@ public void testBadSnapshotInfo() throws IOException {
final SnapshotInfo snapshotInfo;
try (var inputStream = Files.newInputStream(snapshotInfoBlob)) {
- snapshotInfo = SNAPSHOT_FORMAT.deserialize(testContext.repositoryName(), xContentRegistry(), inputStream);
+ snapshotInfo = SNAPSHOT_FORMAT.deserialize(
+ new ProjectRepo(ProjectId.DEFAULT, testContext.repositoryName()),
+ xContentRegistry(),
+ inputStream
+ );
}
final var newIndices = new ArrayList<>(snapshotInfo.indices());
@@ -727,7 +733,7 @@ private void runInconsistentShardGenerationBlobTest(
final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
try (var inputStream = Files.newInputStream(shardGenerationBlob)) {
blobStoreIndexShardSnapshots = INDEX_SHARD_SNAPSHOTS_FORMAT.deserialize(
- testContext.repositoryName(),
+ new ProjectRepo(ProjectId.DEFAULT, testContext.repositoryName()),
xContentRegistry(),
inputStream
);
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java
index 5fa3282cd57ae..cadf4463b4605 100644
--- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java
@@ -315,7 +315,7 @@ void verifySnapshotInfo(SnapshotInfo snapshotInfo, ActionListener listener
private void verifySnapshotGlobalMetadata(ActionListener listener) {
metadataTaskRunner.run(ActionRunnable.wrap(listener, l -> {
try {
- blobStoreRepository.getSnapshotGlobalMetadata(snapshotId);
+ blobStoreRepository.getSnapshotGlobalMetadata(snapshotId, false);
// no checks here, loading it is enough
l.onResponse(null);
} catch (Exception e) {
diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle
index 2d33cccb00bee..2fac160a0b0dc 100644
--- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle
+++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle
@@ -47,8 +47,6 @@ tasks.named("yamlRestTest").configure {
ArrayList blacklist = [
/* These tests don't work on multi-project yet - we need to go through each of them and make them work */
'^cat.recovery/*/*',
- '^cat.repositories/*/*',
- '^cat.snapshots/*/*',
'^cluster.desired_balance/10_basic/*',
'^cluster.stats/10_basic/snapshot stats reported in get cluster stats',
'^data_stream/40_supported_apis/Verify shard stores api', // uses _shard_stores API
@@ -58,14 +56,7 @@ tasks.named("yamlRestTest").configure {
'^indices.resolve_cluster/*/*/*',
'^indices.shard_stores/*/*',
'^migration/*/*',
- '^nodes.stats/70_repository_throttling_stats/Repository throttling stats (some repositories exist)',
- '^snapshot.clone/*/*',
- '^snapshot.create/*/*',
- '^snapshot.delete/*/*',
- '^snapshot.get/*/*',
- '^snapshot.get_repository/*/*',
'^snapshot.restore/*/*',
- '^snapshot.status/*/*',
'^synonyms/*/*',
'^tsdb/30_snapshot/*',
'^update_by_query/80_scripting/Update all docs with one deletion and one noop using a stored script', // scripting is not project aware yet