[fileio] Add file-io.allow-cache for RESTTokenFileIO #5054

Merged: 6 commits, Feb 12, 2025
18 changes: 12 additions & 6 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -80,6 +80,12 @@
<td>Integer</td>
<td>Configure the size of the connection pool.</td>
</tr>
<tr>
<td><h5>file-io.allow-cache</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to allow a static cache in the file io implementation. Disable this when a large number of FileIO instances may be generated; caching them statically can lead to resource leakage.</td>
Contributor:
I have a question here: if caching is enabled, the number of FileIO instances will be reduced. Why would enabling caching lead to resource leakage?

Contributor Author:
Our original design was to place FileIO instances in a static cache. But in the REST Catalog case, many FileIO instances may be generated, potentially one per table, because each table can have different file access permissions.

With that many FileIO instances, we cannot casually cache them in memory; holding all of them in a static cache consumes ever more resources, i.e., a resource leak.

</tr>
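The trade-off discussed in this thread can be sketched with a toy static cache (plain JDK; `ToyFileIO` and the key scheme are illustrative, not Paimon APIs). When the cache key varies per table, a static cache grows without bound, which is the resource leakage the option description warns about:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StaticCacheSketch {
    // Stand-in for a FileIO holding connections and credentials.
    static class ToyFileIO {}

    // Static cache, as in the original design: entries are never evicted.
    private static final Map<String, ToyFileIO> CACHE = new ConcurrentHashMap<>();

    static ToyFileIO get(String perTableKey) {
        return CACHE.computeIfAbsent(perTableKey, k -> new ToyFileIO());
    }

    static int cacheSize() {
        return CACHE.size();
    }

    public static void main(String[] args) {
        // With a REST catalog, each table can carry its own credentials, so the
        // cache key is effectively per table and the static cache only grows.
        for (int i = 0; i < 10_000; i++) {
            get("table-" + i);
        }
        System.out.println(cacheSize()); // 10000 entries, held for the JVM lifetime
    }
}
```

Disabling `file-io.allow-cache` trades repeated instance creation for bounded memory, which is why the REST token path opts out.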
<tr>
<td><h5>format-table.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
@@ -116,6 +122,12 @@
<td>String</td>
<td>Metastore of paimon catalog, supports filesystem, hive and jdbc.</td>
</tr>
<tr>
<td><h5>resolving-file-io.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable the resolving file io. When this option is enabled, together with the table property data-file.external-paths, Paimon can read and write external storage paths such as OSS or S3. To access these external paths correctly, you also need to configure the corresponding access key and secret key.</td>
Contributor:
"fileio" -> "file io", same as line 87?

Contributor Author:
It is OK in the description; we just need to be careful with the option key, since that is the API part.

</tr>
<tr>
<td><h5>sync-all-properties</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -140,11 +152,5 @@
<td>String</td>
<td>The warehouse root path of catalog.</td>
</tr>
<tr>
<td><h5>resolving-fileio.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable resolving fileio, when this option is enabled, in conjunction with the table's property data-file.external-paths, Paimon can read and write to external storage paths, such as OSS or S3. In order to access these external paths correctly, you also need to configure the corresponding access key and secret key.</td>
</tr>
</tbody>
</table>
4 changes: 2 additions & 2 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -54,7 +54,7 @@
import java.util.stream.Collectors;

import static org.apache.paimon.fs.FileIOUtils.checkAccess;
import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILEIO_ENABLED;
import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILE_IO_ENABLED;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
@@ -420,7 +420,7 @@ default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
* by the given path.
*/
static FileIO get(Path path, CatalogContext config) throws IOException {
if (config.options().get(RESOLVING_FILEIO_ENABLED)) {
if (config.options().get(RESOLVING_FILE_IO_ENABLED)) {
FileIO fileIO = new ResolvingFileIO();
fileIO.configure(config);
return fileIO;
@@ -29,7 +29,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILEIO_ENABLED;
import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILE_IO_ENABLED;

/**
* An implementation of {@link FileIO} that supports multiple file system schemas. It dynamically
@@ -61,7 +61,7 @@ public boolean isObjectStore() {
public void configure(CatalogContext context) {
Options options = new Options();
context.options().toMap().forEach(options::set);
options.set(RESOLVING_FILEIO_ENABLED, false);
options.set(RESOLVING_FILE_IO_ENABLED, false);
this.context =
CatalogContext.create(
options, context.hadoopConf(), context.preferIO(), context.fallbackIO());
@@ -150,12 +150,21 @@ public class CatalogOptions {
+ "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in"
+ " the metastore and need to be manually added as separate partition operations.");

public static final ConfigOption<Boolean> RESOLVING_FILEIO_ENABLED =
ConfigOptions.key("resolving-fileio.enabled")
public static final ConfigOption<Boolean> RESOLVING_FILE_IO_ENABLED =
ConfigOptions.key("resolving-file-io.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable the resolving file io. When this option is enabled, together with the table property "
+ "data-file.external-paths, Paimon can read and write external storage paths such as OSS or S3. "
+ "To access these external paths correctly, you also need to configure the corresponding access key and secret key.");

public static final ConfigOption<Boolean> FILE_IO_ALLOW_CACHE =
ConfigOptions.key("file-io.allow-cache")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to allow a static cache in the file io implementation. Disable this when "
+ "a large number of FileIO instances may be generated; caching them statically "
+ "can lead to resource leakage.");
}
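The option added above resolves like a typed map lookup with a fallback default. A minimal sketch of that behavior (plain JDK; `getBoolean` is a stand-in, not the Paimon `Options` API):

```java
import java.util.HashMap;
import java.util.Map;

public class OptionLookupSketch {
    // Minimal analogue of a ConfigOption<Boolean> lookup with a default value.
    static boolean getBoolean(Map<String, String> options, String key, boolean defaultValue) {
        String raw = options.get(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        // Unset: falls back to the declared default, true.
        System.out.println(getBoolean(options, "file-io.allow-cache", true)); // true
        // RESTTokenFileIO sets the option to false before creating the inner FileIO.
        options.put("file-io.allow-cache", "false");
        System.out.println(getBoolean(options, "file-io.allow-cache", true)); // false
    }
}
```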
@@ -44,6 +44,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;

/** A {@link FileIO} to support getting token from REST Server. */
public class RESTTokenFileIO implements FileIO {

@@ -162,6 +164,7 @@ private FileIO fileIO() throws IOException {
CatalogContext context = catalogLoader.context();
Options options = context.options();
options = new Options(RESTUtil.merge(options.toMap(), token.token));
options.set(FILE_IO_ALLOW_CACHE, false);
context = CatalogContext.create(options, context.preferIO(), context.fallbackIO());
try {
fileIO = FileIO.get(path, context);
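`RESTUtil.merge` itself is not shown in this diff; assuming token entries override the base catalog options, the merge-then-disable-cache step in `fileIO()` can be sketched as follows (toy keys, not real Paimon option names):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class TokenMergeSketch {
    // Assumed semantics of RESTUtil.merge: start from the catalog options and
    // let token entries win on key collisions.
    static Map<String, String> merge(Map<String, String> base, Map<String, String> token) {
        Map<String, String> merged = new LinkedHashMap<>(base);
        merged.putAll(token);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> base = new HashMap<>();
        base.put("some.access.key", "long-lived-key"); // hypothetical option key
        Map<String, String> token = new HashMap<>();
        token.put("some.access.key", "short-lived-key"); // credential from the REST server
        Map<String, String> merged = merge(base, token);
        // Token credentials differ per table, so the resulting FileIO must
        // not be placed in the static cache.
        merged.put("file-io.allow-cache", "false");
        System.out.println(merged.get("some.access.key")); // short-lived-key
    }
}
```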
@@ -38,7 +38,9 @@
* <p>Important: copy this class from HadoopFileIO here to avoid class loader conflicts.
*/
public abstract class HadoopCompliantFileIO implements FileIO {

private static final long serialVersionUID = 1L;

protected transient volatile FileSystem fs;

@Override
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@@ -145,12 +146,19 @@ private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
if (authority == null) {
authority = "DEFAULT";
}
FileSystem fs = map.get(authority);
if (fs == null) {
fs = createFileSystem(path);
map.put(authority, fs);
try {
return map.computeIfAbsent(
authority,
k -> {
try {
return createFileSystem(path);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
} catch (UncheckedIOException e) {
throw e.getCause();
}
return fs;
}

protected abstract FileSystem createFileSystem(org.apache.hadoop.fs.Path path)
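The pattern introduced above — wrapping the checked IOException in UncheckedIOException inside the computeIfAbsent mapper, then unwrapping it at the call site — can be reproduced standalone:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ComputeIfAbsentIO {
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Stand-in for createFileSystem: a factory that may throw a checked IOException.
    private String create(String authority) throws IOException {
        if (authority.isEmpty()) {
            throw new IOException("no authority");
        }
        return "fs://" + authority;
    }

    String get(String authority) throws IOException {
        try {
            return cache.computeIfAbsent(
                    authority,
                    k -> {
                        try {
                            return create(k);
                        } catch (IOException e) {
                            // The mapper cannot throw checked exceptions,
                            // so smuggle the IOException out unchecked...
                            throw new UncheckedIOException(e);
                        }
                    });
        } catch (UncheckedIOException e) {
            // ...and restore the checked exception for callers.
            throw e.getCause();
        }
    }

    public static void main(String[] args) throws IOException {
        ComputeIfAbsentIO io = new ComputeIfAbsentIO();
        System.out.println(io.get("bucket-a")); // fs://bucket-a
    }
}
```

Unlike the original get-then-put, computeIfAbsent is atomic, so two threads racing on the same authority cannot create two file systems.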
@@ -21,6 +21,7 @@
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.IOUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -35,11 +36,14 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;

/** OSS {@link FileIO}. */
public class OSSFileIO extends HadoopCompliantFileIO {

private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 2L;

private static final Logger LOG = LoggerFactory.getLogger(OSSFileIO.class);

@@ -68,7 +72,11 @@ public class OSSFileIO extends HadoopCompliantFileIO {
*/
private static final Map<CacheKey, AliyunOSSFileSystem> CACHE = new ConcurrentHashMap<>();

// create a shared config to avoid loading properties every time
private static final Configuration SHARED_CONFIG = new Configuration();

private Options hadoopOptions;
private boolean allowCache = true;

@Override
public boolean isObjectStore() {
@@ -77,6 +85,7 @@ public boolean isObjectStore() {

@Override
public void configure(CatalogContext context) {
allowCache = context.options().get(FILE_IO_ALLOW_CACHE);
hadoopOptions = new Options();
// read all configuration with prefix 'CONFIG_PREFIXES'
for (String key : context.options().keySet()) {
@@ -101,11 +110,12 @@ public void configure(CatalogContext context) {
protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) {
final String scheme = path.toUri().getScheme();
final String authority = path.toUri().getAuthority();
return CACHE.computeIfAbsent(
new CacheKey(hadoopOptions, scheme, authority),
key -> {
Configuration hadoopConf = new Configuration();
key.options.toMap().forEach(hadoopConf::set);
Supplier<AliyunOSSFileSystem> supplier =
() -> {
// create the config from the shared base config; initializing a brand-new
// config would re-read properties from file, which comes at a high cost
Configuration hadoopConf = new Configuration(SHARED_CONFIG);
hadoopOptions.toMap().forEach(hadoopConf::set);
URI fsUri = path.toUri();
if (scheme == null && authority == null) {
fsUri = FileSystem.getDefaultUri(hadoopConf);
@@ -124,7 +134,22 @@ protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) {
throw new UncheckedIOException(e);
}
return fs;
});
};

if (allowCache) {
return CACHE.computeIfAbsent(
new CacheKey(hadoopOptions, scheme, authority), key -> supplier.get());
} else {
return supplier.get();
}
}

@Override
public void close() {
if (!allowCache) {
Contributor:
Does this mean that, if allowCache is true, the FileIO needs to be closed from outside?

Contributor Author:
If the cache is not allowed, the FileIO should be closed here.

If the cache is allowed, the FileIO is in the static cache, and we cannot close it.

fsMap.values().forEach(IOUtils::closeQuietly);
fsMap.clear();
}
}

private static class CacheKey {
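The close() semantics agreed in the thread — close per-instance file systems only when they were not placed in the shared static cache — can be modeled with toy types (not the Paimon classes):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConditionalCacheSketch implements AutoCloseable {
    static class ToyFs implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Shared across instances; entries here must outlive any single FileIO.
    private static final Map<String, ToyFs> CACHE = new ConcurrentHashMap<>();

    private final boolean allowCache;
    // Per-instance registry, mirroring fsMap in the diff above.
    private final Map<String, ToyFs> fsMap = new ConcurrentHashMap<>();

    ConditionalCacheSketch(boolean allowCache) {
        this.allowCache = allowCache;
    }

    ToyFs create(String key) {
        ToyFs fs = allowCache ? CACHE.computeIfAbsent(key, k -> new ToyFs()) : new ToyFs();
        fsMap.put(key, fs);
        return fs;
    }

    @Override
    public void close() {
        // Cached file systems are shared, so only uncached ones are safe to close.
        if (!allowCache) {
            fsMap.values().forEach(ToyFs::close);
            fsMap.clear();
        }
    }

    public static void main(String[] args) {
        try (ConditionalCacheSketch io = new ConditionalCacheSketch(false)) {
            io.create("bucket");
        } // per-instance file systems are closed here
    }
}
```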
@@ -66,7 +66,7 @@ public class OrcWriterFactory implements FormatWriterFactory {
*/
@VisibleForTesting
OrcWriterFactory(Vectorizer<InternalRow> vectorizer) {
this(vectorizer, new Properties(), new Configuration(), 1024);
this(vectorizer, new Properties(), new Configuration(false), 1024);
}

/**
@@ -75,7 +75,7 @@ public SimpleColStats[] extract(FileIO fileIO, Path path) throws IOException {
public Pair<SimpleColStats[], FileInfo> extractWithFileInfo(FileIO fileIO, Path path)
throws IOException {
try (Reader reader =
OrcReaderFactory.createReader(new Configuration(), fileIO, path, null)) {
OrcReaderFactory.createReader(new Configuration(false), fileIO, path, null)) {
long rowCount = reader.getNumberOfRows();
ColumnStatistics[] columnStatistics = reader.getStatistics();
TypeDescription schema = reader.getSchema();
@@ -137,8 +137,10 @@ public long getDataSize() {
* @param <SELF> The type of this builder that is returned by builder methods
*/
public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
private OutputFile file = null;
private Configuration conf = new Configuration();

private final OutputFile file;

private Configuration conf = new Configuration(false);
private ParquetFileWriter.Mode mode;
private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
private long rowGroupSize = DEFAULT_BLOCK_SIZE;