From 13b6f1a4480876348ce7aef727a6c237dc82695b Mon Sep 17 00:00:00 2001 From: limu Date: Wed, 17 Jun 2026 10:20:50 +0800 Subject: [PATCH] [core] Rewrite OSSFileIO.tryToWriteAtomic using atomic putObject API of the OSS SDK (#8226) Paimon's existing OSSFileIO inherits from HadoopCompliantFileIO, with file operations implemented underneath via Hadoop's AliyunOSSFileSystem. In object storage scenarios, the default implementation of tryToWriteAtomic follows the pattern of 'writing a temporary file followed by renaming'. However, renaming on OSS is essentially a copy-then-delete process and not an atomic operation. Rewrite the implementation of tryToWriteAtomic, and directly call the conditional write API (put-if-absent) of the OSS SDK, so as to implement the atomic 'write if not exists' semantics without relying on external locks. --- .../org/apache/paimon/fs/PluginFileIO.java | 5 +++ .../java/org/apache/paimon/oss/OSSFileIO.java | 39 +++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/PluginFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/PluginFileIO.java index 2af4919f0815..d5b62a2944cf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/PluginFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/PluginFileIO.java @@ -91,6 +91,11 @@ public boolean rename(Path src, Path dst) throws IOException { return wrap(() -> fileIO(src).rename(src, dst)); } + @Override + public boolean tryToWriteAtomic(Path path, String content) throws IOException { + return wrap(() -> fileIO(path).tryToWriteAtomic(path, content)); + } + private FileIO fileIO(Path path) throws IOException { if (lazyFileIO == null) { synchronized (this) { diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java index 85d7e8cf9ec8..b9c18bf9940d 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java @@ -26,9 +26,12 @@ import org.apache.paimon.options.Options; import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.ReflectionUtils; +import org.apache.paimon.utils.StringUtils; import com.aliyun.oss.OSSClient; +import com.aliyun.oss.OSSException; import com.aliyun.oss.common.comm.ServiceClient; +import com.aliyun.oss.model.ObjectMetadata; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem; @@ -36,9 +39,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -180,6 +185,40 @@ protected AliyunOSSFileSystem createFileSystem(org.apache.hadoop.fs.Path path) { } } + @Override + public boolean tryToWriteAtomic(Path path, String content) throws IOException { + URI uri = path.toUri(); + String bucket = uri.getHost(); + String objectKey = uri.getPath().substring(1); + byte[] bytes = content.getBytes(StandardCharsets.UTF_8); + + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(bytes.length); + metadata.setHeader("x-oss-forbid-overwrite", "true"); + + String sseAlgorithm = + hadoopOptions.getString("fs.oss.server-side-encryption-algorithm", ""); + if (StringUtils.isNotEmpty(sseAlgorithm)) { + metadata.setServerSideEncryption(sseAlgorithm); + } + + AliyunOSSFileSystem fs = (AliyunOSSFileSystem) getFileSystem(path(path)); + AliyunOSSFileSystemStore store = fs.getStore(); + try { + OSSClient ossClient = ReflectionUtils.getPrivateFieldValue(store, "ossClient"); + ossClient.putObject(bucket, objectKey, new ByteArrayInputStream(bytes), metadata); + return true; + } catch (OSSException e) { + if ("FileAlreadyExists".equals(e.getErrorCode())) { + LOG.warn("Failed to atomic write {}: object already exists", path); + return false; + } + throw new IOException("Failed to atomic write " + path, e); + } catch (Exception e) { + throw new IOException("Failed to atomic write " + path, e); + } + } + @Override public void close() { if (!allowCache) {