diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java new file mode 100644 index 000000000000..2aaf27f8c175 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; + +/** Token for REST Catalog to access file io. */ +public class RESTToken implements Serializable { + + private static final long serialVersionUID = 1L; + + private final Map token; + private final long expireAtMillis; + + /** Cache the hash code. */ + @Nullable private Integer hash; + + public RESTToken(Map token, long expireAtMillis) { + this.token = token; + this.expireAtMillis = expireAtMillis; + } + + public Map token() { + return token; + } + + public long expireAtMillis() { + return expireAtMillis; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + RESTToken token1 = (RESTToken) o; + return expireAtMillis == token1.expireAtMillis && Objects.equals(token, token1.token); + } + + @Override + public int hashCode() { + if (hash == null) { + hash = Objects.hash(token, expireAtMillis); + } + return hash; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index c5f98286352b..42dddf341141 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -34,13 +34,8 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler; -import javax.annotation.Nullable; - import java.io.IOException; -import java.io.Serializable; import java.io.UncheckedIOException; -import java.util.Map; -import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -51,7 +46,7 @@ public class RESTTokenFileIO implements FileIO { private static final long serialVersionUID = 1L; - private static final Cache FILE_IO_CACHE = + private static final Cache FILE_IO_CACHE = Caffeine.newBuilder() .expireAfterAccess(30, TimeUnit.MINUTES) .maximumSize(100) @@ -74,7 +69,7 @@ public class RESTTokenFileIO implements FileIO { // the latest token from REST Server, serializable in order to avoid loading token from the REST // Server again after serialization - private volatile Token token; + private volatile RESTToken token; public RESTTokenFileIO( RESTCatalogLoader catalogLoader, @@ -142,13 +137,7 @@ public boolean isObjectStore() { } private FileIO fileIO() throws IOException { - if (shouldRefresh()) { - synchronized (this) { - if (shouldRefresh()) { - refreshToken(); - } - } - } + tryToRefreshToken(); FileIO fileIO = FILE_IO_CACHE.getIfPresent(token); if (fileIO != null) { @@ -163,7 +152,7 @@ private FileIO fileIO() throws IOException { CatalogContext context = catalogLoader.context(); Options options = context.options(); - options = new Options(RESTUtil.merge(options.toMap(), token.token)); + options = new Options(RESTUtil.merge(options.toMap(), token.token())); options.set(FILE_IO_ALLOW_CACHE, false); context = CatalogContext.create(options, context.preferIO(), context.fallbackIO()); try { @@ -176,8 +165,18 @@ private FileIO fileIO() throws IOException { } } + private void tryToRefreshToken() { + if (shouldRefresh()) { + synchronized (this) { + if (shouldRefresh()) { + refreshToken(); + } + } + } + } + private boolean shouldRefresh() { - return token == null || System.currentTimeMillis() > token.expireAtMillis; + return token == null || System.currentTimeMillis() > token.expireAtMillis(); } private void refreshToken() { @@ -192,39 +191,15 @@ private void refreshToken() { } } - token = new Token(response.getToken(), response.getExpiresAtMillis()); + token = new RESTToken(response.getToken(), response.getExpiresAtMillis()); } - private static class Token implements Serializable { - - private static final long serialVersionUID = 1L; - - private final Map token; - private final long expireAtMillis; - - /** Cache the hash code. */ - @Nullable private Integer hash; - - private Token(Map token, long expireAtMillis) { - this.token = token; - this.expireAtMillis = expireAtMillis; - } - - @Override - public boolean equals(Object o) { - if (o == null || getClass() != o.getClass()) { - return false; - } - Token token1 = (Token) o; - return expireAtMillis == token1.expireAtMillis && Objects.equals(token, token1.token); - } - - @Override - public int hashCode() { - if (hash == null) { - hash = Objects.hash(token, expireAtMillis); - } - return hash; - } + /** + * Public interface to get valid token, this can be invoked by native engines to get the token + * and use own File System. + */ + public RESTToken validToken() { + tryToRefreshToken(); + return token; } }