Skip to content

Commit

Permalink
several bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ldsimonassi committed Feb 2, 2012
1 parent 49eed93 commit bd80494
Show file tree
Hide file tree
Showing 19 changed files with 92 additions and 155 deletions.
13 changes: 12 additions & 1 deletion node-js-server/node-drpc-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Array.prototype.display = function() {
var topology_timeout= 2000;
var claim_timeout= 2000;
var base_port = 8080;
var content_type = 'application/json; charset=utf-8';

// Metrics
var total_active_tasks = 0;
Expand Down Expand Up @@ -106,6 +107,12 @@ if(process.argv.length>5) {
console.log("No baseport provided, using default "+base_port);
}

if(process.argv.length>6) {
content_type = process.argv[6];
} else {
console.log("No content encoding provided, using default "+content_type);
}


var report = function () {
var d= new Date();
Expand All @@ -115,6 +122,7 @@ var report = function () {
console.log("* Topology TO: "+ topology_timeout);
console.log("* Claim TO: "+ claim_timeout);
console.log("* Base Port: " + base_port);
console.log("* Content-type: " + content_type);
console.log("************* WORK *************");
console.log("* Total requests made: "+ total_requests_made);
console.log("* Total requests answered: "+ total_requests_answered);
Expand Down Expand Up @@ -201,13 +209,16 @@ http.createServer(function (request, response) {
});

request.on("end", function() {
active_tasks[id].response.writeHead(200, {
'Content-Type' : content_type
});

active_tasks[id].response.end(data);
delete active_tasks[id]
total_active_tasks = total_active_tasks - 1;
response.end("OK!\n");
total_requests_answered = total_requests_answered + 1;
});

}
}
}).listen(base_port+2);
Expand Down
2 changes: 1 addition & 1 deletion prepare-test-environment.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ nohup node ./src/test/node/mock-api-server.js 1> ./logs/mock-items-api-server.lo

# Start the NewsFeed DRPC Server
# Timeouts are set to 10 seconds so, because indexing can take longer.
nohup node ./node-js-server/node-drpc-server.js localhost 10000 10000 9090 1> ./logs/news-feed-drpc-server.log 2>./logs/news-feed-drpc-server.err &
nohup node ./node-js-server/node-drpc-server.js localhost 10000 10000 9090 text 1> ./logs/news-feed-drpc-server.log 2>./logs/news-feed-drpc-server.err &

# Start the queries DRPC Server
# Timeouts are set to 2 seconds, because queries are expected to return fast!.
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/search/LocalTopologyStarter.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
public class LocalTopologyStarter {

public static void main(String[] args) {
//java.util.logging.Logger.getLogger("httpclient").setLevel(java.util.logging.Level.INFO);
Logger.getRootLogger().setLevel(Level.ERROR);
Logger.getLogger("org.apache.http.wire").setLevel(Level.ERROR);
//java.util.logging.Logger.getLogger("httpclient").setLevel(java.util.logging.Level.INFO);
LocalCluster cluster = new LocalCluster();
StormTopology topology = SearchEngineTopologyStarter.createTopology();
Config conf = SearchEngineTopologyStarter.createConf("127.0.0.1:8081", "127.0.0.1:9091", "127.0.0.1:8888", 10);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/search/QueriesSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void nextTuple() {
if(id==null || query==null)
break;
else {
query = query.substring(1);
// I don't send the message id object, so I disable the ackers mechanism
collector.emit(new Values(origin, id, query));
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/search/ReadItemDataBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ public void execute(Tuple input) {
Item i;
try {
i = readItem(itemId);
System.out.println("Item readed "+ itemId+" ["+i+"]");
if(i==null) {
collector.emit(new Values(origin, requestId, itemId, null));
} else {
collector.emit(new Values(origin, requestId, itemId, su.toByteArray(i)));
collector.emit(new Values(origin, requestId, itemId, su.itemToByteArray(i)));
}
} catch (Exception e) {
e.printStackTrace();
Expand Down
45 changes: 31 additions & 14 deletions src/main/java/search/SearchBucketBolt.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package search;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import search.model.Item;
import search.model.ItemsDao;
import search.model.ItemsShard;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
Expand All @@ -28,6 +29,8 @@ public class SearchBucketBolt implements IRichBolt {
SerializationUtils su;
ItemsShard shard;



@Override
public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
TopologyContext context,
Expand All @@ -50,11 +53,11 @@ private boolean isMine(int itemId) {
@Override
public void execute(Tuple input) {
if(input.getSourceComponent().equals("read-item-data")){

String origin= input.getString(0);
String requestId= input.getString(1);
//String origin= input.getString(0);
//String requestId= input.getString(1);
int itemId= input.getInteger(2);
if(isMine(itemId)){
System.out.println("Mine! "+currentShard+"/"+totalShards);
byte[] ba = input.getBinary(3);
if(ba==null) {
System.out.println("Removing item id:"+itemId);
Expand All @@ -73,21 +76,31 @@ public void execute(Tuple input) {
String requestId= input.getString(1);
String query= input.getString(2);

// Execute query with local data scope
List<Item> results= executeLocalQuery(query);

// Execute query with local data scope
List<Item> results= executeLocalQuery(query, 5);
System.out.println("Searching ["+ query +"] in shard "+currentShard +" "+results.size()+" results found");
// Send data to next step: Merger
collector.emit(new Values(origin, requestId, query, su.toByteArray(results)));
}

private List<Item> executeLocalQuery(String query) {
// TODO REMOVE
ArrayList<Item> list= new ArrayList<Item>();
int size= (int)(Math.random()*10);
for (int i = 0; i < size; i++) {
list.add(new Item(base_id++, ItemsDao.getRandomTitle(), Math.random()*1000));
}
return list;
private List<Item> executeLocalQuery(String query, int quantity) {
List<Item> items= new ArrayList<Item>(shard.getItemsContainingWords(query));

Collections.sort(items, new Comparator<Item>() {
@Override
public int compare(Item o1, Item o2) {
double diff= o1.price-o2.price;
if(diff>0)
return 1;
else
return -1;
}
});

if(items.size()>quantity)
items = items.subList(0, quantity-1);
return items;
}

@Override
Expand All @@ -98,4 +111,8 @@ public void cleanup() {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("origin", "requestId", "query", "shardMatches"));
}




}
12 changes: 3 additions & 9 deletions src/main/java/search/SerializationUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ public byte[] itemToByteArray(Item itm) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try{
Packer packer = msgpack.createPacker(out);
packer.write(itm);
packer.write(itm);
packer.write(itm);
packer.write(itm.price);
packer.write(itm.title);
packer.write(itm.id);

return out.toByteArray();
} catch (Exception ex) {
Expand Down Expand Up @@ -117,10 +117,4 @@ public List<Item> fromByteArray(byte[] binary) {
}
}
}

public Object toByteArray(Item i) {
// TODO Auto-generated method stub
return null;
}

}
92 changes: 0 additions & 92 deletions src/main/java/search/model/ItemsDao.java

This file was deleted.

8 changes: 7 additions & 1 deletion src/main/java/search/model/ItemsShard.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package search.model;

import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -91,13 +92,18 @@ private List<String> getItemWords(Item i) {
}



Set<Item> emptySet= Collections.emptySet();
/**
* This method is highly concurrent
* @param word
* @return
*/
public Set<Item> getItemsContainingWord(String word) {
Set<Item> items= index.get(word);
if(items==null){
return emptySet;
}
//System.out.println("\tWord: ["+word +"] res:"+items.size());
return items;
}
Expand All @@ -107,7 +113,7 @@ public Set<Item> getItemsContainingWords(String words){
//System.out.println("Query: ["+words+"]");

StringTokenizer strTok= new StringTokenizer(words, "-", false);
HashSet<Item> result= null;
HashSet<Item> result= new HashSet<Item>();
boolean first= true;
while(strTok.hasMoreTokens()){
String word= strTok.nextToken();
Expand Down
3 changes: 2 additions & 1 deletion src/test/groovy/AbstractStormTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public abstract class AbstractStormTest extends Assert {
toSend['title'] = title
toSend['price'] = price

println "Posting item [${document}] [${toSend}]"
def resp= itemsApiClient.post(path : document,
body: toSend,
requestContentType: ContentType.JSON)
Expand All @@ -76,7 +77,7 @@ public abstract class AbstractStormTest extends Assert {
}

public Object searchApi(String query) {
def document = "/${query}.json"
def document = "/${query}"
def resp = searchEngineApiClient.get(path:document)

assertEquals(200, resp.status)
Expand Down
2 changes: 1 addition & 1 deletion src/test/groovy/PreparationTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class PreparationTest extends AbstractStormTest {

@Test
public void searchEngineExists() {
def resp = searchApi('/mp3')
def resp = searchApi('mp3')
}

@Test
Expand Down
Loading

0 comments on commit bd80494

Please sign in to comment.