Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public boolean rename(Path src, Path dst) throws IOException {
return wrap(() -> fileIO(src).rename(src, dst));
}

@Override
public boolean tryToWriteAtomic(Path path, String content) throws IOException {
return wrap(() -> fileIO(path).tryToWriteAtomic(path, content));
}

private FileIO fileIO(Path path) throws IOException {
if (lazyFileIO == null) {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,24 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.ReflectionUtils;
import org.apache.paimon.utils.StringUtils;

import com.aliyun.oss.OSSClient;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.common.comm.ServiceClient;
import com.aliyun.oss.model.ObjectMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -180,6 +185,40 @@ protected AliyunOSSFileSystem createFileSystem(org.apache.hadoop.fs.Path path) {
}
}

@Override
public boolean tryToWriteAtomic(Path path, String content) throws IOException {
URI uri = path.toUri();
String bucket = uri.getHost();
String objectKey = uri.getPath().substring(1);
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);

ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(bytes.length);
metadata.setHeader("x-oss-forbid-overwrite", "true");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The direct SDK upload no longer carries fs.oss.server-side-encryption-algorithm into ObjectMetadata. The Hadoop OSS write path that this replaces sets ObjectMetadata#setServerSideEncryption before putObject; with catalogs configured for OSS SSE this can either violate bucket policies that require encrypted uploads or create unencrypted metadata/version files. Could we set the same SSE header from the configured Hadoop options/store before calling putObject?

@MaxLinyun MaxLinyun Jun 17, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JingsongLi Thanks for taking the time to review this PR. I've made modifications following your review suggestions, could you please take another look when you have time?


String sseAlgorithm =
hadoopOptions.getString("fs.oss.server-side-encryption-algorithm", "");
if (StringUtils.isNotEmpty(sseAlgorithm)) {
metadata.setServerSideEncryption(sseAlgorithm);
}

AliyunOSSFileSystem fs = (AliyunOSSFileSystem) getFileSystem(path(path));
AliyunOSSFileSystemStore store = fs.getStore();
try {
OSSClient ossClient = ReflectionUtils.getPrivateFieldValue(store, "ossClient");
ossClient.putObject(bucket, objectKey, new ByteArrayInputStream(bytes), metadata);
return true;
} catch (OSSException e) {
if ("FileAlreadyExists".equals(e.getErrorCode())) {
LOG.warn("Failed to atomic write {}: object already exists", path);
return false;
}
throw new IOException("Failed to atomic write " + path, e);
} catch (Exception e) {
throw new IOException("Failed to atomic write " + path, e);
}
}

@Override
public void close() {
if (!allowCache) {
Expand Down