Skip to content

Commit

Permalink
add shard and merge unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ldsimonassi committed Feb 5, 2012
1 parent b2cbbdf commit bfa2ab5
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 131 deletions.
6 changes: 3 additions & 3 deletions node-js-server/node-drpc-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ Array.prototype.display = function() {
}

// Parameters
var topology_timeout= 2000;
var claim_timeout= 2000;
var topology_timeout= 20000;
var claim_timeout= 20000;
var base_port = 8080;
var content_type = 'application/json; charset=utf-8';

Expand Down Expand Up @@ -258,4 +258,4 @@ setInterval(function () {


// Log status information each 10 seconds interval.
setInterval(report, 10000) ;
setInterval(report, 10000) ;
2 changes: 2 additions & 0 deletions src/main/java/search/AnswerItemsFeedBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.SingleClientConnManager;

import search.utils.SerializationUtils;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/search/AnswerQueryBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.json.simple.JSONValue;

import search.model.Item;
import search.utils.SerializationUtils;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package search;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

import search.model.Item;
import search.utils.SerializationUtils;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
Expand All @@ -16,7 +19,7 @@
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class JoinSortBolt implements IRichBolt {
public class MergeBolt implements IRichBolt {
private static final long serialVersionUID = 1L;

@SuppressWarnings("rawtypes")
Expand All @@ -29,7 +32,7 @@ public class JoinSortBolt implements IRichBolt {


public static class Merger {
ArrayList<Item> items;
List<Item> items;
int size;
int totalMerged;
long start;
Expand Down Expand Up @@ -65,48 +68,26 @@ public int getTotalMerged(){
return totalMerged;
}

public void merge(List<Item> newItems){
public void merge(List<Item> newItems) {
totalMerged++;
if(items.size()==0){
int copy= newItems.size()<size?newItems.size():size;
for(int i=0; i < copy;i++){
items.add(i, newItems.get(i));
ArrayList<Item> chgList= new ArrayList<Item>(newItems.size()+items.size());
chgList.addAll(newItems);
chgList.addAll(items);
Collections.sort(chgList, new Comparator<Item>() {
@Override
public int compare(Item i1, Item i2) {
if(i1.price > i2.price)
return -1;
else
return 1;
}
} else {
ArrayList<Item> newList= new ArrayList<Item>();
int iA = 0;
int iB = 0;
for(int i=0; i<size; i++){
boolean overA= iA>=items.size();
boolean overB= iB>=newItems.size();

if(!overB && !overA){
Item itmA= items.get(iA);
Item itmB= newItems.get(iB);
if(itmA.greaterThan(itmB)){
iA++;
newList.add(i, itmA);
} else {
iB++;
newList.add(i, itmB);
}
} else if(overA && overB) {
break;
} else if(overA){
Item itmB= newItems.get(iB);
iB++;
newList.add(i, itmB);
} else { //overB
Item itmA= items.get(iA);
iA++;
newList.add(i, itmA);
}
}

items= newList;
}
});
if(chgList.size() < size)
items = chgList;
else
items = chgList.subList(0, size);
}

@Override
public String toString() {
return items.toString();
Expand Down Expand Up @@ -139,7 +120,7 @@ public void run() {
ArrayList<Merger> mergers;

synchronized (inCourse) {
mergers= new ArrayList<JoinSortBolt.Merger>(inCourse.values());
mergers= new ArrayList<MergeBolt.Merger>(inCourse.values());
}

for (Merger merger : mergers) {
Expand All @@ -157,8 +138,7 @@ public void run() {
public void execute(Tuple input) {
String origin= input.getString(0);
String requestId= input.getString(1);

//String query= input.getString(2);

byte[] binary= input.getBinary(3);
List<Item> shardResults= su.fromByteArray(binary);

Expand All @@ -181,7 +161,6 @@ public void execute(Tuple input) {

if(merger.getTotalMerged()>=totalShards){
finish(merger);

}
}

Expand All @@ -197,29 +176,4 @@ protected void finish(Merger merger){
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("origin", "requestId", "results"));
}

public static void main(String[] args) {

ArrayList<Item> a= new ArrayList<Item>();
ArrayList<Item> b= new ArrayList<Item>();
ArrayList<Item> c= new ArrayList<Item>();
long id=0;
for(int i=0; i<10;i++){
a.add(new Item(id, "a", i));
id++;
b.add(new Item(id, "b", i+0.25));
id++;
c.add(new Item(id, "c", i+0.5));
id++;
}

Merger m= new Merger("localhost", "44", 5);

m.merge(a);
System.out.println(m);
m.merge(b);
System.out.println(m);
m.merge(c);
System.out.println(m);
}
}
4 changes: 0 additions & 4 deletions src/main/java/search/QueriesSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ private void reconnect() {
@Override
public void close() {
}





@Override
public void nextTuple() {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/search/ReadItemDataBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.json.simple.JSONValue;

import search.model.Item;
import search.utils.SerializationUtils;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/search/SearchBucketBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import search.model.Item;
import search.model.ItemsShard;
import search.utils.SerializationUtils;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/search/SearchEngineTopologyStarter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static StormTopology createTopology() {

builder.setBolt("queries-processor", new SearchBucketBolt(), 10).allGrouping("queries-spout").allGrouping("read-item-data");

builder.setBolt("join-sort", new JoinSortBolt(), 3).fieldsGrouping("queries-processor", new Fields("origin", "requestId"));
builder.setBolt("join-sort", new MergeBolt(), 3).fieldsGrouping("queries-processor", new Fields("origin", "requestId"));
builder.setBolt("answer-query", new AnswerQueryBolt(), 2).fieldsGrouping("join-sort", new Fields("origin"));

return builder.createTopology();
Expand Down
10 changes: 4 additions & 6 deletions src/main/java/search/model/Item.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
public class Item implements Serializable {
private static final long serialVersionUID = 1L;


public long id;
public String title;
public double price;

public Item() {

}

public Item(long id, String title, double price) {
Expand All @@ -19,10 +21,6 @@ public Item(long id, String title, double price) {
this.price= price;
}

public long id;
public String title;
public double price;

@Override
public boolean equals(Object obj) {
Item other= (Item)obj;
Expand Down
43 changes: 0 additions & 43 deletions src/main/java/search/model/ItemsShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,47 +134,4 @@ public Set<Item> getItemsContainingWords(String words){
}
return result;
}

public static void main(String[] args) {
ItemsShard myShard= new ItemsShard(20);

Item a= new Item(0, "nice dvd player with usb and card reader", 100);
Item b= new Item(1, "new laptop computer with dvd and usb and manual", 100);
Item c= new Item(2, "elegant cell phone with usb charger manual dvd included", 100);
Item d= new Item(3, "new balck microwave includes cooking book and operation manual", 100);

myShard.add(a);
myShard.add(b);
myShard.add(c);
myShard.add(d);

Set<Item> result= myShard.getItemsContainingWords("nice-dvd");
System.out.println(result.size());
System.out.println(result);
System.out.println("-----------------");
assert result.contains(a);
assert !result.contains(b);
assert !result.contains(c);
assert !result.contains(d);

result= myShard.getItemsContainingWords("new-manual");
System.out.println(result.size());
System.out.println(result);
System.out.println("-----------------");
assert result.contains(b);
assert result.contains(d);
assert !result.contains(a);
assert !result.contains(c);

result= myShard.getItemsContainingWords("nice-dvd-player-with-usb-and-card-reader");
System.out.println(result.size());
System.out.println(result);
System.out.println("-----------------");
assert result.contains(a);
assert !result.contains(b);
assert !result.contains(c);
assert !result.contains(d);

}

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package search;
package search.utils;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import search.SearchEngineTopologyStarter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package search;
package search.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down
4 changes: 2 additions & 2 deletions src/test/groovy/AbstractStormTest.groovy
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import search.LocalTopologyStarter
import search.utils.LocalTopologyStarter
import search.SearchEngineTopologyStarter
import backtype.storm.LocalCluster;
import groovyx.net.http.ContentType;
Expand Down Expand Up @@ -53,7 +53,7 @@ public abstract class AbstractStormTest extends Assert {
toSend['title'] = title
toSend['price'] = price

println "Posting item [${document}] [${toSend}]"
println "Posting item [ ${document}] [${toSend}]"
def resp= itemsApiClient.post(path : document,
body: toSend,
requestContentType: ContentType.JSON)
Expand Down
Loading

0 comments on commit bfa2ab5

Please sign in to comment.