From 057ac9f57d58dec3c3e27fe253184bf5b346d3a7 Mon Sep 17 00:00:00 2001 From: Yusha D Date: Sun, 7 Apr 2024 16:34:38 +0400 Subject: [PATCH] Make hw8 --- .../bot/configuration/ApplicationConfig.java | 5 +- .../bot/configuration/KafkaConfiguration.java | 18 +++++ .../bot/service/KafkaUpdatesListener.java | 20 +++++ .../java/bot/service/RateLimiterService.java | 20 +++++ .../java/edu/java/bot/util/URLCreator.java | 18 +++++ bot/src/main/resources/application.yml | 29 ++++++- .../kafka/KafkaIntegrationEnvironment.java | 24 ++++++ .../bot/kafka/KafkaUpdatesListenerTest.java | 81 +++++++++++++++++++ compose.yml | 36 +++++++++ .../src/main/java/edu/java/RetryElement.java | 16 ---- .../src/main/java/edu/java/RetryFactory.java | 5 +- .../edu/java/RetryQueryConfiguration.java | 12 +++ .../builders/ExponentialRetryBuilder.java | 6 +- .../edu/java/builders/FixedRetryBuilder.java | 6 +- .../edu/java/builders/LinearRetryBuilder.java | 6 +- .../java/configuration/ApplicationConfig.java | 7 +- .../configuration/KafkaConfiguration.java | 18 +++++ .../edu/java/service/RateLimiterService.java | 20 +++++ .../edu/java/updates/LinkUpdateScheduler.java | 71 ++++++++++++++++ .../updates/sender/HttpLinkUpdateSender.java | 19 +++++ .../updates/sender/KafkaLinkUpdateSender.java | 22 +++++ .../java/updates/sender/LinkUpdateSender.java | 7 ++ scrapper/src/main/resources/application.yml | 40 ++++++--- .../java/scrapper/IntegrationEnvironment.java | 9 +++ .../github/GithubInfoSupplierTest.java | 7 +- .../StackOverflowInfoSupplierTest.java | 3 +- .../updates/KafkaLinkUpdateSenderTest.java | 61 ++++++++++++++ 27 files changed, 537 insertions(+), 49 deletions(-) create mode 100644 bot/src/main/java/edu/java/bot/configuration/KafkaConfiguration.java create mode 100644 bot/src/main/java/edu/java/bot/service/KafkaUpdatesListener.java create mode 100644 bot/src/main/java/edu/java/bot/service/RateLimiterService.java create mode 100644 bot/src/main/java/edu/java/bot/util/URLCreator.java create mode 100644 bot/src/test/java/edu/java/bot/kafka/KafkaIntegrationEnvironment.java create mode 100644 bot/src/test/java/edu/java/bot/kafka/KafkaUpdatesListenerTest.java delete mode 100644 retry/src/main/java/edu/java/RetryElement.java create mode 100644 scrapper/src/main/java/edu/java/configuration/KafkaConfiguration.java create mode 100644 scrapper/src/main/java/edu/java/service/RateLimiterService.java create mode 100644 scrapper/src/main/java/edu/java/updates/LinkUpdateScheduler.java create mode 100644 scrapper/src/main/java/edu/java/updates/sender/HttpLinkUpdateSender.java create mode 100644 scrapper/src/main/java/edu/java/updates/sender/KafkaLinkUpdateSender.java create mode 100644 scrapper/src/main/java/edu/java/updates/sender/LinkUpdateSender.java create mode 100644 scrapper/src/test/java/edu/java/scrapper/updates/KafkaLinkUpdateSenderTest.java diff --git a/bot/src/main/java/edu/java/bot/configuration/ApplicationConfig.java b/bot/src/main/java/edu/java/bot/configuration/ApplicationConfig.java index 977f29f..14ac44c 100644 --- a/bot/src/main/java/edu/java/bot/configuration/ApplicationConfig.java +++ b/bot/src/main/java/edu/java/bot/configuration/ApplicationConfig.java @@ -8,6 +8,9 @@ @ConfigurationProperties(prefix = "app", ignoreUnknownFields = false) public record ApplicationConfig( @NotEmpty - String telegramToken + String telegramToken, + KafkaConfigurationInfo kafkaConfigurationInfo ) { + public record KafkaConfigurationInfo(String topicName) { + } } diff --git a/bot/src/main/java/edu/java/bot/configuration/KafkaConfiguration.java b/bot/src/main/java/edu/java/bot/configuration/KafkaConfiguration.java new file mode 100644 index 0000000..069df1c --- /dev/null +++ b/bot/src/main/java/edu/java/bot/configuration/KafkaConfiguration.java @@ -0,0 +1,18 @@ +package edu.java.bot.configuration; + +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; + +@Configuration +public class KafkaConfiguration { + + @Bean + public NewTopic topic(ApplicationConfig config) { + return TopicBuilder.name(config.kafkaConfigurationInfo().topicName() + "_dlq") + .partitions(1) + .replicas(1) + .build(); + } +} diff --git a/bot/src/main/java/edu/java/bot/service/KafkaUpdatesListener.java b/bot/src/main/java/edu/java/bot/service/KafkaUpdatesListener.java new file mode 100644 index 0000000..b8e2928 --- /dev/null +++ b/bot/src/main/java/edu/java/bot/service/KafkaUpdatesListener.java @@ -0,0 +1,20 @@ +package edu.java.bot.service; + +import edu.java.bot.dto.request.LinkUpdate; +import lombok.RequiredArgsConstructor; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.retrytopic.DltStrategy; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class KafkaUpdatesListener { + private final LinkUpdatesSenderService senderService; + + @KafkaListener(topics = "${app.kafka-configuration-info.topic-name}", groupId = "bot") + @RetryableTopic(attempts = "1", dltStrategy = DltStrategy.FAIL_ON_ERROR, dltTopicSuffix = "_dlq") + public void listenUpdates(LinkUpdate linkUpdate) { + senderService.sendLinkUpdate(linkUpdate); + } +} diff --git a/bot/src/main/java/edu/java/bot/service/RateLimiterService.java b/bot/src/main/java/edu/java/bot/service/RateLimiterService.java new file mode 100644 index 0000000..612cdb0 --- /dev/null +++ b/bot/src/main/java/edu/java/bot/service/RateLimiterService.java @@ -0,0 +1,20 @@ +package edu.java.bot.service; + +import java.util.List; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +@Service +public class RateLimiterService { + + private final List whitelist; + + public RateLimiterService(@Value("${rate-limiter.whitelist}") List whitelist) { + this.whitelist = whitelist; + } + + public boolean isSkipped(String ip) { + return whitelist.contains(ip); + } + +} diff --git a/bot/src/main/java/edu/java/bot/util/URLCreator.java b/bot/src/main/java/edu/java/bot/util/URLCreator.java new file mode 100644 index 0000000..a0c175d --- /dev/null +++ b/bot/src/main/java/edu/java/bot/util/URLCreator.java @@ -0,0 +1,18 @@ +package edu.java.bot.util; + +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; + +public final class URLCreator { + private URLCreator() { + } + + public static URL createURL(String link) { + try { + return URI.create(link).toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/bot/src/main/resources/application.yml b/bot/src/main/resources/application.yml index 175d44a..d7b4e0a 100644 --- a/bot/src/main/resources/application.yml +++ b/bot/src/main/resources/application.yml @@ -1,5 +1,7 @@ app: - telegram-token: ${TELEGRAM_BOT_TOKEN} + telegram-token: ${TELEGRAM_API_KEY} + kafka-configuration-info: + topic-name: updates spring: application: @@ -13,6 +15,23 @@ spring: spec: maximumSize=100000,expireAfterAccess=3600s cache-names: - rate-limit-bucket + kafka: + consumer: + bootstrap-servers: localhost:29092 + group-id: bot + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring.json.trusted.packages: "*" + spring.json.value.default.type: edu.java.bot.dto.request.LinkUpdate + auto-offset-reset: earliest + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + properties: + spring.json.add.type.headers: false + bootstrap-servers: localhost:29092 + bootstrap-servers: localhost:29092 server: port: 8090 @@ -31,10 +50,10 @@ retry-query: retries: - target: scrapper type: exponential - max-attempts: 3 + max-attempts: 5 min-delay: 1s max-delay: 10s - codes: 429 + codes: 500 bucket4j: enabled: true @@ -49,3 +68,7 @@ bucket4j: unit: hours refill-speed: interval cache-key: getRemoteAddr() + skip-condition: '@rateLimiterService.isSkipped(getRemoteAddr())' + +rate-limiter: + whitelist: ${WHITELISTED_IPS:localhost} diff --git a/bot/src/test/java/edu/java/bot/kafka/KafkaIntegrationEnvironment.java b/bot/src/test/java/edu/java/bot/kafka/KafkaIntegrationEnvironment.java new file mode 100644 index 0000000..651618a --- /dev/null +++ b/bot/src/test/java/edu/java/bot/kafka/KafkaIntegrationEnvironment.java @@ -0,0 +1,24 @@ +package edu.java.bot.kafka; + +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaIntegrationEnvironment { + public static KafkaContainer KAFKA; + + static { + KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0")); + KAFKA.start(); + } + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); + registry.add("spring.kafka.consumer.bootstrap-servers", KAFKA::getBootstrapServers); + registry.add("spring.kafka.producer.bootstrap-servers", KAFKA::getBootstrapServers); + } +} diff --git a/bot/src/test/java/edu/java/bot/kafka/KafkaUpdatesListenerTest.java b/bot/src/test/java/edu/java/bot/kafka/KafkaUpdatesListenerTest.java new file mode 100644 index 0000000..733b6cf --- /dev/null +++ b/bot/src/test/java/edu/java/bot/kafka/KafkaUpdatesListenerTest.java @@ -0,0 +1,81 @@ +package edu.java.bot.kafka; + + +import edu.java.bot.configuration.ApplicationConfig; +import edu.java.bot.dto.request.LinkUpdate; +import edu.java.bot.service.LinkUpdatesSenderService; +import edu.java.bot.util.URLCreator; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.kafka.core.KafkaTemplate; + +import java.time.Duration; +import java.util.List; +import java.util.Map; + +import static org.awaitility.Awaitility.await; + +@SpringBootTest +public class KafkaUpdatesListenerTest extends KafkaIntegrationEnvironment { + + @MockBean + private LinkUpdatesSenderService linkUpdatesSenderService; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private ApplicationConfig config; + + @Autowired + private KafkaProperties kafkaProperties; + + @Test + public void listenUpdateCorrectTest() { + LinkUpdate linkUpdate = new LinkUpdate( + 1L, + URLCreator.createURL("https://github.com"), + "github", + List.of(1L), + Map.of() + ); + kafkaTemplate.send("updates", linkUpdate); + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> Mockito.verify(linkUpdatesSenderService, Mockito.times(1)) + .sendLinkUpdate(linkUpdate)); + } + + @Test + public void listenUpdateInCorrectTest() { + LinkUpdate linkUpdate = new LinkUpdate( + 1L, + URLCreator.createURL("https://github.com"), + "github", + List.of(1L), + Map.of() + ); + Mockito.doThrow(RuntimeException.class).when(linkUpdatesSenderService).sendLinkUpdate(linkUpdate); + KafkaConsumer dlqKafkaConsumer = new KafkaConsumer<>( + kafkaProperties.buildConsumerProperties(null) + ); + dlqKafkaConsumer.subscribe(List.of(config.kafkaConfigurationInfo().topicName() + "_dlq")); + kafkaTemplate.send(config.kafkaConfigurationInfo().topicName(), linkUpdate); + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> { + var values = dlqKafkaConsumer.poll(Duration.ofMillis(100)); + Assertions.assertThat(values).hasSize(1); + Assertions.assertThat(values.iterator().next().value()).isEqualTo(linkUpdate); + Mockito.verify(linkUpdatesSenderService).sendLinkUpdate(linkUpdate); + }); + } +} diff --git a/compose.yml b/compose.yml index eccec15..5cf9984 100644 --- a/compose.yml +++ b/compose.yml @@ -1,4 +1,38 @@ services: + zookeeper: + image: confluentinc/cp-zookeeper:7.6.0 + hostname: zookeeper + container_name: zookeeper + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_SERVER_ID: 1 + volumes: + - zookeeper:/var/lib/zookeeper/data + - zookeeper:/var/lib/zookeeper/log + + kafka1: + image: confluentinc/cp-kafka:7.6.0 + hostname: kafka1 + container_name: kafka1 + ports: + - "29092:29092" + - "9092:9092" + environment: + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092, PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_BROKER_ID: 1 + BOOTSTRAP_SERVERS: kafka1:9092 + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + depends_on: + - zookeeper + volumes: + - kafka:/var/lib/kafka/data + postgresql: image: postgres:16 ports: @@ -30,6 +64,8 @@ services: volumes: postgresql: { } + kafka: { } + zookeeper: { } networks: backend: { } diff --git a/retry/src/main/java/edu/java/RetryElement.java b/retry/src/main/java/edu/java/RetryElement.java deleted file mode 100644 index 2a46f6b..0000000 --- a/retry/src/main/java/edu/java/RetryElement.java +++ /dev/null @@ -1,16 +0,0 @@ -package edu.java; - -import java.time.Duration; -import java.util.List; -import org.jetbrains.annotations.NotNull; - -public record RetryElement( - @NotNull String target, - @NotNull String type, - int maxAttempts, - double factor, - Duration minDelay, - Duration maxDelay, - List codes -) { -} diff --git a/retry/src/main/java/edu/java/RetryFactory.java b/retry/src/main/java/edu/java/RetryFactory.java index a7df46c..5089dcc 100644 --- a/retry/src/main/java/edu/java/RetryFactory.java +++ b/retry/src/main/java/edu/java/RetryFactory.java @@ -14,7 +14,8 @@ @UtilityClass public class RetryFactory { - private static final Map> RETRY_BUILDERS = new HashMap<>(); + private static final Map> RETRY_BUILDERS = + new HashMap<>(); static { RETRY_BUILDERS.put("fixed", new FixedRetryBuilder()); @@ -36,6 +37,6 @@ public static ExchangeFilterFunction createFilter(Retry retry) { public static Retry createRetry(RetryQueryConfiguration config, String target) { return config.retries().stream().filter(element -> element.target().equals(target)).findFirst() .map(element -> RETRY_BUILDERS.get(element.type()).apply(element)) - .orElseThrow(() -> new RuntimeException("Unknown target " + target)); + .orElseThrow(() -> new IllegalStateException("Unknown target " + target)); } } diff --git a/retry/src/main/java/edu/java/RetryQueryConfiguration.java b/retry/src/main/java/edu/java/RetryQueryConfiguration.java index b65789c..f45606c 100644 --- a/retry/src/main/java/edu/java/RetryQueryConfiguration.java +++ b/retry/src/main/java/edu/java/RetryQueryConfiguration.java @@ -1,10 +1,22 @@ package edu.java; +import java.time.Duration; import java.util.List; +import org.jetbrains.annotations.NotNull; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; @Validated @ConfigurationProperties(prefix = "retry-query", ignoreUnknownFields = false) public record RetryQueryConfiguration(List retries) { + public record RetryElement( + @NotNull String target, + @NotNull String type, + int maxAttempts, + double factor, + Duration minDelay, + Duration maxDelay, + List codes + ) { + } } diff --git a/retry/src/main/java/edu/java/builders/ExponentialRetryBuilder.java b/retry/src/main/java/edu/java/builders/ExponentialRetryBuilder.java index 16de20d..5aff1f7 100644 --- a/retry/src/main/java/edu/java/builders/ExponentialRetryBuilder.java +++ b/retry/src/main/java/edu/java/builders/ExponentialRetryBuilder.java @@ -1,14 +1,14 @@ package edu.java.builders; import edu.java.ErrorFilterPredicate; -import edu.java.RetryElement; +import edu.java.RetryQueryConfiguration; import java.util.function.Function; import reactor.util.retry.Retry; import reactor.util.retry.RetryBackoffSpec; -public class ExponentialRetryBuilder implements Function { +public class ExponentialRetryBuilder implements Function { @Override - public Retry apply(RetryElement retryElement) { + public Retry apply(RetryQueryConfiguration.RetryElement retryElement) { return RetryBackoffSpec.backoff(retryElement.maxAttempts(), retryElement.minDelay()) .maxBackoff(retryElement.maxDelay()) .filter(new ErrorFilterPredicate(retryElement.codes())); diff --git a/retry/src/main/java/edu/java/builders/FixedRetryBuilder.java b/retry/src/main/java/edu/java/builders/FixedRetryBuilder.java index 676816b..d9e6724 100644 --- a/retry/src/main/java/edu/java/builders/FixedRetryBuilder.java +++ b/retry/src/main/java/edu/java/builders/FixedRetryBuilder.java @@ -1,14 +1,14 @@ package edu.java.builders; import edu.java.ErrorFilterPredicate; -import edu.java.RetryElement; +import edu.java.RetryQueryConfiguration; import java.util.function.Function; import reactor.util.retry.Retry; import reactor.util.retry.RetryBackoffSpec; -public class FixedRetryBuilder implements Function { +public class FixedRetryBuilder implements Function { @Override - public Retry apply(RetryElement retryElement) { + public Retry apply(RetryQueryConfiguration.RetryElement retryElement) { return RetryBackoffSpec.fixedDelay(retryElement.maxAttempts(), retryElement.minDelay()) .filter(new ErrorFilterPredicate(retryElement.codes())); } diff --git a/retry/src/main/java/edu/java/builders/LinearRetryBuilder.java b/retry/src/main/java/edu/java/builders/LinearRetryBuilder.java index 88206de..7b95b37 100644 --- a/retry/src/main/java/edu/java/builders/LinearRetryBuilder.java +++ b/retry/src/main/java/edu/java/builders/LinearRetryBuilder.java @@ -2,13 +2,13 @@ import edu.java.ErrorFilterPredicate; import edu.java.LinearRetryBackoffSpec; -import edu.java.RetryElement; +import edu.java.RetryQueryConfiguration; import java.util.function.Function; import reactor.util.retry.Retry; -public class LinearRetryBuilder implements Function { +public class LinearRetryBuilder implements Function { @Override - public Retry apply(RetryElement retryElement) { + public Retry apply(RetryQueryConfiguration.RetryElement retryElement) { return LinearRetryBackoffSpec.linear(retryElement.maxAttempts(), retryElement.minDelay()) .factor(retryElement.factor()).filter(new ErrorFilterPredicate(retryElement.codes())); } diff --git a/scrapper/src/main/java/edu/java/configuration/ApplicationConfig.java b/scrapper/src/main/java/edu/java/configuration/ApplicationConfig.java index a82d130..187886b 100644 --- a/scrapper/src/main/java/edu/java/configuration/ApplicationConfig.java +++ b/scrapper/src/main/java/edu/java/configuration/ApplicationConfig.java @@ -16,7 +16,9 @@ public record ApplicationConfig( @NotNull Scheduler scheduler, String githubToken, - AccessType databaseAccessType + AccessType databaseAccessType, + KafkaConfigurationInfo kafkaConfigurationInfo, + boolean useQueue ) { public record Scheduler(@NotNull boolean enable, @NotNull Duration interval, @NotNull Duration forceCheckDelay, int maxLinksPerCheck) { @@ -27,4 +29,7 @@ public enum AccessType { JPA, JOOQ } + + public record KafkaConfigurationInfo(String topicName) { + } } diff --git a/scrapper/src/main/java/edu/java/configuration/KafkaConfiguration.java b/scrapper/src/main/java/edu/java/configuration/KafkaConfiguration.java new file mode 100644 index 0000000..a762245 --- /dev/null +++ b/scrapper/src/main/java/edu/java/configuration/KafkaConfiguration.java @@ -0,0 +1,18 @@ +package edu.java.configuration; + +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; + +@Configuration +public class KafkaConfiguration { + + @Bean + public NewTopic topic(ApplicationConfig config) { + return TopicBuilder.name(config.kafkaConfigurationInfo().topicName()) + .partitions(1) + .replicas(1) + .build(); + } +} diff --git a/scrapper/src/main/java/edu/java/service/RateLimiterService.java b/scrapper/src/main/java/edu/java/service/RateLimiterService.java new file mode 100644 index 0000000..a47dc92 --- /dev/null +++ b/scrapper/src/main/java/edu/java/service/RateLimiterService.java @@ -0,0 +1,20 @@ +package edu.java.service; + +import java.util.List; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +@Service +public class RateLimiterService { + + private final List whitelist; + + public RateLimiterService(@Value("${rate-limiter.whitelist}") List whitelist) { + this.whitelist = whitelist; + } + + public boolean isSkipped(String ip) { + return whitelist.contains(ip); + } + +} diff --git a/scrapper/src/main/java/edu/java/updates/LinkUpdateScheduler.java b/scrapper/src/main/java/edu/java/updates/LinkUpdateScheduler.java new file mode 100644 index 0000000..0d6fd3a --- /dev/null +++ b/scrapper/src/main/java/edu/java/updates/LinkUpdateScheduler.java @@ -0,0 +1,71 @@ +package edu.java.updates; + +import edu.java.client.bot.BotClient; +import edu.java.client.bot.dto.request.LinkUpdate; +import edu.java.configuration.ApplicationConfig; +import edu.java.dto.Chat; +import edu.java.service.LinkService; +import edu.java.supplier.InfoSuppliers; +import edu.java.supplier.api.InfoSupplier; +import edu.java.supplier.api.LinkInfo; +import edu.java.updates.sender.LinkUpdateSender; +import java.util.List; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Log4j2 +@Component +@RequiredArgsConstructor +public class LinkUpdateScheduler { + + private final LinkService linkService; + + private final ApplicationConfig applicationConfig; + + private final InfoSuppliers infoSuppliers; + + private final BotClient botClient; + + private final LinkUpdateSender linkUpdateSender; + + @Scheduled(fixedDelayString = "#{@'app-edu.java.configuration.ApplicationConfig'.scheduler.interval}") + public void update() { + log.info("Updating links start"); + linkService.getOldLinks( + applicationConfig.scheduler().forceCheckDelay(), + applicationConfig.scheduler().maxLinksPerCheck() + ) + .forEach(link -> { + log.info("Updating link {}", link); + String host = link.url().getHost(); + String domain = host.replaceAll("^(www\\.)?|\\.com$", ""); + InfoSupplier supplier = infoSuppliers.getSupplierByTypeHost(domain); + LinkInfo linkInfo = supplier.fetchInfo(link.url()); + if (linkInfo != null) { + linkInfo = supplier.filterByDateTime(linkInfo, link.lastUpdate(), link.metaInfo()); + + if (linkInfo.events().isEmpty()) { + linkService.checkNow(link.linkId()); + return; + } + linkService.update(link.linkId(), linkInfo.events().getFirst().lastUpdate(), linkInfo.metaInfo()); + List subscribers = linkService.getLinkSubscribers(link.url()).chats().stream() + .map(Chat::chatId) + .toList(); + linkInfo.events().reversed().forEach( + event -> linkUpdateSender.sendUpdate(new LinkUpdate( + link.linkId(), + link.url(), + event.typeEvent(), + subscribers, + event.eventData() + ))); + } + }); + log.info("Updating links ends"); + } + +} + diff --git a/scrapper/src/main/java/edu/java/updates/sender/HttpLinkUpdateSender.java b/scrapper/src/main/java/edu/java/updates/sender/HttpLinkUpdateSender.java new file mode 100644 index 0000000..0fc4c68 --- /dev/null +++ b/scrapper/src/main/java/edu/java/updates/sender/HttpLinkUpdateSender.java @@ -0,0 +1,19 @@ +package edu.java.updates.sender; + +import edu.java.client.bot.BotClient; +import edu.java.client.bot.dto.request.LinkUpdate; +import lombok.RequiredArgsConstructor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +@RequiredArgsConstructor +@Service +@ConditionalOnProperty(name = "app.use-queue", havingValue = "false") +public class HttpLinkUpdateSender implements LinkUpdateSender { + private final BotClient botClient; + + @Override + public void sendUpdate(LinkUpdate linkUpdate) { + botClient.handleUpdate(linkUpdate); + } +} diff --git a/scrapper/src/main/java/edu/java/updates/sender/KafkaLinkUpdateSender.java b/scrapper/src/main/java/edu/java/updates/sender/KafkaLinkUpdateSender.java new file mode 100644 index 0000000..71e90db --- /dev/null +++ b/scrapper/src/main/java/edu/java/updates/sender/KafkaLinkUpdateSender.java @@ -0,0 +1,22 @@ +package edu.java.updates.sender; + +import edu.java.client.bot.dto.request.LinkUpdate; +import edu.java.configuration.ApplicationConfig; +import lombok.RequiredArgsConstructor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@RequiredArgsConstructor +@Service +@ConditionalOnProperty(name = "app.use-queue", havingValue = "true") +public class KafkaLinkUpdateSender implements LinkUpdateSender { + + private final KafkaTemplate kafkaTemplate; + private final ApplicationConfig config; + + @Override + public void sendUpdate(LinkUpdate linkUpdate) { + kafkaTemplate.send(config.kafkaConfigurationInfo().topicName(), linkUpdate); + } +} diff --git a/scrapper/src/main/java/edu/java/updates/sender/LinkUpdateSender.java b/scrapper/src/main/java/edu/java/updates/sender/LinkUpdateSender.java new file mode 100644 index 0000000..d5effd8 --- /dev/null +++ b/scrapper/src/main/java/edu/java/updates/sender/LinkUpdateSender.java @@ -0,0 +1,7 @@ +package edu.java.updates.sender; + +import edu.java.client.bot.dto.request.LinkUpdate; + +public interface LinkUpdateSender { + void sendUpdate(LinkUpdate linkUpdate); +} diff --git a/scrapper/src/main/resources/application.yml b/scrapper/src/main/resources/application.yml index cf9d148..705c064 100644 --- a/scrapper/src/main/resources/application.yml +++ b/scrapper/src/main/resources/application.yml @@ -1,11 +1,14 @@ app: scheduler: enable: true - interval: 20s - force-check-delay: 20s + interval: 10s + force-check-delay: 10s max-links-per-check: 100 github-token: ${GITHUB_TOKEN} database-access-type: jpa + kafka-configuration-info: + topic-name: updates + use-queue: true supplier: github: @@ -38,6 +41,16 @@ spring: cache-names: - rate-limit-bucket + kafka: + bootstrap-servers: localhost:29092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + bootstrap-servers: localhost:29092 + properties: + spring.json.add.type.headers: false + + server: port: 8080 @@ -55,22 +68,21 @@ retry-query: retries: - target: github type: exponential - max-attempts: 3 + max-attempts: 5 min-delay: 1s max-delay: 10s - codes: 429 + codes: 500 - target: stackoverflow - type: exponential - max-attempts: 3 + type: fixed + max-attempts: 5 min-delay: 1s max-delay: 10s - codes: 429 + codes: 500 - target: bot - type: exponential - max-attempts: 3 - min-delay: 1s - max-delay: 10s - codes: 429 + type: linear + max-attempts: 5 + min-delay: 5s + codes: 500 bucket4j: enabled: true @@ -85,3 +97,7 @@ bucket4j: unit: hours refill-speed: interval cache-key: getRemoteAddr() + skip-condition: '@rateLimiterService.isSkipped(getRemoteAddr())' + +rate-limiter: + whitelist: ${WHITELISTED_IPS:localhost} diff --git a/scrapper/src/test/java/edu/java/scrapper/IntegrationEnvironment.java b/scrapper/src/test/java/edu/java/scrapper/IntegrationEnvironment.java index a54fc45..dfbe00b 100644 --- a/scrapper/src/test/java/edu/java/scrapper/IntegrationEnvironment.java +++ b/scrapper/src/test/java/edu/java/scrapper/IntegrationEnvironment.java @@ -14,13 +14,16 @@ import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; @Testcontainers @DirtiesContext public abstract class IntegrationEnvironment { public static PostgreSQLContainer POSTGRES; + public static KafkaContainer KAFKA; static { POSTGRES = new PostgreSQLContainer<>("postgres:16") @@ -30,6 +33,9 @@ public abstract class IntegrationEnvironment { POSTGRES.start(); runMigrations(POSTGRES); + + KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0")); + KAFKA.start(); } @SneakyThrows private static void runMigrations(JdbcDatabaseContainer container) { @@ -57,5 +63,8 @@ static void jdbcProperties(DynamicPropertyRegistry registry) { registry.add("spring.datasource.url", POSTGRES::getJdbcUrl); registry.add("spring.datasource.username", POSTGRES::getUsername); registry.add("spring.datasource.password", POSTGRES::getPassword); + registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); + registry.add("spring.kafka.producer.bootstrap-servers", KAFKA::getBootstrapServers); + registry.add("app.use-queue", () -> true); } } diff --git a/scrapper/src/test/java/edu/java/scrapper/supplier/github/GithubInfoSupplierTest.java b/scrapper/src/test/java/edu/java/scrapper/supplier/github/GithubInfoSupplierTest.java index 8ec94cd..9326d4f 100644 --- a/scrapper/src/test/java/edu/java/scrapper/supplier/github/GithubInfoSupplierTest.java +++ b/scrapper/src/test/java/edu/java/scrapper/supplier/github/GithubInfoSupplierTest.java @@ -1,7 +1,6 @@ package edu.java.scrapper.supplier.github; import com.github.tomakehurst.wiremock.WireMockServer; -import edu.java.RetryElement; import edu.java.RetryQueryConfiguration; import edu.java.configuration.ApplicationConfig; import edu.java.configuration.supplier.GithubConfig; @@ -28,11 +27,13 @@ public class GithubInfoSupplierTest { private static final ApplicationConfig NULL_APPLICATION_CONFIG = new ApplicationConfig( null, null, - null + null, + null, + false ); private static final RetryQueryConfiguration RETRY_QUERY_CONFIGURATION = new RetryQueryConfiguration( - List.of(new RetryElement( + List.of(new RetryQueryConfiguration.RetryElement( "github", "fixed", 1, diff --git a/scrapper/src/test/java/edu/java/scrapper/supplier/stackoverflow/StackOverflowInfoSupplierTest.java b/scrapper/src/test/java/edu/java/scrapper/supplier/stackoverflow/StackOverflowInfoSupplierTest.java index fdbcb02..c39cfe0 100644 --- a/scrapper/src/test/java/edu/java/scrapper/supplier/stackoverflow/StackOverflowInfoSupplierTest.java +++ b/scrapper/src/test/java/edu/java/scrapper/supplier/stackoverflow/StackOverflowInfoSupplierTest.java @@ -2,7 +2,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.github.tomakehurst.wiremock.WireMockServer; -import edu.java.RetryElement; import edu.java.RetryQueryConfiguration; import edu.java.configuration.supplier.StackOverflowConfig; import edu.java.configuration.supplier.StackOverflowPatternConfig; @@ -25,7 +24,7 @@ public class StackOverflowInfoSupplierTest { private static WireMockServer server; private static final RetryQueryConfiguration RETRY_QUERY_CONFIGURATION = new RetryQueryConfiguration( - List.of(new RetryElement( + List.of(new RetryQueryConfiguration.RetryElement( "stackoverflow", "fixed", 1, diff --git a/scrapper/src/test/java/edu/java/scrapper/updates/KafkaLinkUpdateSenderTest.java b/scrapper/src/test/java/edu/java/scrapper/updates/KafkaLinkUpdateSenderTest.java new file mode 100644 index 0000000..d2fc82b --- /dev/null +++ b/scrapper/src/test/java/edu/java/scrapper/updates/KafkaLinkUpdateSenderTest.java @@ -0,0 +1,61 @@ +package edu.java.scrapper.updates; + +import edu.java.client.bot.dto.request.LinkUpdate; +import edu.java.configuration.ApplicationConfig; +import edu.java.scrapper.IntegrationEnvironment; +import edu.java.updates.sender.KafkaLinkUpdateSender; +import edu.java.updates.sender.LinkUpdateSender; +import edu.java.util.URLCreator; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import static org.awaitility.Awaitility.await; + +@SpringBootTest +public class KafkaLinkUpdateSenderTest extends IntegrationEnvironment { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private ApplicationConfig config; + + @Test + public void sendUpdateTest() { + LinkUpdateSender linkUpdateSender = new KafkaLinkUpdateSender(kafkaTemplate, config); + LinkUpdate linkUpdate = new LinkUpdate( + 1L, + URLCreator.createURL("https://github.com"), + "github", + List.of(1L), + Map.of() + ); + KafkaConsumer kafkaConsumer = new KafkaConsumer<>( + Map.of( + "bootstrap.servers", KAFKA.getBootstrapServers(), + "group.id", "scrapper", + "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer", + "value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer", + "properties.spring.json.trusted.packages", "*", + "spring.json.value.default.type", "edu.java.client.bot.dto.request.LinkUpdate", + "auto.offset.reset", "earliest" + ) + ); + kafkaConsumer.subscribe(List.of(config.kafkaConfigurationInfo().topicName())); + linkUpdateSender.sendUpdate(linkUpdate); + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> { + var records = kafkaConsumer.poll(Duration.ofMillis(100)); + Assertions.assertThat(records).hasSize(1); + Assertions.assertThat(records.iterator().next().value()).isEqualTo(linkUpdate); + }); + } +}