Skip to content

Commit

Permalink
Hold off on consuming from the Kafka topic as long as a pause-consume…
Browse files Browse the repository at this point in the history
… predicate is in place. This will allow the Kafka buffer to wait for the circuit breaker to close before reading.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Feb 16, 2024
1 parent faddf01 commit b793536
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,13 @@ public void run() {
boolean retryingAfterException = false;
while (!shutdownInProgress.get()) {
try {
if (retryingAfterException || pauseConsumePredicate.pauseConsuming()) {
LOG.debug("Pause consuming from Kafka topic.");
if (retryingAfterException) {
LOG.debug("Pause consuming from Kafka topic due a previous exception.");
Thread.sleep(10000);
} else if (pauseConsumePredicate.pauseConsuming()) {
LOG.debug("Pause and skip the next consume from Kafka topic due to an external condition: {}", pauseConsumePredicate);
Thread.sleep(10000);
continue;
}
synchronized(this) {
commitOffsets(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,17 @@ public interface PauseConsumePredicate {
static PauseConsumePredicate circuitBreakingPredicate(final CircuitBreaker circuitBreaker) {
if(circuitBreaker == null)
return noPause();
return circuitBreaker::isOpen;
return new PauseConsumePredicate() {
@Override
public boolean pauseConsuming() {
return circuitBreaker.isOpen();
}

@Override
public String toString() {
return "Circuit Breaker";
}
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,13 @@ void circuitBreakingPredicate_with_a_circuit_breaker_returns_predicate_with_paus

assertThat(pauseConsumePredicate.pauseConsuming(), equalTo(isOpen));
}

@Test
void circuitBreakingPredicate_with_a_circuit_breaker_returns_predicate_with_toString() {
final CircuitBreaker circuitBreaker = mock(CircuitBreaker.class);

final PauseConsumePredicate pauseConsumePredicate = PauseConsumePredicate.circuitBreakingPredicate(circuitBreaker);

assertThat(pauseConsumePredicate.toString(), equalTo("Circuit Breaker"));
}
}

0 comments on commit b793536

Please sign in to comment.