From bd804945e39e1a8cda37c182f233a0e424aa6716 Mon Sep 17 00:00:00 2001 From: Luis Dario Simonassi Date: Thu, 2 Feb 2012 01:05:24 -0300 Subject: [PATCH] several bug fixes --- node-js-server/node-drpc-server.js | 13 ++- prepare-test-environment.sh | 2 +- .../java/search/LocalTopologyStarter.java | 3 +- src/main/java/search/QueriesSpout.java | 1 + src/main/java/search/ReadItemDataBolt.java | 3 +- src/main/java/search/SearchBucketBolt.java | 45 ++++++--- src/main/java/search/SerializationUtils.java | 12 +-- src/main/java/search/model/ItemsDao.java | 92 ------------------- src/main/java/search/model/ItemsShard.java | 8 +- src/test/groovy/AbstractStormTest.groovy | 3 +- src/test/groovy/PreparationTest.groovy | 2 +- src/test/groovy/SearchTest.groovy | 25 +---- src/test/resources/0.json | 4 +- src/test/resources/1.json | 4 +- src/test/resources/2.json | 4 +- src/test/resources/3.json | 4 +- src/test/resources/4.json | 4 +- src/test/resources/5.json | 4 +- src/test/resources/publish.sh | 14 +++ 19 files changed, 92 insertions(+), 155 deletions(-) delete mode 100644 src/main/java/search/model/ItemsDao.java create mode 100755 src/test/resources/publish.sh diff --git a/node-js-server/node-drpc-server.js b/node-js-server/node-drpc-server.js index 0ca4d15..740de5b 100644 --- a/node-js-server/node-drpc-server.js +++ b/node-js-server/node-drpc-server.js @@ -36,6 +36,7 @@ Array.prototype.display = function() { var topology_timeout= 2000; var claim_timeout= 2000; var base_port = 8080; +var content_type = 'application/json; charset=utf-8'; // Metrics var total_active_tasks = 0; @@ -106,6 +107,12 @@ if(process.argv.length>5) { console.log("No baseport provided, using default "+base_port); } +if(process.argv.length>6) { + content_type = process.argv[6]; +} else { + console.log("No content encoding provided, using default "+content_type); +} + var report = function () { var d= new Date(); @@ -115,6 +122,7 @@ var report = function () { console.log("* Topology TO: "+ topology_timeout); console.log("* Claim TO: "+ claim_timeout); console.log("* Base Port: " + base_port); + console.log("* Content-type: " + content_type); console.log("************* WORK *************"); console.log("* Total requests made: "+ total_requests_made); console.log("* Total requests answered: "+ total_requests_answered); @@ -201,13 +209,16 @@ http.createServer(function (request, response) { }); request.on("end", function() { + active_tasks[id].response.writeHead(200, { + 'Content-Type' : content_type + }); + active_tasks[id].response.end(data); delete active_tasks[id] total_active_tasks = total_active_tasks - 1; response.end("OK!\n"); total_requests_answered = total_requests_answered + 1; }); - } } }).listen(base_port+2); diff --git a/prepare-test-environment.sh b/prepare-test-environment.sh index edef20c..09ed6de 100755 --- a/prepare-test-environment.sh +++ b/prepare-test-environment.sh @@ -9,7 +9,7 @@ nohup node ./src/test/node/mock-api-server.js 1> ./logs/mock-items-api-server.lo # Start the NewsFeed DRPC Server # Timeouts are set to 10 seconds so, because indexing can take longer. -nohup node ./node-js-server/node-drpc-server.js localhost 10000 10000 9090 1> ./logs/news-feed-drpc-server.log 2>./logs/news-feed-drpc-server.err & +nohup node ./node-js-server/node-drpc-server.js localhost 10000 10000 9090 text 1> ./logs/news-feed-drpc-server.log 2>./logs/news-feed-drpc-server.err & # Start the queries DRPC Server # Timeouts are set to 2 seconds, because queries are expected to return fast!. diff --git a/src/main/java/search/LocalTopologyStarter.java b/src/main/java/search/LocalTopologyStarter.java index 2dff59d..ca63e7f 100644 --- a/src/main/java/search/LocalTopologyStarter.java +++ b/src/main/java/search/LocalTopologyStarter.java @@ -12,8 +12,9 @@ public class LocalTopologyStarter { public static void main(String[] args) { - //java.util.logging.Logger.getLogger("httpclient").setLevel(java.util.logging.Level.INFO); Logger.getRootLogger().setLevel(Level.ERROR); + Logger.getLogger("org.apache.http.wire").setLevel(Level.ERROR); + //java.util.logging.Logger.getLogger("httpclient").setLevel(java.util.logging.Level.INFO); LocalCluster cluster = new LocalCluster(); StormTopology topology = SearchEngineTopologyStarter.createTopology(); Config conf = SearchEngineTopologyStarter.createConf("127.0.0.1:8081", "127.0.0.1:9091", "127.0.0.1:8888", 10); diff --git a/src/main/java/search/QueriesSpout.java b/src/main/java/search/QueriesSpout.java index 372ed3a..b0b57ec 100644 --- a/src/main/java/search/QueriesSpout.java +++ b/src/main/java/search/QueriesSpout.java @@ -79,6 +79,7 @@ public void nextTuple() { if(id==null || query==null) break; else { + query = query.substring(1); // I don't send the message id object, so I disable the ackers mechanism collector.emit(new Values(origin, id, query)); } diff --git a/src/main/java/search/ReadItemDataBolt.java b/src/main/java/search/ReadItemDataBolt.java index b7a1e79..4371255 100644 --- a/src/main/java/search/ReadItemDataBolt.java +++ b/src/main/java/search/ReadItemDataBolt.java @@ -96,10 +96,11 @@ public void execute(Tuple input) { Item i; try { i = readItem(itemId); + System.out.println("Item readed "+ itemId+" ["+i+"]"); if(i==null) { collector.emit(new Values(origin, requestId, itemId, null)); } else { - collector.emit(new Values(origin, requestId, itemId, su.toByteArray(i))); + collector.emit(new Values(origin, requestId, itemId, su.itemToByteArray(i))); } } catch (Exception e) { e.printStackTrace(); diff --git a/src/main/java/search/SearchBucketBolt.java b/src/main/java/search/SearchBucketBolt.java index 4147676..f749601 100644 --- a/src/main/java/search/SearchBucketBolt.java +++ b/src/main/java/search/SearchBucketBolt.java @@ -1,11 +1,12 @@ package search; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import search.model.Item; -import search.model.ItemsDao; import search.model.ItemsShard; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; @@ -28,6 +29,8 @@ public class SearchBucketBolt implements IRichBolt { SerializationUtils su; ItemsShard shard; + + @Override public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, @@ -50,11 +53,11 @@ private boolean isMine(int itemId) { @Override public void execute(Tuple input) { if(input.getSourceComponent().equals("read-item-data")){ - - String origin= input.getString(0); - String requestId= input.getString(1); + //String origin= input.getString(0); + //String requestId= input.getString(1); int itemId= input.getInteger(2); if(isMine(itemId)){ + System.out.println("Mine! "+currentShard+"/"+totalShards); byte[] ba = input.getBinary(3); if(ba==null) { System.out.println("Removing item id:"+itemId); @@ -73,21 +76,31 @@ public void execute(Tuple input) { String requestId= input.getString(1); String query= input.getString(2); - // Execute query with local data scope - List results= executeLocalQuery(query); + // Execute query with local data scope + List results= executeLocalQuery(query, 5); + System.out.println("Searching ["+ query +"] in shard "+currentShard +" "+results.size()+" results found"); // Send data to next step: Merger collector.emit(new Values(origin, requestId, query, su.toByteArray(results))); } - private List executeLocalQuery(String query) { - // TODO REMOVE - ArrayList list= new ArrayList(); - int size= (int)(Math.random()*10); - for (int i = 0; i < size; i++) { - list.add(new Item(base_id++, ItemsDao.getRandomTitle(), Math.random()*1000)); - } - return list; + private List executeLocalQuery(String query, int quantity) { + List items= new ArrayList(shard.getItemsContainingWords(query)); + + Collections.sort(items, new Comparator() { + @Override + public int compare(Item o1, Item o2) { + double diff= o1.price-o2.price; + if(diff>0) + return 1; + else + return -1; + } + }); + + if(items.size()>quantity) + items = items.subList(0, quantity-1); + return items; } @Override @@ -98,4 +111,8 @@ public void cleanup() { public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("origin", "requestId", "query", "shardMatches")); } + + + + } diff --git a/src/main/java/search/SerializationUtils.java b/src/main/java/search/SerializationUtils.java index 7d2a1f9..a8376ee 100644 --- a/src/main/java/search/SerializationUtils.java +++ b/src/main/java/search/SerializationUtils.java @@ -70,9 +70,9 @@ public byte[] itemToByteArray(Item itm) { ByteArrayOutputStream out = new ByteArrayOutputStream(); try{ Packer packer = msgpack.createPacker(out); - packer.write(itm); - packer.write(itm); - packer.write(itm); + packer.write(itm.price); + packer.write(itm.title); + packer.write(itm.id); return out.toByteArray(); } catch (Exception ex) { @@ -117,10 +117,4 @@ public List fromByteArray(byte[] binary) { } } } - - public Object toByteArray(Item i) { - // TODO Auto-generated method stub - return null; - } - } diff --git a/src/main/java/search/model/ItemsDao.java b/src/main/java/search/model/ItemsDao.java deleted file mode 100644 index 7340011..0000000 --- a/src/main/java/search/model/ItemsDao.java +++ /dev/null @@ -1,92 +0,0 @@ -package search.model; - -import java.util.HashMap; -import java.util.Map; - -public class ItemsDao { - - public static final long TOTAL_ITEMS= 100; - public static final long BASE_ID= 0; - - int totalShards, currentShard; - - - public static String[] possibleWords= new String[] { - "dvd", "laptop", "usb", "home", "theater", "sound", - "notebook", "17", "15", "14", "ipod", "iphone", - "mac", "pava", "mate", "bombilla", - "aire", "acondicionado", "departamento", - "casa", "cochera", "silla", "sillon", "mesa", "cable", - "cortina", "lavarropa", "lavavajilla", - "televisor", "led", "lcd", "ambientes", - "cuadro", "decoracion", "pintura", "jarron", "escultura", - "ventana", "vidrio", "aluminio", "pvc", - "nokia", "1100", "blackberry", "curve", - "android", "samsung", "galaxy", "sII", "windows", "mobile", - "aeromodelismo", "automodelismo", "bateria", - "motor", "control", "remoto", "alas", "avion", "pilas", - "combustible", "autos", "peugeot", "206", "207", - "307", "308", "407", "408", "fiat", "uno", "palio", - "siena", "linea", "stilo", "idea", - "chevrolet", "corsa", "agile", "aveo", "vecra", - "astra", "cruze", "captiva", "volkswagen", "gol", "trend", - "power", "fox", "suran", "bora", "vento", "passat", "cc", - "tiguan", "touareg", "ford", "fiesta", "ka", "kinetic", - "design", "focus", "mondeo", "ecosport", "kuga" - }; - - - public static int MAX_WORDS= 10; - - - public static String getRandomTitle(){ - int cantWords= (int)(Math.random()*(MAX_WORDS-1))+1; - String res= ""; - for(int i= 0; i< cantWords; i++){ - int ind= (int)(Math.random()*(possibleWords.length-1)); - res+=possibleWords[ind]+" "; - } - return res; - } - - - public ItemsDao(int totalShards, int currentShard){ - this.totalShards= totalShards; - this.currentShard= currentShard; - } - - public Map getItems(){ - int shardSize= (int)(TOTAL_ITEMS/totalShards); - HashMap items= new HashMap(shardSize); - - for(long i=0; i map= dao.getItems(); - - for (Long itemId : map.keySet()) { - System.out.println(map.get(itemId)); - } - } - - - } -} - diff --git a/src/main/java/search/model/ItemsShard.java b/src/main/java/search/model/ItemsShard.java index e8cf079..740bef3 100644 --- a/src/main/java/search/model/ItemsShard.java +++ b/src/main/java/search/model/ItemsShard.java @@ -1,6 +1,7 @@ package search.model; import java.util.ArrayList; +import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; @@ -91,6 +92,8 @@ private List getItemWords(Item i) { } + + Set emptySet= Collections.emptySet(); /** * This method is highly concurrent * @param word @@ -98,6 +101,9 @@ private List getItemWords(Item i) { */ public Set getItemsContainingWord(String word) { Set items= index.get(word); + if(items==null){ + return emptySet; + } //System.out.println("\tWord: ["+word +"] res:"+items.size()); return items; } @@ -107,7 +113,7 @@ public Set getItemsContainingWords(String words){ //System.out.println("Query: ["+words+"]"); StringTokenizer strTok= new StringTokenizer(words, "-", false); - HashSet result= null; + HashSet result= new HashSet(); boolean first= true; while(strTok.hasMoreTokens()){ String word= strTok.nextToken(); diff --git a/src/test/groovy/AbstractStormTest.groovy b/src/test/groovy/AbstractStormTest.groovy index 3c061c2..5a3ba38 100644 --- a/src/test/groovy/AbstractStormTest.groovy +++ b/src/test/groovy/AbstractStormTest.groovy @@ -53,6 +53,7 @@ public abstract class AbstractStormTest extends Assert { toSend['title'] = title toSend['price'] = price + println "Posting item [${document}] [${toSend}]" def resp= itemsApiClient.post(path : document, body: toSend, requestContentType: ContentType.JSON) @@ -76,7 +77,7 @@ public abstract class AbstractStormTest extends Assert { } public Object searchApi(String query) { - def document = "/${query}.json" + def document = "/${query}" def resp = searchEngineApiClient.get(path:document) assertEquals(200, resp.status) diff --git a/src/test/groovy/PreparationTest.groovy b/src/test/groovy/PreparationTest.groovy index e106bda..ad207ac 100644 --- a/src/test/groovy/PreparationTest.groovy +++ b/src/test/groovy/PreparationTest.groovy @@ -14,7 +14,7 @@ class PreparationTest extends AbstractStormTest { @Test public void searchEngineExists() { - def resp = searchApi('/mp3') + def resp = searchApi('mp3') } @Test diff --git a/src/test/groovy/SearchTest.groovy b/src/test/groovy/SearchTest.groovy index 1b53c75..e61e165 100644 --- a/src/test/groovy/SearchTest.groovy +++ b/src/test/groovy/SearchTest.groovy @@ -8,24 +8,7 @@ public class SearchTest extends AbstractStormTest { @Test public void newsFeedTest() { def result = searchApi("new") - println "-----------------------" - println "-----------------------" - println "-----------------------" - println "-----------------------" - println "-----------------------" - println "-----------------------" - println result[0] - println "-----------------------" - println "-----------------------" - println "-----------------------" - println "-----------------------" - println "-----------------------" - println result[0] - println result[0] - println result[0] - println result[0] - println result[0] - //assertEquals(result.size(), 0) + assertEquals(result.size(), 0) addItem(1, "new dvd player", 100) addItem(2, "new digital camera", 80) @@ -36,10 +19,10 @@ public class SearchTest extends AbstractStormTest { result = searchApi("drive") - //assertEquals(result.size(), 0) + assertEquals(0, result.size()) result = searchApi("new") - //assertEquals(result.size(), 3) + assertEquals(3, result.size()) removeItem(1) removeItem(2) @@ -49,7 +32,7 @@ public class SearchTest extends AbstractStormTest { postNew(3) result = searchApi("new") - //assertEquals(result.size(), 0) + assertEquals(0, result.size()) } @Test diff --git a/src/test/resources/0.json b/src/test/resources/0.json index b8d73f4..39d4e42 100644 --- a/src/test/resources/0.json +++ b/src/test/resources/0.json @@ -1,5 +1,5 @@ { - "id": "0", + "id": 0, "title": "new high quality mp3 player 64Gb", - "price": "200" + "price": 200 } diff --git a/src/test/resources/1.json b/src/test/resources/1.json index 0269357..aaeea29 100644 --- a/src/test/resources/1.json +++ b/src/test/resources/1.json @@ -1,5 +1,5 @@ { - "id": "1", + "id": 1, "title": "new air conditioner with led indicator", - "price": "1500" + "price": 1500 } diff --git a/src/test/resources/2.json b/src/test/resources/2.json index ceded1e..ac659aa 100644 --- a/src/test/resources/2.json +++ b/src/test/resources/2.json @@ -1,5 +1,5 @@ { - "id": "2", + "id": 2, "title": "digital camera 10 megapixels", - "price": "500" + "price": 500 } diff --git a/src/test/resources/3.json b/src/test/resources/3.json index 7a3dccf..3e87767 100644 --- a/src/test/resources/3.json +++ b/src/test/resources/3.json @@ -1,5 +1,5 @@ { - "id": "3", + "id": 3, "title": "new design patterns explained", - "price": "55" + "price": 55 } diff --git a/src/test/resources/4.json b/src/test/resources/4.json index 13cf7a4..5e24117 100644 --- a/src/test/resources/4.json +++ b/src/test/resources/4.json @@ -1,5 +1,5 @@ { - "id": "4", + "id": 4, "title": "high performance laptop computer", - "price": "2000" + "price": 2000 } diff --git a/src/test/resources/5.json b/src/test/resources/5.json index fdc5188..64119b6 100644 --- a/src/test/resources/5.json +++ b/src/test/resources/5.json @@ -1,5 +1,5 @@ { - "id": "5", + "id": 5, "title": "big led tv high quality images", - "price": "1000" + "price": 1000 } diff --git a/src/test/resources/publish.sh b/src/test/resources/publish.sh new file mode 100755 index 0000000..1d8aa9d --- /dev/null +++ b/src/test/resources/publish.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +for FILE in `ls *.json` +do + echo "Posting $FILE" + curl -v -X 'POST' --data @$FILE localhost:8888/$FILE + echo "indexing..." +done + +for item_id in `ls *.json | cut -c 1-1` +do + echo "Indexing.. $item_id" + curl -v localhost:9090/$item_id +done