Skip to content

Commit

Permalink
[core] Public RESTTokenFileIO.validToken which can be invoked by nati…
Browse files Browse the repository at this point in the history
…ve engines to get the token (#5063)
  • Loading branch information
JingsongLi authored Feb 12, 2025
1 parent 155e51c commit e7ccd39
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 48 deletions.
67 changes: 67 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Map;
import java.util.Objects;

/** Token for REST Catalog to access file io. */
public class RESTToken implements Serializable {

private static final long serialVersionUID = 1L;

private final Map<String, String> token;
private final long expireAtMillis;

/** Cache the hash code. */
@Nullable private Integer hash;

public RESTToken(Map<String, String> token, long expireAtMillis) {
this.token = token;
this.expireAtMillis = expireAtMillis;
}

public Map<String, String> token() {
return token;
}

public long expireAtMillis() {
return expireAtMillis;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
RESTToken token1 = (RESTToken) o;
return expireAtMillis == token1.expireAtMillis && Objects.equals(token, token1.token);
}

@Override
public int hashCode() {
if (hash == null) {
hash = Objects.hash(token, expireAtMillis);
}
return hash;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,8 @@
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

Expand All @@ -51,7 +46,7 @@ public class RESTTokenFileIO implements FileIO {

private static final long serialVersionUID = 1L;

private static final Cache<Token, FileIO> FILE_IO_CACHE =
private static final Cache<RESTToken, FileIO> FILE_IO_CACHE =
Caffeine.newBuilder()
.expireAfterAccess(30, TimeUnit.MINUTES)
.maximumSize(100)
Expand All @@ -74,7 +69,7 @@ public class RESTTokenFileIO implements FileIO {

// the latest token from REST Server, serializable in order to avoid loading token from the REST
// Server again after serialization
private volatile Token token;
private volatile RESTToken token;

public RESTTokenFileIO(
RESTCatalogLoader catalogLoader,
Expand Down Expand Up @@ -142,13 +137,7 @@ public boolean isObjectStore() {
}

private FileIO fileIO() throws IOException {
if (shouldRefresh()) {
synchronized (this) {
if (shouldRefresh()) {
refreshToken();
}
}
}
tryToRefreshToken();

FileIO fileIO = FILE_IO_CACHE.getIfPresent(token);
if (fileIO != null) {
Expand All @@ -163,7 +152,7 @@ private FileIO fileIO() throws IOException {

CatalogContext context = catalogLoader.context();
Options options = context.options();
options = new Options(RESTUtil.merge(options.toMap(), token.token));
options = new Options(RESTUtil.merge(options.toMap(), token.token()));
options.set(FILE_IO_ALLOW_CACHE, false);
context = CatalogContext.create(options, context.preferIO(), context.fallbackIO());
try {
Expand All @@ -176,8 +165,18 @@ private FileIO fileIO() throws IOException {
}
}

private void tryToRefreshToken() {
if (shouldRefresh()) {
synchronized (this) {
if (shouldRefresh()) {
refreshToken();
}
}
}
}

private boolean shouldRefresh() {
return token == null || System.currentTimeMillis() > token.expireAtMillis;
return token == null || System.currentTimeMillis() > token.expireAtMillis();
}

private void refreshToken() {
Expand All @@ -192,39 +191,15 @@ private void refreshToken() {
}
}

token = new Token(response.getToken(), response.getExpiresAtMillis());
token = new RESTToken(response.getToken(), response.getExpiresAtMillis());
}

private static class Token implements Serializable {

private static final long serialVersionUID = 1L;

private final Map<String, String> token;
private final long expireAtMillis;

/** Cache the hash code. */
@Nullable private Integer hash;

private Token(Map<String, String> token, long expireAtMillis) {
this.token = token;
this.expireAtMillis = expireAtMillis;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
Token token1 = (Token) o;
return expireAtMillis == token1.expireAtMillis && Objects.equals(token, token1.token);
}

@Override
public int hashCode() {
if (hash == null) {
hash = Objects.hash(token, expireAtMillis);
}
return hash;
}
/**
* Public interface to get valid token, this can be invoked by native engines to get the token
* and use own File System.
*/
public RESTToken validToken() {
tryToRefreshToken();
return token;
}
}

0 comments on commit e7ccd39

Please sign in to comment.