Skip to content
This repository has been archived by the owner on Jun 2, 2019. It is now read-only.

Commit

Add UrlStore reducer
dethi committed Dec 9, 2017
1 parent a380487 commit 1cb4681
Showing 3 changed files with 92 additions and 51 deletions.
10 changes: 4 additions & 6 deletions src/main/java/com/epita/guereza/Main.java
@@ -8,23 +8,21 @@
import com.epita.guereza.service.CrawlerService;
import com.epita.guereza.service.indexer.IndexerService;
import com.epita.winter.Scope;
import com.epita.winter.provider.LazySingleton;
import com.epita.winter.provider.Prototype;
import com.epita.winter.provider.Singleton;
import com.fasterxml.jackson.core.JsonProcessingException;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.function.Function;

import static com.epita.winter.Scope.getMethod;

public class Main {
    private static final String NETTY_HOST = "localhost";
    private static final int NETTY_PORT = 8000;

    public static void main(String[] args) {
        // final Index index = new Index();
        // final Repo repo = new RepoStore();
        // final Repo repo = new UrlStore();
        //
        // //repo.store(new String[]{"https://www.bbc.co.uk/food/recipes/saladenicoise_6572"});
        //
@@ -44,17 +42,17 @@ public static void main(String[] args) {
    private static void testApp() {
        final Crawler crawler = new CrawlerService();
        final Indexer indexer = new IndexerService();
        final Repo repo = new RepoStore();

        final Function<Scope, EventBusClient> newEventBus = (s) -> new NettyEventBusClient();
        final Function<Scope, CrawlerApp> newCrawlerApp = (s) -> new CrawlerApp(s.instanceOf(EventBusClient.class), s.instanceOf(Crawler.class));
        final Function<Scope, IndexerApp> newIndexerApp = (s) -> new IndexerApp(s.instanceOf(EventBusClient.class), s.instanceOf(Indexer.class),
                s.instanceOf(Crawler.class));
        final Function<Scope, Repo> newRepo = (s) -> new UrlStore(s.instanceOf(EventBusClient.class));

        new Scope()
                .register(new Singleton<>(Crawler.class, crawler))
                .register(new Singleton<>(Indexer.class, indexer))
                .register(new Singleton<>(Repo.class, repo))
                .register(new LazySingleton<>(Repo.class, newRepo))
                .register(new Prototype<>(EventBusClient.class, newEventBus))
                .register(new Prototype<>(CrawlerApp.class, newCrawlerApp))
                .register(new Prototype<>(IndexerApp.class, newIndexerApp))
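
Note on the wiring change above: the Repo moves from an eagerly built RepoStore registered as a plain Singleton to a LazySingleton factory, so the UrlStore is only constructed when a Repo is first resolved and can pull its EventBusClient from the scope at that point. Below is a minimal sketch of that registration pattern; it uses only the Scope calls visible in this diff (register, instanceOf), assumes register() returns the scope and that instanceOf() can also be called on the scope directly (neither is shown here), and the wireRepo helper is hypothetical.

package com.epita.guereza;

import com.epita.eventbus.EventBusClient;
import com.epita.winter.Scope;
import com.epita.winter.provider.LazySingleton;
import com.epita.winter.provider.Singleton;

import java.util.function.Function;

public class RepoWiringSketch {
    // Hypothetical helper, not part of this commit: wires an existing event bus
    // and resolves the lazily constructed UrlStore through the scope.
    static Repo wireRepo(final EventBusClient eventBus) {
        // The factory runs only on the first Repo lookup (assumed LazySingleton semantics)
        // and fetches the EventBusClient dependency from the same scope.
        final Function<Scope, Repo> newRepo = (s) -> new UrlStore(s.instanceOf(EventBusClient.class));

        final Scope scope = new Scope()                              // assumes register() returns the Scope
                .register(new Singleton<>(EventBusClient.class, eventBus))
                .register(new LazySingleton<>(Repo.class, newRepo));

        return scope.instanceOf(Repo.class); // assumes instanceOf() is exposed on the scope itself
    }
}
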
45 changes: 0 additions & 45 deletions src/main/java/com/epita/guereza/RepoStore.java

This file was deleted.

88 changes: 88 additions & 0 deletions src/main/java/com/epita/guereza/UrlStore.java
@@ -0,0 +1,88 @@
package com.epita.guereza;

import com.epita.eventbus.EventBusClient;
import com.epita.eventbus.EventMessage;
import com.epita.guereza.eventsourcing.Event;
import com.epita.guereza.eventsourcing.Reducer;
import com.epita.guereza.service.CrawlerService;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashSet;
import java.util.Set;

public class UrlStore implements Repo, Reducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(UrlStore.class);

    private final EventBusClient eventBus;

    private Set<String> urlDone = new LinkedHashSet<>();
    private Set<String> urlTodo = new LinkedHashSet<>();

    public UrlStore(final EventBusClient eventBus) {
        this.eventBus = eventBus;
    }

    @Override
    public void store(String[] urls) {
        for (String url : urls) {
            if (url == null || url.isEmpty())
                continue;

            if (!urlDone.contains(url))
                urlTodo.add(url);
        }
    }

    @Override
    public String nextUrl() {
        if (!urlTodo.isEmpty()) {
            // There is still at least one URL waiting to be crawled.
            String url = urlTodo.iterator().next();
            urlTodo.remove(url);
            urlDone.add(url);
            LOGGER.info("Repo still contains {} links", urlTodo.size());
            return url;
        }
        LOGGER.warn("No more URLs to analyse.");
        return null;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void reduce(final Event<?> event) {
        switch (event.type) {
            case "ADD_URLS":
                addUrls((Event<String[]>) event);
                break;
            case "CRAWLER_REQUEST_URL":
                crawlerRequestUrl((Event<String>) event);
                break;
            case "INDEXER_REQUEST_URL":
                indexerRequestUrl((Event<String>) event);
                break;
        }
    }

    private void addUrls(Event<String[]> event) {
        store(event.obj);
        LOGGER.info("added URLs to the repo");
    }

    private void crawlerRequestUrl(Event<String> event) {
        try {
            eventBus.publish(new EventMessage(event.obj, nextUrl()));
        } catch (JsonProcessingException e) {
            LOGGER.error("cannot serialize: {}", e.getMessage());
        }
    }

    private void indexerRequestUrl(Event<String> event) {
        try {
            eventBus.publish(new EventMessage(event.obj, nextUrl()));
        } catch (JsonProcessingException e) {
            LOGGER.error("cannot serialize: {}", e.getMessage());
        }
    }
}
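
For context, a minimal sketch of how this reducer might be driven once events come off the bus. It assumes Event<T> can be constructed from a type string plus a payload (the real constructor is not shown in this commit), and the reply channel name is invented for illustration.

package com.epita.guereza;

import com.epita.eventbus.EventBusClient;
import com.epita.guereza.eventsourcing.Event;

public class UrlStoreSketch {
    // Hypothetical driver, not part of this commit.
    public static void demo(final EventBusClient eventBus) {
        final UrlStore store = new UrlStore(eventBus);

        // Seed the store; store() skips null/empty URLs and anything already crawled.
        store.reduce(new Event<>("ADD_URLS",                      // assumes an Event(type, obj) constructor
                new String[]{"https://www.bbc.co.uk/food/"}));

        // A crawler asks for work; the reducer publishes the next URL back on the
        // channel carried in event.obj (channel name made up for the example).
        store.reduce(new Event<>("CRAWLER_REQUEST_URL", "/crawler/1/url"));
    }
}
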
