Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions bot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,16 @@
<artifactId>jcache</artifactId>
</dependency>
<dependency>
<groupId>edu.java</groupId>
<artifactId>retry</artifactId>
<version>0.1</version>
<scope>compile</scope>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion bot/src/main/java/edu/java/bot/BotApplication.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package edu.java.bot;

import edu.java.RetryQueryConfiguration;
import edu.java.bot.configuration.ApplicationConfig;
import edu.java.bot.configuration.RetryQueryConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
Expand Down
21 changes: 14 additions & 7 deletions bot/src/main/java/edu/java/bot/bot/UpdateTrackingBot.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,27 @@
import com.pengrad.telegrambot.response.BaseResponse;
import edu.java.bot.commands.Command;
import edu.java.bot.processors.MessageProcessor;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.annotation.PostConstruct;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class UpdateTrackingBot implements Bot {

private final TelegramBot telegramBot;
private final MessageProcessor userMessagesProcessor;

@Autowired
public UpdateTrackingBot(MessageProcessor userMessagesProcessor, TelegramBot telegramBot) {
this.userMessagesProcessor = userMessagesProcessor;
this.telegramBot = telegramBot;
}
private final MeterRegistry meterRegistry;
private Counter userMessagesCounter;

@Override
public int process(List<Update> updates) {
updates.forEach(update -> {
SendMessage sendMessage = userMessagesProcessor.process(update);
userMessagesCounter.increment();
execute(sendMessage);
});
return UpdatesListener.CONFIRMED_UPDATES_ALL;
Expand All @@ -55,4 +55,11 @@ public <T extends BaseRequest<T, R>, R extends BaseResponse> void execute(BaseRe
public void close() {
telegramBot.shutdown();
}

@PostConstruct
public void initMetrics() {
userMessagesCounter = Counter.builder("user_messages")
.description("Count of processed user messages")
.register(meterRegistry);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package edu.java.bot.configuration;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.NotNull;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

@Validated
@ConfigurationProperties(prefix = "retry-query", ignoreUnknownFields = false)
public record RetryQueryConfiguration(Map<String, RetryElement> targets) {
public record RetryElement(
@NotNull String type,
int maxAttempts,
double factor,
Duration minDelay,
Duration maxDelay,
List<Integer> codes
) {
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package edu.java.bot.configuration;

import edu.java.RetryFactory;
import edu.java.RetryQueryConfiguration;
import edu.java.bot.client.scrapper.ScrapperClient;
import edu.java.bot.util.retry.RetryFactory;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
Expand All @@ -11,7 +10,6 @@
import org.springframework.web.reactive.function.client.support.WebClientAdapter;
import org.springframework.web.service.invoker.HttpServiceProxyFactory;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

@Configuration
@Log4j2
Expand All @@ -22,11 +20,10 @@ public class ScrapperClientConfiguration {

@Bean
public ScrapperClient scrapperClient(RetryQueryConfiguration retryQueryConfiguration) {
Retry retry = RetryFactory.createRetry(retryQueryConfiguration, "scrapper");
WebClient webClient = WebClient.builder()
.defaultStatusHandler(httpStatusCode -> true, clientResponse -> Mono.empty())
.defaultHeader("Content-Type", "application/json")
.filter(RetryFactory.createFilter(retry))
.filter(RetryFactory.createFilter(retryQueryConfiguration, "scrapper"))
.baseUrl(scrapperUrl).build();

HttpServiceProxyFactory httpServiceProxyFactory = HttpServiceProxyFactory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package edu.java.bot.util.retry;

import java.util.List;
import java.util.function.Predicate;
import org.springframework.web.reactive.function.client.WebClientResponseException;

public class ErrorFilterPredicate implements Predicate<Throwable> {
private final List<Integer> retryCodes;

public ErrorFilterPredicate(List<Integer> retryCodes) {
this.retryCodes = retryCodes;
}

@Override
public boolean test(Throwable throwable) {
if (throwable instanceof WebClientResponseException e) {
return retryCodes.contains(e.getStatusCode().value());
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package edu.java.bot.util.retry;

import java.time.Duration;
import java.util.function.Predicate;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import org.reactivestreams.Publisher;
import org.springframework.retry.ExhaustedRetryException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;


@RequiredArgsConstructor
public class LinearRetryBackoffSpec extends Retry {
private static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);
private final Duration minBackoff;
private final Duration maxBackoff;
private final double factor;
private final int maxAttempts;
private final Predicate<Throwable> errorFilter;
private final Supplier<Scheduler> schedulerSupplier;

public LinearRetryBackoffSpec factor(double factor) {
return new LinearRetryBackoffSpec(
this.minBackoff,
this.maxBackoff,
factor,
this.maxAttempts,
this.errorFilter,
this.schedulerSupplier
);
}

public LinearRetryBackoffSpec filter(Predicate<Throwable> errorFilter) {
return new LinearRetryBackoffSpec(
this.minBackoff,
this.maxBackoff,
this.factor,
this.maxAttempts,
errorFilter,
this.schedulerSupplier
);
}

public static LinearRetryBackoffSpec linear(int maxAttempts, Duration minDelay) {
return new LinearRetryBackoffSpec(
minDelay,
MAX_BACKOFF,
1.0,
maxAttempts,
e -> true,
Schedulers::parallel
);
}

@Override
public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
return Flux.deferContextual(cv ->
retrySignals.contextWrite(cv)
.concatMap(retryWhenState -> {
RetrySignal copy = retryWhenState.copy();
Throwable currentFailure = copy.failure();
long iteration = copy.totalRetries();
if (currentFailure == null) {
return Mono.error(new IllegalStateException(
"Retry.RetrySignal#failure() not expected to be null"));
}
if (!errorFilter.test(currentFailure)) {
return Mono.error(currentFailure);
}
if (iteration >= maxAttempts) {
return Mono.error(new ExhaustedRetryException("Retry exhausted: " + this));
}

Duration nextBackoff;
try {
nextBackoff = minBackoff.multipliedBy((long) (iteration * factor));
if (nextBackoff.compareTo(maxBackoff) > 0) {
nextBackoff = maxBackoff;
}
} catch (ArithmeticException overflow) {
nextBackoff = maxBackoff;
}

return Mono.delay(nextBackoff, schedulerSupplier.get()).contextWrite(cv);
})
.onErrorStop()
);
}
}
43 changes: 43 additions & 0 deletions bot/src/main/java/edu/java/bot/util/retry/RetryFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package edu.java.bot.util.retry;

import edu.java.bot.configuration.RetryQueryConfiguration;
import edu.java.bot.util.retry.builders.ExponentialRetryBuilder;
import edu.java.bot.util.retry.builders.FixedRetryBuilder;
import edu.java.bot.util.retry.builders.LinearRetryBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import lombok.experimental.UtilityClass;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

@UtilityClass
public class RetryFactory {

private static final Map<String, Function<RetryQueryConfiguration.RetryElement, Retry>> RETRY_BUILDERS =
new HashMap<>();

static {
RETRY_BUILDERS.put("fixed", new FixedRetryBuilder());
RETRY_BUILDERS.put("linear", new LinearRetryBuilder());
RETRY_BUILDERS.put("exponential", new ExponentialRetryBuilder());
}

public static ExchangeFilterFunction createFilter(RetryQueryConfiguration config, String target) {
return (response, next) -> next.exchange(response)
.flatMap(clientResponse -> {
if (clientResponse.statusCode().isError()
&& config.targets().get(target).codes().contains(clientResponse.statusCode().value())) {
return clientResponse.createError();
} else {
return Mono.just(clientResponse);
}
}).retryWhen(createRetry(config, target));
}

public static Retry createRetry(RetryQueryConfiguration config, String target) {
RetryQueryConfiguration.RetryElement element = config.targets().get(target);
return RETRY_BUILDERS.get(element.type()).apply(element);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package edu.java.bot.util.retry.builders;

import edu.java.bot.configuration.RetryQueryConfiguration;
import edu.java.bot.util.retry.ErrorFilterPredicate;
import java.util.function.Function;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

public class ExponentialRetryBuilder implements Function<RetryQueryConfiguration.RetryElement, Retry> {
@Override
public Retry apply(RetryQueryConfiguration.RetryElement retryElement) {
return RetryBackoffSpec.backoff(retryElement.maxAttempts(), retryElement.minDelay())
.maxBackoff(retryElement.maxDelay())
.filter(new ErrorFilterPredicate(retryElement.codes()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package edu.java.bot.util.retry.builders;

import edu.java.bot.configuration.RetryQueryConfiguration;
import edu.java.bot.util.retry.ErrorFilterPredicate;
import java.util.function.Function;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

public class FixedRetryBuilder implements Function<RetryQueryConfiguration.RetryElement, Retry> {
@Override
public Retry apply(RetryQueryConfiguration.RetryElement retryElement) {
return RetryBackoffSpec.fixedDelay(retryElement.maxAttempts(), retryElement.minDelay())
.filter(new ErrorFilterPredicate(retryElement.codes()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package edu.java.bot.util.retry.builders;

import edu.java.bot.configuration.RetryQueryConfiguration;
import edu.java.bot.util.retry.ErrorFilterPredicate;
import edu.java.bot.util.retry.LinearRetryBackoffSpec;
import java.util.function.Function;
import reactor.util.retry.Retry;

public class LinearRetryBuilder implements Function<RetryQueryConfiguration.RetryElement, Retry> {
@Override
public Retry apply(RetryQueryConfiguration.RetryElement retryElement) {
return LinearRetryBackoffSpec.linear(retryElement.maxAttempts(), retryElement.minDelay())
.factor(retryElement.factor()).filter(new ErrorFilterPredicate(retryElement.codes()));
}
}
20 changes: 17 additions & 3 deletions bot/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ scrapper:
url: http://localhost:8080

retry-query:
retries:
- target: scrapper
targets:
scrapper:
type: exponential
max-attempts: 5
min-delay: 1s
max-delay: 10s
codes: 500
codes: 429

bucket4j:
enabled: true
Expand All @@ -72,3 +72,17 @@ bucket4j:

rate-limiter:
whitelist: ${WHITELISTED_IPS:localhost}

management:
metrics:
tags:
application: ${spring.application.name}
server:
port: 8001
endpoints:
web:
exposure:
include: health, info, prometheus
path-mapping:
prometheus: /metrics
base-path: '/'
Loading