Skip to content

Commit

Permalink
several testing corrections
Browse files Browse the repository at this point in the history
  • Loading branch information
ldsimonassi committed Feb 1, 2012
1 parent db52fca commit 49eed93
Show file tree
Hide file tree
Showing 17 changed files with 252 additions and 99 deletions.
6 changes: 3 additions & 3 deletions node-js-server/node-drpc-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,20 @@ else {
}

if(process.argv.length>3) {
topology_timeout = process.argv[3];
topology_timeout = parseInt(process.argv[3]);
} else {
console.log("No topology timeout provided, using default "+topology_timeout);
}

if(process.argv.length>4) {
claim_timeout = process.argv[4];
claim_timeout = parseInt(process.argv[4]);
} else {
console.log("No claim timeout provided, using default "+claim_timeout);
}


if(process.argv.length>5) {
base_port = process.argv[5];
base_port = parseInt(process.argv[5]);
} else {
console.log("No baseport provided, using default "+base_port);
}
Expand Down
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@
</goals>
</execution>
</executions>

</plugin>


<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -156,11 +154,14 @@
<configuration>
<tasks>
<mkdir dir="${basedir}/src/main/java"/>

<taskdef name="groovyc"
classname="org.codehaus.groovy.ant.Groovyc">
<classpath refid="maven.compile.classpath"/>
</taskdef>

<mkdir dir="${project.build.outputDirectory}"/>

<groovyc destdir="${project.build.outputDirectory}"
srcdir="${basedir}/src/main/java/" listfiles="true">
<classpath refid="maven.compile.classpath"/>
Expand Down
1 change: 1 addition & 0 deletions src/main/java/search/AnswerItemsFeedBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void execute(Tuple input) {

private void sendBack(String origin, String id){
String to= "http://"+origin+":9092/?id="+id;
System.out.println("Answering feed:"+to);
HttpPost post= new HttpPost(to);
try {
StringEntity entity= new StringEntity("OK");
Expand Down
34 changes: 18 additions & 16 deletions src/main/java/search/AnswerQueryBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.SingleClientConnManager;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import search.model.Item;
import backtype.storm.task.OutputCollector;
Expand Down Expand Up @@ -42,31 +45,30 @@ public void execute(Tuple input) {

HttpClient client;

@SuppressWarnings("unchecked")
private void sendBack(String origin, String id, List<Item> finalResult){
String to= "http://"+origin+":8082/?id="+id;
System.out.println("Answering to:["+to+"]");

System.out.println("Answering:" + to);

HttpPost post= new HttpPost(to);
StringBuffer strBuff= new StringBuffer();

JSONArray list = new JSONArray();

for (Item item : finalResult) {
strBuff.append(item.id);
strBuff.append("\t");
strBuff.append(item.name);
strBuff.append("\t");
strBuff.append(item.price);
strBuff.append("\n");
JSONObject obj= new JSONObject();
obj.put("title", item.title);
obj.put("id", item.id);
obj.put("price", item.price);
list.add(obj);
}

String json= JSONValue.toJSONString(list);
try {
StringEntity entity= new StringEntity(strBuff.toString());
StringEntity entity= new StringEntity(json);
post.setEntity(entity);


HttpResponse response= client.execute(post);

InputStream is= response.getEntity().getContent();

is.close();

is.close();
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/search/ItemsNewsFeedSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ public void nextTuple() {
String origin= reader.readLine();
while(true) {
String id= reader.readLine();
int itemId= Integer.valueOf(reader.readLine());
if(id==null)
break;
else {
// I don't send the message id object, so I disable the ackers mechanism
collector.emit(new Values(origin, id, itemId));
}
String strId= reader.readLine();
strId= strId.substring(1);
int itemId= Integer.valueOf(strId);

// I don't send the message id object, so I disable the ackers mechanism
System.out.println("News feed emiting: ["+origin+"] ["+id+"] ["+itemId+"]");
collector.emit(new Values(origin, id, itemId));
}
} catch (Exception e) {
e.printStackTrace();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/search/JoinSortBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public void merge(List<Item> newItems){
int iA = 0;
int iB = 0;
for(int i=0; i<size; i++){
boolean overA= iA>items.size();
boolean overB= iB>items.size();
boolean overA= iA>=items.size();
boolean overB= iB>=newItems.size();

if(!overB && !overA){
Item itmA= items.get(iA);
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/search/LocalTopologyStarter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package search;

import java.util.Enumeration;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;

public class LocalTopologyStarter {

public static void main(String[] args) {
//java.util.logging.Logger.getLogger("httpclient").setLevel(java.util.logging.Level.INFO);
Logger.getRootLogger().setLevel(Level.ERROR);
LocalCluster cluster = new LocalCluster();
StormTopology topology = SearchEngineTopologyStarter.createTopology();
Config conf = SearchEngineTopologyStarter.createConf("127.0.0.1:8081", "127.0.0.1:9091", "127.0.0.1:8888", 10);
conf.setDebug(false);
cluster.submitTopology("TestTopology", conf, topology);
}
}
15 changes: 7 additions & 8 deletions src/main/java/search/QueriesSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

public class QueriesSpout implements IRichSpout {
private static final long serialVersionUID = 1L;


public static final int TIMEOUT= 1000;

@SuppressWarnings("rawtypes")
Map conf;
TopologyContext context;
Expand All @@ -29,7 +31,8 @@ public class QueriesSpout implements IRichSpout {
int maxPull;
HttpClient httpclient;
HttpGet httpget;

int id=0;

/**
* Open a thread for each processed server.
*/
Expand All @@ -47,22 +50,19 @@ public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context
}
reconnect();
}


private void reconnect() {
httpclient = new DefaultHttpClient(new SingleClientConnManager());
httpget = new HttpGet("http://"+queriesPullHost+"/?max="+maxPull);
}


@Override
public void close() {
}

int id=0;



public static final int TIMEOUT= 1000;


@Override
public void nextTuple() {
Expand Down Expand Up @@ -94,7 +94,6 @@ public void nextTuple() {
e.printStackTrace();
}
}

}


Expand Down
59 changes: 34 additions & 25 deletions src/main/java/search/ReadItemDataBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,61 +38,72 @@ public class ReadItemDataBolt implements IRichBolt {
public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
TopologyContext context,
OutputCollector collector) {
this.stormConf= stormConf;
this.context= context;
this.collector= collector;
su= new SerializationUtils();
this.itemsApiHost= (String) stormConf.get("items-api-host");
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
su = new SerializationUtils();
this.itemsApiHost = (String)stormConf.get("items-api-host");
reconnect();
}

private void reconnect() {
httpclient = new DefaultHttpClient(new SingleClientConnManager());

}

public Item readItem(int id) throws Exception{
HttpResponse response;
BufferedReader reader= null;
httpget = new HttpGet("http://"+itemsApiHost+"/"+id+".json");
String url= "http://"+itemsApiHost+"/"+id+".json";
System.out.println("Reading item data:["+url+"]");
httpget = new HttpGet(url);
try {
response = httpclient.execute(httpget);
HttpEntity entity = response.getEntity();

entity.getContent();
reader= new BufferedReader(new InputStreamReader(entity.getContent()));
Object obj=JSONValue.parse(reader);
JSONObject item=(JSONObject)obj;
Item i= new Item((Long)item.get("id"), (String)item.get("title"), (Long)item.get("price"));
return i;

if(response.getStatusLine().getStatusCode()==200) {
HttpEntity entity = response.getEntity();
entity.getContent();
reader= new BufferedReader(new InputStreamReader(entity.getContent()));
Object obj=JSONValue.parse(reader);
JSONObject item=(JSONObject)obj;
Item i= new Item((Long)item.get("id"), (String)item.get("title"), (Long)item.get("price"));
return i;
} else if (response.getStatusLine().getStatusCode() == 404) {
response.getEntity().getContent().close();
return null;
} else
throw new Exception(response.getStatusLine().getStatusCode()+" is not a valid HTTP code for this response");
} catch (Exception e) {
e.printStackTrace();
reconnect();
throw new Exception("Error reading item ["+id+"]", e);
} finally {
if(reader!=null)
if(reader!=null){
try {
reader.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

@Override
public void execute(Tuple input) {
String origin= input.getString(0);
String requestId= input.getString(1);
int itemId= input.getInteger(2);
String origin = input.getString(0);
String requestId = input.getString(1);
int itemId = input.getInteger(2);

Item i;
try {
i = readItem(itemId);
collector.emit(new Values(origin, requestId, itemId, su.toByteArray(i)));
if(i==null) {
collector.emit(new Values(origin, requestId, itemId, null));
} else {
collector.emit(new Values(origin, requestId, itemId, su.toByteArray(i)));
}
} catch (Exception e) {
e.printStackTrace();
}

}

@Override
Expand All @@ -101,7 +112,7 @@ public void cleanup() {

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("origin"));
declarer.declare(new Fields("origin", "requestId", "itemId", "data"));
}


Expand All @@ -118,7 +129,5 @@ public static void main(String[] args) {
System.out.println(item.get("id"));
System.out.println(item.get("title"));
System.out.println(item.get("price"));


}
}
39 changes: 29 additions & 10 deletions src/main/java/search/SearchBucketBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,52 @@ public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
this.context= context;
this.collector= collector;
currentShard = context.getThisTaskIndex();
totalShards = context.getRawTopology().get_bolts().get(context.getThisComponentId()).get_common().get_parallelism_hint();
System.out.println("SearchBucket Bolt created "+currentShard+" of "+totalShards);
su= new SerializationUtils();
String myId = context.getThisComponentId();
totalShards = context.getRawTopology().get_bolts().get(myId).get_common().get_parallelism_hint();
su = new SerializationUtils();
shard = new ItemsShard(10000);
base_id= 10000*currentShard;
}

private boolean isMine(int itemId) {
int remain = itemId % totalShards;
return remain == currentShard;
}

@Override
public void execute(Tuple input) {
// Get request routing information
if(input.getSourceComponent().equals("read-item-data")){

String origin= input.getString(0);
String requestId= input.getString(1);
int itemId= input.getInteger(2);
if(isMine(itemId)){
byte[] ba = input.getBinary(3);
if(ba==null) {
System.out.println("Removing item id:"+itemId);
shard.remove(itemId);
} else {
Item i= su.itemFromByteArray(ba);
System.out.println("Updating item index: "+i);
shard.update(i);
}
}
return ;
}

// Get request routing information
String origin= input.getString(0);
String requestId= input.getString(1);
String query= input.getString(2);

// Execute query with local data scope
List<Item> results= executeLocalQuery(query);

//System.out.println("results for "+query+":"+results.size());

// Send data to next step: Merger
collector.emit(new Values(origin, requestId, query, su.toByteArray(results)));
}




private List<Item> executeLocalQuery(String query) {
// TODO REMOVE
ArrayList<Item> list= new ArrayList<Item>();
int size= (int)(Math.random()*10);
for (int i = 0; i < size; i++) {
Expand Down
Loading

0 comments on commit 49eed93

Please sign in to comment.