Skip to content

Commit

Permalink
[fileio] Add file-io.allow-cache for RESTTokenFileIO (apache#5054)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored and 李鹏程 committed Feb 14, 2025
1 parent a6b028a commit 6c292d8
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 28 deletions.
18 changes: 12 additions & 6 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@
<td>Integer</td>
<td>Configure the size of the connection pool.</td>
</tr>
<tr>
<td><h5>file-io.allow-cache</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to allow static cache in the file IO implementation. If not allowed, this means that a large number of FileIO instances may be generated; in that case, enabling caching could lead to resource leakage.</td>
</tr>
<tr>
<td><h5>format-table.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down Expand Up @@ -116,6 +122,12 @@
<td>String</td>
<td>Metastore of paimon catalog, supports filesystem, hive and jdbc.</td>
</tr>
<tr>
<td><h5>resolving-file-io.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable resolving file IO. When this option is enabled, in conjunction with the table's property data-file.external-paths, Paimon can read and write to external storage paths, such as OSS or S3. In order to access these external paths correctly, you also need to configure the corresponding access key and secret key.</td>
</tr>
<tr>
<td><h5>sync-all-properties</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand All @@ -140,11 +152,5 @@
<td>String</td>
<td>The warehouse root path of catalog.</td>
</tr>
<tr>
<td><h5>resolving-fileio.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable resolving fileio, when this option is enabled, in conjunction with the table's property data-file.external-paths, Paimon can read and write to external storage paths, such as OSS or S3. In order to access these external paths correctly, you also need to configure the corresponding access key and secret key.</td>
</tr>
</tbody>
</table>
4 changes: 2 additions & 2 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import java.util.stream.Collectors;

import static org.apache.paimon.fs.FileIOUtils.checkAccess;
import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILEIO_ENABLED;
import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILE_IO_ENABLED;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
Expand Down Expand Up @@ -420,7 +420,7 @@ default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
* by the given path.
*/
static FileIO get(Path path, CatalogContext config) throws IOException {
if (config.options().get(RESOLVING_FILEIO_ENABLED)) {
if (config.options().get(RESOLVING_FILE_IO_ENABLED)) {
FileIO fileIO = new ResolvingFileIO();
fileIO.configure(config);
return fileIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILEIO_ENABLED;
import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILE_IO_ENABLED;

/**
* An implementation of {@link FileIO} that supports multiple file system schemas. It dynamically
Expand Down Expand Up @@ -61,7 +61,7 @@ public boolean isObjectStore() {
public void configure(CatalogContext context) {
Options options = new Options();
context.options().toMap().forEach(options::set);
options.set(RESOLVING_FILEIO_ENABLED, false);
options.set(RESOLVING_FILE_IO_ENABLED, false);
this.context =
CatalogContext.create(
options, context.hadoopConf(), context.preferIO(), context.fallbackIO());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,21 @@ public class CatalogOptions {
+ "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in"
+ " the metastore and need to be manually added as separate partition operations.");

public static final ConfigOption<Boolean> RESOLVING_FILEIO_ENABLED =
ConfigOptions.key("resolving-fileio.enabled")
public static final ConfigOption<Boolean> RESOLVING_FILE_IO_ENABLED =
ConfigOptions.key("resolving-file-io.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable resolving fileio, when this option is enabled, in conjunction with the table's property data-file.external-paths, "
+ "Paimon can read and write to external storage paths, such as OSS or S3. "
+ "In order to access these external paths correctly, you also need to configure the corresponding access key and secret key.");

public static final ConfigOption<Boolean> FILE_IO_ALLOW_CACHE =
ConfigOptions.key("file-io.allow-cache")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to allow static cache in file io implementation. If not allowed, this means that "
+ "there may be a large number of FileIO instances generated, enabling caching can "
+ "lead to resource leakage.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;

/** A {@link FileIO} to support getting token from REST Server. */
public class RESTTokenFileIO implements FileIO {

Expand Down Expand Up @@ -162,6 +164,7 @@ private FileIO fileIO() throws IOException {
CatalogContext context = catalogLoader.context();
Options options = context.options();
options = new Options(RESTUtil.merge(options.toMap(), token.token));
options.set(FILE_IO_ALLOW_CACHE, false);
context = CatalogContext.create(options, context.preferIO(), context.fallbackIO());
try {
fileIO = FileIO.get(path, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
* <p>Important: copy this class from HadoopFileIO here to avoid class loader conflicts.
*/
public abstract class HadoopCompliantFileIO implements FileIO {

private static final long serialVersionUID = 1L;

protected transient volatile FileSystem fs;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -145,12 +146,19 @@ private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOExcept
if (authority == null) {
authority = "DEFAULT";
}
FileSystem fs = map.get(authority);
if (fs == null) {
fs = createFileSystem(path);
map.put(authority, fs);
try {
return map.computeIfAbsent(
authority,
k -> {
try {
return createFileSystem(path);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
} catch (UncheckedIOException e) {
throw e.getCause();
}
return fs;
}

protected abstract FileSystem createFileSystem(org.apache.hadoop.fs.Path path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.IOUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -35,11 +36,14 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;

/** OSS {@link FileIO}. */
public class OSSFileIO extends HadoopCompliantFileIO {

private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 2L;

private static final Logger LOG = LoggerFactory.getLogger(OSSFileIO.class);

Expand Down Expand Up @@ -68,7 +72,11 @@ public class OSSFileIO extends HadoopCompliantFileIO {
*/
private static final Map<CacheKey, AliyunOSSFileSystem> CACHE = new ConcurrentHashMap<>();

// create a shared config to avoid load properties everytime
private static final Configuration SHARED_CONFIG = new Configuration();

private Options hadoopOptions;
private boolean allowCache = true;

@Override
public boolean isObjectStore() {
Expand All @@ -77,6 +85,7 @@ public boolean isObjectStore() {

@Override
public void configure(CatalogContext context) {
allowCache = context.options().get(FILE_IO_ALLOW_CACHE);
hadoopOptions = new Options();
// read all configuration with prefix 'CONFIG_PREFIXES'
for (String key : context.options().keySet()) {
Expand All @@ -101,11 +110,12 @@ public void configure(CatalogContext context) {
protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) {
final String scheme = path.toUri().getScheme();
final String authority = path.toUri().getAuthority();
return CACHE.computeIfAbsent(
new CacheKey(hadoopOptions, scheme, authority),
key -> {
Configuration hadoopConf = new Configuration();
key.options.toMap().forEach(hadoopConf::set);
Supplier<AliyunOSSFileSystem> supplier =
() -> {
// create config from base config, if initializing a new config, it will
// retrieve props from the file, which comes at a high cost
Configuration hadoopConf = new Configuration(SHARED_CONFIG);
hadoopOptions.toMap().forEach(hadoopConf::set);
URI fsUri = path.toUri();
if (scheme == null && authority == null) {
fsUri = FileSystem.getDefaultUri(hadoopConf);
Expand All @@ -124,7 +134,22 @@ protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) {
throw new UncheckedIOException(e);
}
return fs;
});
};

if (allowCache) {
return CACHE.computeIfAbsent(
new CacheKey(hadoopOptions, scheme, authority), key -> supplier.get());
} else {
return supplier.get();
}
}

@Override
public void close() {
if (!allowCache) {
fsMap.values().forEach(IOUtils::closeQuietly);
fsMap.clear();
}
}

private static class CacheKey {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class OrcWriterFactory implements FormatWriterFactory {
*/
@VisibleForTesting
OrcWriterFactory(Vectorizer<InternalRow> vectorizer) {
this(vectorizer, new Properties(), new Configuration(), 1024);
this(vectorizer, new Properties(), new Configuration(false), 1024);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public SimpleColStats[] extract(FileIO fileIO, Path path) throws IOException {
public Pair<SimpleColStats[], FileInfo> extractWithFileInfo(FileIO fileIO, Path path)
throws IOException {
try (Reader reader =
OrcReaderFactory.createReader(new Configuration(), fileIO, path, null)) {
OrcReaderFactory.createReader(new Configuration(false), fileIO, path, null)) {
long rowCount = reader.getNumberOfRows();
ColumnStatistics[] columnStatistics = reader.getStatistics();
TypeDescription schema = reader.getSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ public long getDataSize() {
* @param <SELF> The type of this builder that is returned by builder methods
*/
public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
private OutputFile file = null;
private Configuration conf = new Configuration();

private final OutputFile file;

private Configuration conf = new Configuration(false);
private ParquetFileWriter.Mode mode;
private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
private long rowGroupSize = DEFAULT_BLOCK_SIZE;
Expand Down

0 comments on commit 6c292d8

Please sign in to comment.