Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package co.worklytics.psoxy.gateway;

/**
* Signals that a config/secret backend had a transient failure (credential rotation, network
* blip, service hiccup) and the value may still be accessible on the next attempt.
*
* Distinct from a missing value ({@code Optional.empty()} / {@code NEGATIVE_VALUE}): callers
* should NOT treat this as "property not configured" — they should retry or serve a cached value.
*/
public class TransientConfigException extends RuntimeException {

public TransientConfigException(String message, Throwable cause) {
super(message, cause);
}

public TransientConfigException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,14 @@ public HttpEventResponse handle(HttpEventRequest requestToProxy,
log.log(Level.WARNING,
"Confirm oauth scopes set in config.yaml match those granted in data source");
return builder.build();
} catch (co.worklytics.psoxy.gateway.TransientConfigException e) {
// Config store was temporarily unreachable (e.g. credential rotation, AWS hiccup).
// The proxy already retried internally; this is not a misconfiguration.
builder.statusCode(HttpStatus.SC_SERVICE_UNAVAILABLE);
builder.header(ProcessedDataMetadataFields.ERROR.getHttpHeader(),
ErrorCauses.CONFIGURATION_FAILURE.name());
log.log(Level.WARNING, "Transient config store failure after retries: " + e.getMessage(), e);
return builder.build();
} catch (java.util.NoSuchElementException e) {
// missing config, such as ACCESS_TOKEN
builder.statusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,51 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import co.worklytics.psoxy.gateway.ConfigService;
import co.worklytics.psoxy.gateway.SecretStore;
import co.worklytics.psoxy.gateway.TransientConfigException;
import co.worklytics.psoxy.gateway.WritableConfigService;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.java.Log;
import org.jspecify.annotations.NonNull;
Comment thread
aperez-worklytics marked this conversation as resolved.
Outdated

import java.util.logging.Level;
Comment thread
aperez-worklytics marked this conversation as resolved.

@RequiredArgsConstructor

@Log
public class CachingConfigServiceDecorator implements WritableConfigService, SecretStore {

static final int MAX_TRANSIENT_RETRIES = 3;
static final long DEFAULT_TRANSIENT_RETRY_DELAY_MS = 500L;

final ConfigService delegate;
final Duration defaultTtl;
final Ticker ticker;
final long transientRetryDelayMs;

public CachingConfigServiceDecorator(ConfigService delegate, Duration defaultTtl) {
this(delegate, defaultTtl, Ticker.systemTicker(), DEFAULT_TRANSIENT_RETRY_DELAY_MS);
}

@VisibleForTesting
CachingConfigServiceDecorator(ConfigService delegate, Duration defaultTtl, Ticker ticker) {
this(delegate, defaultTtl, ticker, DEFAULT_TRANSIENT_RETRY_DELAY_MS);
}

@VisibleForTesting
CachingConfigServiceDecorator(ConfigService delegate, Duration defaultTtl, Ticker ticker, long transientRetryDelayMs) {
this.delegate = delegate;
this.defaultTtl = defaultTtl;
this.ticker = ticker;
this.transientRetryDelayMs = transientRetryDelayMs;
}

private volatile LoadingCache<ConfigProperty, String> cache;

Expand All @@ -39,12 +69,58 @@ LoadingCache<ConfigProperty, String> getCache() {
if (this.cache == null) {
this.cache = CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterWrite(defaultTtl.getSeconds(), TimeUnit.SECONDS)
.ticker(ticker)
.refreshAfterWrite(defaultTtl.getSeconds(), TimeUnit.SECONDS)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree this is better.

.recordStats()
.build(new CacheLoader<ConfigProperty, String>() { //req for java8-backwards compatibility
@Override
public String load(ConfigProperty key) {
return delegate.getConfigPropertyAsOptional(key).orElse(NEGATIVE_VALUE);
public String load(@NonNull ConfigProperty key) {
TransientConfigException lastException = null;
Comment on lines 77 to +79
for (int attempt = 0; attempt < MAX_TRANSIENT_RETRIES; attempt++) {
if (attempt > 0) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't really get why don't get rid of this condition, move the sleep to end of loop instead of beginning.

try {
if (transientRetryDelayMs > 0) Thread.sleep(transientRetryDelayMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new TransientConfigException("Config load for " + key.name() + " interrupted during retry", ie);
}
log.log(Level.WARNING, "Retrying config property {0}, attempt {1}/{2}",
new Object[]{key.name(), attempt + 1, MAX_TRANSIENT_RETRIES});
}
try {
return delegate.getConfigPropertyAsOptional(key).orElse(NEGATIVE_VALUE);
} catch (TransientConfigException e) {
lastException = e;
log.log(Level.WARNING, "Transient failure on attempt {0}/{1} for config property {2}",
new Object[]{attempt + 1, MAX_TRANSIENT_RETRIES, key.name()});
}
}
throw Objects.requireNonNull(lastException);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pointelss, right?

}

@Override
public ListenableFuture<String> reload(@NonNull ConfigProperty key, @NonNull String oldValue) {
try {
Comment thread
aperez-worklytics marked this conversation as resolved.
String newValue = delegate.getConfigPropertyAsOptional(key).orElse(NEGATIVE_VALUE);
// Fallback heuristic for backends that still swallow exceptions
// (e.g. GCP SecretManagerConfigService): if the value was valid
// before but now comes back empty, assume transient and retain.
if (NEGATIVE_VALUE.equals(newValue) && !NEGATIVE_VALUE.equals(oldValue)) {
log.log(Level.WARNING,
"Backend returned empty for config property {0} which was previously set; assuming transient failure and retaining cached value",
key.name());
return Futures.immediateFuture(oldValue);
}
Comment on lines +103 to +111
return Futures.immediateFuture(newValue);
} catch (TransientConfigException e) {
// Backend explicitly signalled a transient failure.
// Returning the old value resets the write-time so Guava waits a
// full TTL before retrying, rather than retrying on every request.
log.log(Level.WARNING,
"Transient failure reloading config property {0}; retaining cached value until next refresh cycle",
key.name());
return Futures.immediateFuture(oldValue);
}
}
});
}
Expand Down Expand Up @@ -85,8 +161,22 @@ public Optional<String> getConfigPropertyAsOptional(ConfigProperty property) {
} else {
return Optional.of(value);
}
} catch (UncheckedExecutionException e) {
// Guava wraps RuntimeExceptions from load() in UncheckedExecutionException.
// TransientConfigException is a RuntimeException, so it lands here.
Throwable cause = e.getCause();
if (cause instanceof TransientConfigException) {
// load() retried MAX_TRANSIENT_RETRIES times and still failed. Nothing was
// cached, so the next request will retry immediately. Re-throw so callers can
// distinguish a transient store outage from a genuinely missing property.
log.log(Level.WARNING,
"Transient backend failure for config property {0}; all retries exhausted",
property.name());
throw (TransientConfigException) cause;
}
throw (cause instanceof RuntimeException) ? (RuntimeException) cause : e;
} catch (ExecutionException e) {
//unwrap if possible, re-throw
// Guava wraps checked exceptions from load() in ExecutionException.
if (e.getCause() == null) {
throw e;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Ticker;
import co.worklytics.psoxy.gateway.TransientConfigException;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -72,6 +76,98 @@ void putConfigProperty() {
localHashMapConfigService.getConfigPropertyOrError(TestConfigProperties.EXAMPLE_PROPERTY));
}

@Test
void retainsStaleValueOnTransientReloadFailure() {
FakeTicker ticker = new FakeTicker();
ToggleableConfigService delegate = new ToggleableConfigService();
delegate.putConfigProperty(TestConfigProperties.EXAMPLE_PROPERTY, "valid_token");

CachingConfigServiceDecorator cache =
new CachingConfigServiceDecorator(delegate, Duration.ofMinutes(1), ticker);

// initial load succeeds
assertEquals(Optional.of("valid_token"),
cache.getConfigPropertyAsOptional(TestConfigProperties.EXAMPLE_PROPERTY));
assertEquals(1, delegate.getReads());

// simulate transient SSM failure and advance past TTL
delegate.setSimulateFailure(true);
ticker.advance(2, TimeUnit.MINUTES);

// reload fails silently; old value is retained — caller sees no error
assertEquals(Optional.of("valid_token"),
cache.getConfigPropertyAsOptional(TestConfigProperties.EXAMPLE_PROPERTY));
assertEquals(2, delegate.getReads()); // reload was attempted

// SSM recovers; advance past TTL again
delegate.setSimulateFailure(false);
ticker.advance(2, TimeUnit.MINUTES);

// next reload succeeds; value still valid
assertEquals(Optional.of("valid_token"),
cache.getConfigPropertyAsOptional(TestConfigProperties.EXAMPLE_PROPERTY));
assertEquals(3, delegate.getReads());
}

@Test
void retainsStaleValueOnExplicitTransientException() {
FakeTicker ticker = new FakeTicker();
ThrowingConfigService delegate = new ThrowingConfigService("valid_token");

CachingConfigServiceDecorator cache =
new CachingConfigServiceDecorator(delegate, Duration.ofMinutes(1), ticker);

// initial load succeeds
assertEquals(Optional.of("valid_token"),
cache.getConfigPropertyAsOptional(TestConfigProperties.EXAMPLE_PROPERTY));
assertEquals(1, delegate.getReads());

// backend starts throwing TransientConfigException
delegate.setThrowTransient(true);
ticker.advance(2, TimeUnit.MINUTES);

// reload catches TransientConfigException; old value retained, no error to caller
assertEquals(Optional.of("valid_token"),
cache.getConfigPropertyAsOptional(TestConfigProperties.EXAMPLE_PROPERTY));
assertEquals(2, delegate.getReads());

// backend recovers
delegate.setThrowTransient(false);
ticker.advance(2, TimeUnit.MINUTES);

assertEquals(Optional.of("valid_token"),
cache.getConfigPropertyAsOptional(TestConfigProperties.EXAMPLE_PROPERTY));
assertEquals(3, delegate.getReads());
}

@Test
void transientExceptionOnColdStartDoesNotCacheNegativeValue() {
FakeTicker ticker = new FakeTicker();
ThrowingConfigService delegate = new ThrowingConfigService("valid_token");
delegate.setThrowTransient(true);

// 0ms delay so the retry loop is fast in tests
CachingConfigServiceDecorator cache =
new CachingConfigServiceDecorator(delegate, Duration.ofMinutes(1), ticker, 0L);

// cold start with transient error: retries MAX_TRANSIENT_RETRIES times then throws —
// nothing is cached as NEGATIVE_VALUE, so the next request will retry immediately
assertThrows(TransientConfigException.class,
() -> cache.getConfigPropertyAsOptional(TestConfigProperties.EXAMPLE_PROPERTY));
assertEquals(CachingConfigServiceDecorator.MAX_TRANSIENT_RETRIES, delegate.getReads());

// second request: still failing, retries again from scratch (nothing was cached)
assertThrows(TransientConfigException.class,
() -> cache.getConfigPropertyAsOptional(TestConfigProperties.EXAMPLE_PROPERTY));
assertEquals(CachingConfigServiceDecorator.MAX_TRANSIENT_RETRIES * 2, delegate.getReads());

// backend recovers — no TTL advance needed since nothing was ever cached
delegate.setThrowTransient(false);
assertEquals(Optional.of("valid_token"),
cache.getConfigPropertyAsOptional(TestConfigProperties.EXAMPLE_PROPERTY));
assertEquals(CachingConfigServiceDecorator.MAX_TRANSIENT_RETRIES * 2 + 1, delegate.getReads());
}

@Test
void getConfigProperty_noCache() {
assertTrue(config.getConfigPropertyAsOptional(TestConfigProperties.NO_CACHE).isEmpty());
Expand All @@ -91,6 +187,88 @@ void getConfigProperty_noCache() {
}


static class FakeTicker extends Ticker {
private long nanos = 0;

@Override
public long read() {
return nanos;
}

void advance(long amount, TimeUnit unit) {
nanos += unit.toNanos(amount);
}
}

static class ThrowingConfigService implements WritableConfigService {
private String value;
private boolean throwTransient = false;

@Getter
private int reads = 0;

ThrowingConfigService(String value) {
this.value = value;
}

void setThrowTransient(boolean throwTransient) {
this.throwTransient = throwTransient;
}

@Override
public void putConfigProperty(ConfigProperty property, String newValue) {
this.value = newValue;
}

@Override
public String getConfigPropertyOrError(ConfigProperty property) {
return getConfigPropertyAsOptional(property)
.orElseThrow(() -> new NoSuchElementException("no value for " + property));
}

@Override
public Optional<String> getConfigPropertyAsOptional(ConfigProperty property) {
reads++;
if (throwTransient) {
throw new TransientConfigException("simulated transient failure");
}
return Optional.ofNullable(value);
}
}

static class ToggleableConfigService implements WritableConfigService {
private final Map<ConfigProperty, String> map = new HashMap<>();

@Getter
private int reads = 0;

private boolean simulateFailure = false;

void setSimulateFailure(boolean simulateFailure) {
this.simulateFailure = simulateFailure;
}

@Override
public void putConfigProperty(ConfigProperty property, String value) {
map.put(property, value);
}

@Override
public String getConfigPropertyOrError(ConfigProperty property) {
return getConfigPropertyAsOptional(property)
.orElseThrow(() -> new NoSuchElementException("no value for " + property));
}

@Override
public Optional<String> getConfigPropertyAsOptional(ConfigProperty property) {
reads++;
if (simulateFailure) {
return Optional.empty();
}
return Optional.ofNullable(map.get(property));
}
}

static class LocalHashMapConfigService implements WritableConfigService {

Map<ConfigProperty, String> map = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package co.worklytics.psoxy.aws;

import software.amazon.awssdk.awscore.exception.AwsServiceException;

class AwsExceptionUtils {

static boolean isAccessDenied(AwsServiceException e) {
if (e.awsErrorDetails() == null) {
return false;
}
String code = e.awsErrorDetails().errorCode();
return code != null && (code.contains("AccessDenied") || code.contains("Forbidden"));
}
}
Loading
Loading