From 0cd11e73a9f80135f1f0d9142298b78ea6e60db7 Mon Sep 17 00:00:00 2001 From: Florent Youinou Date: Sat, 9 Dec 2017 16:59:19 +0100 Subject: [PATCH] crawler: add delay function --- src/main/java/com/epita/guereza/App.java | 10 ++++++++++ src/main/java/com/epita/guereza/CrawlerApp.java | 16 ++++++++++------ src/main/java/com/epita/guereza/IndexerApp.java | 14 ++++---------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/epita/guereza/App.java b/src/main/java/com/epita/guereza/App.java index d15bcc2..df18d7f 100644 --- a/src/main/java/com/epita/guereza/App.java +++ b/src/main/java/com/epita/guereza/App.java @@ -7,6 +7,9 @@ import org.slf4j.LoggerFactory; import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; public abstract class App { private static final Logger LOGGER = LoggerFactory.getLogger(App.class); @@ -38,4 +41,11 @@ public void sendMessage(final String channel, final Object obj) { LOGGER.error("Impossible to send message: {}", e.getMessage()); } } + + protected void retryIn(final int seconds, Runnable consumer) { + LOGGER.info("Retry fetching url in {} seconds", seconds); + final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.schedule(consumer, seconds, TimeUnit.SECONDS); + executor.shutdown(); + } } diff --git a/src/main/java/com/epita/guereza/CrawlerApp.java b/src/main/java/com/epita/guereza/CrawlerApp.java index dfd32b7..0d5d526 100644 --- a/src/main/java/com/epita/guereza/CrawlerApp.java +++ b/src/main/java/com/epita/guereza/CrawlerApp.java @@ -38,12 +38,16 @@ private void storeUrls(final String[] urls) { @Override public void run() { - eventBus.subscribe(subscribeUrl, c -> { - System.out.println(c.getContent()); - final String[] urls = crawlAndExtract(c.getContent()); - storeUrls(urls); - - requestNextUrl(); + eventBus.subscribe(subscribeUrl, msg -> { + if (msg != null) {
+ LOGGER.info("Receive url: {}", msg.getContent()); + final String[] urls = crawlAndExtract(msg.getContent()); + storeUrls(urls); + + requestNextUrl(); + } else { + retryIn(30, this::requestNextUrl); + } }); requestNextUrl(); diff --git a/src/main/java/com/epita/guereza/IndexerApp.java b/src/main/java/com/epita/guereza/IndexerApp.java index 08f1b01..e26f38b 100644 --- a/src/main/java/com/epita/guereza/IndexerApp.java +++ b/src/main/java/com/epita/guereza/IndexerApp.java @@ -11,6 +11,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class IndexerApp extends App { private static final Logger LOGGER = LoggerFactory.getLogger(IndexerApp.class); @@ -44,24 +45,17 @@ private void requestNextUrl() { sendMessage("/request/indexer/url", subscribeUrl); } - private void retryIn(final int seconds) { - LOGGER.info("Retry fetching url in {}seconds", seconds); - final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - executor.schedule(this::requestNextUrl, seconds, TimeUnit.SECONDS); - executor.shutdownNow(); - } @Override public void run() { eventBus.subscribe(subscribeUrl, msg -> { if (msg != null) { - final String url = msg.getContent(); - LOGGER.info("Receive url: {}", url); - indexAndPublish(url); + LOGGER.info("Receive url: {}", msg.getContent()); + indexAndPublish(msg.getContent()); requestNextUrl(); } else { - retryIn(30); + retryIn(30, this::requestNextUrl); } });