Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
ba9b3a1
Changes for versioned blob container
Pranshu-S Dec 3, 2025
5374c87
Changes in download flow/read flow to get the Etag ID and cache it lo…
Pranshu-S Dec 9, 2025
5b44f59
Change the path of the writes and reads and don’t make it cluster UUI…
Pranshu-S Dec 9, 2025
6ab9491
Write RemoteManifest using Etag if present otherwise upload if not ex…
Pranshu-S Dec 9, 2025
935faae
Adding Its to test E2E flow
Pranshu-S Dec 11, 2025
fbf1047
[feat]: handling disruptive cases where cluster manager needs to fetc…
Pranshu-S Dec 31, 2025
7f74caa
[feat]: Seperating index metadata updates to an indepdenet manifest
Pranshu-S Dec 31, 2025
b201fdc
[feat]: Defining a new IndexMetadataCoordinator Node to perform Index…
Pranshu-S Dec 31, 2025
ebb7610
[feat]: Updating IndexMetadata via IndexMetadataCoordinator
Pranshu-S Jan 2, 2026
02e82f6
[temp]: Updating tests to temporarily run
Pranshu-S Jan 2, 2026
9708412
[feat]: Implementing a new publish protocol to perform async apply of…
Pranshu-S Jan 4, 2026
2a8e7e9
feat: Adding index metadata version to manifest and uploading it to r…
Pranshu-S Jan 9, 2026
fb1145f
[feat]: Adding appliers to add routing details on empty indices
Pranshu-S Jan 9, 2026
f93f4a7
[feat]: Implementing distributing updates for IndexMetadata outside o…
Pranshu-S Jan 12, 2026
167b2bd
chore: Changes to get things working in demo + minor fixes
Pranshu-S Jan 13, 2026
35f9cde
feat: Abstracting IndexMetadata updates to UpdateTask and adding shar…
Pranshu-S Jan 14, 2026
396753c
feat: Extending IndexMetadataCoordinator implementation to put-mappin…
Pranshu-S Jan 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
Expand Down Expand Up @@ -78,6 +79,8 @@
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
import org.opensearch.common.blobstore.versioned.VersionedBlobContainer;
import org.opensearch.common.blobstore.versioned.VersionedInputStream;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.InputStreamContainer;
Expand Down Expand Up @@ -115,7 +118,7 @@
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.utils.SseKmsUtil.configureEncryptionSettings;

class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamBlobContainer {
class S3BlobContainer extends VersionedBlobContainer implements AsyncMultiStreamBlobContainer {

private static final Logger logger = LogManager.getLogger(S3BlobContainer.class);
private static final long DEFAULT_OPERATION_TIMEOUT = TimeUnit.SECONDS.toSeconds(30);
Expand Down Expand Up @@ -948,4 +951,145 @@ public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionLi
completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e));
}
}

/**
* Writes a blob with conditional version support.
*
* @param blobName the name of the blob
* @param inputStream the input stream to write
* @param blobSize the size of the blob
* @param expectedVersion the expected version for conditional write
* @return VersionedInputStream containing the new version
* @throws IOException if write fails or version mismatch
*/
@Override
public String conditionallyWriteBlobWithVersion(String blobName, InputStream inputStream, long blobSize, String expectedVersion) throws IOException {
PutObjectRequest.Builder builder = PutObjectRequest.builder()
.bucket(blobStore.bucket())
.key(buildKey(blobName))
.contentLength(blobSize)
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.expectedBucketOwner(blobStore.expectedBucketOwner())
.ifMatch(expectedVersion);

configureEncryptionSettings(builder, blobStore);

try (AmazonS3Reference clientReference = blobStore.clientReference()) {
final InputStream requestInputStream = blobStore.isUploadRetryEnabled()
? new BufferedInputStream(inputStream, (int) (blobSize + 1))
: inputStream;

var response = AccessController.doPrivileged(() ->
clientReference.get().putObject(
builder.build(),
RequestBody.fromInputStream(requestInputStream, blobSize)
)
);
return response.eTag();
} catch (S3Exception e) {
if (e.statusCode() == 412) {
throw new IOException("Version conflict: expected version '" + expectedVersion + "' but remote version differs", e);
}
throw new IOException("Failed to write blob with version check", e);
} catch (SdkException e) {
throw new IOException("Failed to write blob with version check", e);
}
}



/**
* Writes a blob only if it does not already exist.
*
* @param blobName the name of the blob
* @param inputStream the input stream to write
* @param blobSize the size of the blob
* @return VersionedInputStream containing the new version, or null if blob already exists
* @throws IOException if write fails
*/
@Override
public String writeVersionedBlobIfNotExists(String blobName, InputStream inputStream, long blobSize) throws IOException {
PutObjectRequest.Builder builder = PutObjectRequest.builder()
.bucket(blobStore.bucket())
.key(buildKey(blobName))
.contentLength(blobSize)
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.expectedBucketOwner(blobStore.expectedBucketOwner())
.ifNoneMatch("*");

configureEncryptionSettings(builder, blobStore);

try (AmazonS3Reference clientReference = blobStore.clientReference()) {
final InputStream requestInputStream = blobStore.isUploadRetryEnabled()
? new BufferedInputStream(inputStream, (int) (blobSize + 1))
: inputStream;

var response = AccessController.doPrivileged(() ->
clientReference.get().putObject(
builder.build(),
RequestBody.fromInputStream(requestInputStream, blobSize)
)
);
return response.eTag();
} catch (S3Exception e) {
if (e.statusCode() == 412) {
throw new IOException("Blob already exists: " + blobName, e);
}
throw new IOException("Failed to write blob if not exists", e);
} catch (SdkException e) {
throw new IOException("Failed to write blob if not exists", e);
}
}

/**
* Reads a versioned blob
*
* @param blobName the name of the blob
* @return VersionedInputStream containing the input stream and version
* @throws IOException if read fails
*/
@Override
public VersionedInputStream readVersionedBlob(String blobName) throws IOException {
GetObjectRequest request = GetObjectRequest.builder()
.bucket(blobStore.bucket())
.key(buildKey(blobName))
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();

try (AmazonS3Reference clientReference = blobStore.clientReference()) {
var response = AccessController.doPrivileged(() ->
clientReference.get().getObject(request)
);
return new VersionedInputStream(response.response().eTag(), response);
} catch (SdkException e) {
throw new IOException("Failed to read versioned blob: " + blobName, e);
}
}

/**
* Gets the current version of a blob without reading its content.
*
* @param blobName the name of the blob
* @return the current version
* @throws IOException if blob doesn't exist or operation fails
*/
@Override
public String getVersion(String blobName) throws IOException {
HeadObjectRequest request = HeadObjectRequest.builder()
.bucket(blobStore.bucket())
.key(buildKey(blobName))
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();

try (AmazonS3Reference clientReference = blobStore.clientReference()) {
var response = AccessController.doPrivileged(() ->
clientReference.get().headObject(request)
);
return response.eTag();
} catch (SdkException e) {
throw new IOException("Failed to get version for blob: " + blobName, e);
}
}
}
Loading
Loading