From b974ca682bb3e92f4356aa05db2d9a5b6a530093 Mon Sep 17 00:00:00 2001 From: Luis Dario Simonassi Date: Sat, 21 Jan 2012 15:26:26 -0300 Subject: [PATCH] Initial Commit --- README | 5 + darioarquitectura.pem | 23 +++ deploy.sh | 6 + node-drpc-server.js | 156 ++++++++++++++++++++ pom.xml | 100 +++++++++++++ restart_all.sh | 7 + run_server.sh | 4 + src/search/AnswerBolt.java | 108 ++++++++++++++ src/search/JoinSortBolt.java | 94 ++++++++++++ src/search/QueriesSpout.java | 132 +++++++++++++++++ src/search/SearchBucketBolt.java | 75 ++++++++++ src/search/SearchEngineTopologyStarter.java | 35 +++++ src/search/SerializationUtils.java | 71 +++++++++ src/search/model/Item.java | 29 ++++ src/search/model/ItemsDao.java | 94 ++++++++++++ src/smalltest/AlgoBolt.java | 62 ++++++++ src/smalltest/AlgoSpout.java | 94 ++++++++++++ src/smalltest/AlgoTopology.java | 25 ++++ stop_server.sh | 4 + test-drpc-server.sh | 23 +++ 20 files changed, 1147 insertions(+) create mode 100644 README create mode 100644 darioarquitectura.pem create mode 100755 deploy.sh create mode 100644 node-drpc-server.js create mode 100644 pom.xml create mode 100755 restart_all.sh create mode 100755 run_server.sh create mode 100644 src/search/AnswerBolt.java create mode 100644 src/search/JoinSortBolt.java create mode 100644 src/search/QueriesSpout.java create mode 100644 src/search/SearchBucketBolt.java create mode 100644 src/search/SearchEngineTopologyStarter.java create mode 100644 src/search/SerializationUtils.java create mode 100644 src/search/model/Item.java create mode 100644 src/search/model/ItemsDao.java create mode 100644 src/smalltest/AlgoBolt.java create mode 100644 src/smalltest/AlgoSpout.java create mode 100644 src/smalltest/AlgoTopology.java create mode 100755 stop_server.sh create mode 100755 test-drpc-server.sh diff --git a/README b/README new file mode 100644 index 0000000..e51e8bd --- /dev/null +++ b/README @@ -0,0 +1,5 @@ +Storm Use Case +----- --- ---- + +Node.js DRPC Server + Map Reduce Search Engine Topology + diff --git a/darioarquitectura.pem b/darioarquitectura.pem new file mode 100644 index 0000000..3b62a8d --- /dev/null +++ b/darioarquitectura.pem @@ -0,0 +1,23 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAwIqONo9EOvQ/NPBv9AHeO5YyQcSQCzNcJH6EVwvk9Y5bqg9NFeAZ6CrtSX5O +D/IhCv2HGs9wV2EBrualFJ1XVnVxu+MxSHCWZFrHDJM56fHg+qLfciTdCxATW/7qP27K6BDpcHnD ++y7ZqjrwyaX+n7irUai4gbQ9eKZwOl0k5smBFN4+ni0Aw6ZKHhgpExW9OA+U6hpc0zliL8lXQFPE +ocjXAF2E2FXoVXMzvJuoXnIc8+Hj9iCOg7ERVw0nT+ZqaThCaqFk1XkoUdTn9hAk5F9Ja9CESIL/ +oCp/ympO2LrWXzUh0+BGMDFN59vU/UC8Kfow4lPxXSZIZbYjhngsbwIDAQABAoIBADuAVhScT6yc +YJAeoapZjXECqINUmGMtuxL4GjPAVEBifwdd9SeGetsZsPzeUIdy/y0jTkZFxuTp8Jg5ZNirCxuH +7d7vhgJp8MAQoaTMNsCKZElwXfcrVzgc/q6WZ5O0zHXNDCcbKiqMvu1xi4n1h2uaqS7yIhrLNZf8 +b4BJ2qaLI+PeKnI4ZqoldCp7jMiKAd6CjaEkwpBALwDPk4RqH9N2ISiWNIHBvZtNwbJAcHM0M4Tq +82RlDpUb+9GcW74r0y5FJex7g7VM1j+gRTYcOTM3zuDRqj/gui0lYHjwAaugUpapfc5N1qNM1AT1 +rl39Z6NODCmpyDGM2lOsWsndPHECgYEA4AVaXjRpY2uS6eLE8tvaovRvieBin5KpVqWpIcQI2DQA +6odubRyvPsIqqjUw6AEqRa42cojelTTYwIEaAoEi2Dsof04aGR/WdhRKmpO8OmuYPmAlegKEhMVN +StjvFdD23PsZH+Q9sazIEfzn6yMEhLM870UgMbd5Ttlwdhnup/cCgYEA3AbOLw4TIo0Zc6xLEXUL +Kom/Qy0uUdybogHlZyyDS50xzVFxEdfcUKbSAs+G7hNT6mufOpO3Qvwr6xaJ2OWsW/2YYvZ9J+1/ +dhsGtU5jjDv3Q5Bu0mJ49jlUSAnbqLkRfkOV3Wikrdrf66sfwgPZbSMqufBZdLMmQhU02P3cMUkC +gYBf7erkrEz05fvja9gqpzrYzRN2Vz/kVUlucUIb03Z2Hs7Fn3kKAF6K4VqjyGNI9jbD3/Yw1at6 ++UZYKPCaYfIp5itRWICUga20orvPtbPE0I5BJ6rktG9K67JNetfm37TWrC/2GCbTDsod6c7mQfiN +WrOdQlym7Ypk2XfvGuu3wQKBgEO2kHOoyDjE5cVUi9G2jJYtyD/bQrsMwpTMMpZa+5kkqnP+kWal 
+YPctL8qPpX3VUuj88Abt+ONTigySZh/rJu00kVY7d273R1fIn3riwf4hYkpXw9NZXNKh+A8ngYNe
+WUTbdd6q2qtqhakYg/CIkLxmqzqH/m/MxoRl1FrHXaGJAoGAFK0BSX+PcNlFO/FAtX27FyN7DZ7z
+Ze2AUFva37QiOdHse7jzp4+/tyybJnWkiFBcsRjlyvnBcSBGERXCsp92BotIEbommQzzLxcIXJl5
+8YUk9tbmhMGFq+3AvFPkCTRDkw6+YrGfPd0kaa9yHXndxJZR+94gK4UN/JeHRbCqq0I=
+-----END RSA PRIVATE KEY-----
diff --git a/deploy.sh b/deploy.sh
new file mode 100755
index 0000000..828a0fd
--- /dev/null
+++ b/deploy.sh
@@ -0,0 +1,6 @@
+scp -i darioarquitectura.pem node-drpc-server.js ubuntu@ec2-107-20-89-124.compute-1.amazonaws.com:./
+scp -i darioarquitectura.pem node-drpc-server.js ubuntu@ec2-50-16-110-129.compute-1.amazonaws.com:./
+scp -i darioarquitectura.pem node-drpc-server.js ubuntu@ec2-23-20-23-116.compute-1.amazonaws.com:./
+scp -i darioarquitectura.pem *.sh ubuntu@ec2-107-20-89-124.compute-1.amazonaws.com:./
+scp -i darioarquitectura.pem *.sh ubuntu@ec2-50-16-110-129.compute-1.amazonaws.com:./
+scp -i darioarquitectura.pem *.sh ubuntu@ec2-23-20-23-116.compute-1.amazonaws.com:./
diff --git a/node-drpc-server.js b/node-drpc-server.js
new file mode 100644
index 0000000..ffdf52e
--- /dev/null
+++ b/node-drpc-server.js
@@ -0,0 +1,156 @@
+/**
+ ********************************
+ * Node.JS -> Storm DRPC Server *
+ ********************************
+ *
+ * This server listens on 3 ports
+ * ------------------------------
+ *
+ * Port 8080  Synchronous task execution requests.
+ * Port 8081  Pull pending tasks.
+ * Port 8082  Push ready task results.
+ *
+ **/
+
+var http = require('http');
+var parser = require('url');
+var os = require('os');
+
+/**
+ * Add FIFO functionality to the Array class.
+ **/
+Array.prototype.store = function (info) {
+    this.push(info);
+}
+
+Array.prototype.fetch = function() {
+    if (this.length <= 0) { return ''; } // nothing in array
+    return this.shift();
+}
+
+Array.prototype.display = function() {
+    return this.join('\n');
+}
+
+// Metrics
+var total_active_tasks = 0;
+var total_requests_made = 0;
+var total_requests_answered = 0;
+
+// Tasks FIFO
+var pending_tasks = new Array();
+
+// Waiting workers FIFO
+var waiting_workers = new Array();
+
+// Current active tasks (assigned to a worker).
+var active_tasks = {};
+
+// This RPC server's request ID
+var global_task_id = 0;
+
+// My IP, to be sent as the origin
+var local_ip= null;
+
+function get_local_ip() {
+    if(local_ip==null) {
+        var interfaces= os.networkInterfaces();
+        for(var interf_name in interfaces) {
+            var addresses= interfaces[interf_name];
+            for(var addr_name in addresses) {
+                var addr= addresses[addr_name];
+                if(addr.family=="IPv4" && !addr.internal && (/en\d/.test(interf_name) || /eth\d/.test(interf_name))) {
+                    local_ip= addr.address;
+                    return local_ip;
+                }
+            }
+        }
+    }
+    return local_ip;
+}
+
+// If there is a task for a worker, make them meet each other!
+function check_queues() {
+    if(waiting_workers.length > 0 && pending_tasks.length > 0) {
+        var worker = waiting_workers.fetch();
+        var max= worker.max;
+        var send = get_local_ip() +"\n";
+
+        for(var i=0; (i<max) && (pending_tasks.length > 0); i++){
+            var task = pending_tasks.fetch();
+            send = send + task.id +"\n";
+            send = send + task.query +"\n";
+            active_tasks[task.id] = task;
+            total_active_tasks = total_active_tasks + 1;
+        }
+        worker.response.end(send);
+    }
+}
+
+// Server used to receive search queries (tasks) and answer them synchronously.
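+// A worker (the Storm QueriesSpout) long-polls GET http://<this-host>:8081/?max=N
+// and receives the newline-delimited batch built by check_queues(); with
+// illustrative values:
+//   10.1.2.3      <- origin: this server's IP
+//   0             <- task id
+//   /ipod         <- query (the original request URL)
+// The worker later posts each result to http://10.1.2.3:8082/?id=0, which
+// completes the HTTP response still blocked on port 8080.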
+http.createServer(function (request, response) {
+    var query= request.url;
+    if(query=="/isAlive")
+        response.end("YES!");
+    else {
+        var task_entry = { "id":global_task_id, "query": query, "response": response };
+        global_task_id = global_task_id+1;
+        pending_tasks.store(task_entry);
+        total_requests_made++;
+        check_queues();
+    }
+}).listen(8080);
+
+// Server used by workers to request pending tasks.
+http.createServer(function (request, response) {
+    var parsed_url= parser.parse(request.url, true);
+    var max= parsed_url.query.max;
+    if(max==null || typeof(max)=="undefined")
+        max=1;
+    var waiter = { "request": request, "response": response, "max": max };
+    waiting_workers.store(waiter);
+    check_queues();
+}).listen(8081);
+
+// Response receiver server.
+http.createServer(function (request, response) {
+    if(request.method=="POST") {
+        var parsed_url= parser.parse(request.url, true);
+        var id= parsed_url.query.id;
+        if(id==null || typeof(id)=="undefined" ||
+           active_tasks[id]==null || typeof(active_tasks[id])=="undefined") {
+            response.writeHead(404);
+            response.end("Error ["+id+"] is not a waiting task in this server");
+        } else {
+            var data='';
+            request.on("data", function(chunk) {
+                data += chunk;
+            });
+
+            request.on("end", function() {
+                active_tasks[id].response.end(data);
+                delete active_tasks[id];
+                total_active_tasks = total_active_tasks - 1;
+                response.end("OK!\n");
+                total_requests_answered++;
+            });
+        }
+    }
+}).listen(8082);
+
+// Log status information every 10 seconds.
+setInterval(function () {
+    var d= new Date();
+    console.log("****************************");
+    console.log("* Date: "+ d.getFullYear() +"/"+(d.getMonth()+1)+"/"+d.getDate()+" "+d.getHours()+":"+d.getMinutes()+":"+d.getSeconds());
+    console.log("* Local IP: "+local_ip);
+    console.log("* Total requests made: "+ total_requests_made);
+    console.log("* Total requests answered: "+ total_requests_answered);
+    console.log("* Waiting workers: "+ waiting_workers.length);
+    console.log("* Pending tasks: "+ pending_tasks.length);
+    console.log("* Active tasks: "+ total_active_tasks);
+    for(var task in active_tasks) {
+        console.log(task);
+    }
+    console.log("****************************");
+}, 10000);
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..dfdb0d3
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,100 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>storm.search</groupId>
+  <artifactId>storm-search</artifactId>
+  <version>0.0.1</version>
+  <packaging>jar</packaging>
+
+  <name>storm-search</name>
+  <url>https://github.com/ldsimonassi/storm-search</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>github-releases</id>
+      <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
+    </repository>
+    <repository>
+      <id>clojars.org</id>
+      <url>http://clojars.org/repo</url>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>3.8.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>storm</groupId>
+      <artifactId>storm</artifactId>
+      <version>0.6.2</version>
+
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+      <version>1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.msgpack</groupId>
+      <artifactId>msgpack</artifactId>
+      <version>0.6.5</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>src/</sourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/restart_all.sh b/restart_all.sh
new file mode 100755
index 0000000..0a6b9af
--- /dev/null
+++ b/restart_all.sh
@@ -0,0 +1,7 @@
+ssh -i darioarquitectura.pem ubuntu@ec2-107-20-89-124.compute-1.amazonaws.com ./stop_server.sh
+ssh -i darioarquitectura.pem ubuntu@ec2-50-16-110-129.compute-1.amazonaws.com ./stop_server.sh
+ssh -i darioarquitectura.pem ubuntu@ec2-23-20-23-116.compute-1.amazonaws.com ./stop_server.sh
+
+ssh -i darioarquitectura.pem ubuntu@ec2-107-20-89-124.compute-1.amazonaws.com ./run_server.sh
+ssh -i darioarquitectura.pem ubuntu@ec2-50-16-110-129.compute-1.amazonaws.com ./run_server.sh
+ssh -i darioarquitectura.pem ubuntu@ec2-23-20-23-116.compute-1.amazonaws.com ./run_server.sh
diff --git a/run_server.sh b/run_server.sh
new file mode 100755
index 0000000..f1ea584
--- /dev/null
+++ b/run_server.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+echo Starting...
+nohup node ./node-drpc-server.js 1> server.log 2>server.err &
+echo DONE!
diff --git a/src/search/AnswerBolt.java b/src/search/AnswerBolt.java
new file mode 100644
index 0000000..4c73473
--- /dev/null
+++ b/src/search/AnswerBolt.java
@@ -0,0 +1,108 @@
+package search;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import search.model.Item;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+public class AnswerBolt implements IRichBolt {
+
+    HttpClient client;
+    SerializationUtils su;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context,
+            OutputCollector collector) {
+        client= new DefaultHttpClient();
+        this.su= new SerializationUtils();
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        String origin= input.getString(0);
+        String requestId= input.getString(1);
+        List<Item> finalResult= su.fromByteArray(input.getBinary(2));
+        sendBack(origin, requestId, finalResult);
+    }
+
+    // POST the final result back to the DRPC server that originated the
+    // request, one item per line, tab-separated.
+    private void sendBack(String origin, String id, List<Item> finalResult){
+        HttpPost post= new HttpPost("http://"+origin+":8082/?id="+id);
+        StringBuffer strBuff= new StringBuffer();
+        for (Item item : finalResult) {
+            strBuff.append(item.id);
+            strBuff.append("\t");
+            strBuff.append(item.name);
+            strBuff.append("\t");
+            strBuff.append(item.price);
+            strBuff.append("\n");
+        }
+
+        try {
+            StringEntity entity= new StringEntity(strBuff.toString());
+            post.setEntity(entity);
+
+            HttpResponse response= client.execute(post);
+
+            // Consume the entity so the connection can be reused.
+            InputStream is= response.getEntity().getContent();
+            is.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    // Debugging helper; not called by the topology.
+    private void saveToFile(String id, List<Item> finalResult) {
+        File f= new File("/var/tmp/spool/"+id+".log");
+        PrintStream pstr= null;
+        try {
+            pstr = new PrintStream(f);
+            for (Item item : finalResult) {
+                pstr.println(item);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if(pstr!=null)
+                pstr.close();
+        }
+    }
+
+    @Override
+    public void cleanup() {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Terminal bolt: emits nothing.
+    }
+}
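AnswerBolt closes the loop: the TSV body it posts to port 8082 is exactly what the original caller, still blocked on port 8080, receives. With illustrative values, a two-item result looks like:

    10001	dvd player led	523.7
    10002	ipod usb cable	96.2
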
diff --git a/src/search/JoinSortBolt.java b/src/search/JoinSortBolt.java
new file mode 100644
index 0000000..7967413
--- /dev/null
+++ b/src/search/JoinSortBolt.java
@@ -0,0 +1,94 @@
+package search;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import search.model.Item;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class JoinSortBolt implements IRichBolt {
+    OutputCollector collector;
+    Map stormConf;
+    TopologyContext context;
+    int totalShards;
+
+    // Partial results per in-flight request, keyed by "origin-requestId".
+    HashMap<String, List<List<Item>>> inCourse= new HashMap<String, List<List<Item>>>();
+    SerializationUtils su;
+
+    @Override
+    public void prepare(Map stormConf,
+            TopologyContext context,
+            OutputCollector collector) {
+        this.stormConf= stormConf;
+        this.context= context;
+        this.collector= collector;
+        this.su = new SerializationUtils();
+        // One partial result is expected from every SearchBucketBolt task.
+        totalShards = context.getRawTopology().get_bolts().get("queries-processor").get_common().get_parallelism_hint();
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        String origin= input.getString(0);
+        String requestId= input.getString(1);
+        String query= input.getString(2);
+        byte[] binary= input.getBinary(3);
+        List<Item> shardResults= su.fromByteArray(binary);
+        String id= origin+"-"+requestId;
+        List<List<Item>> finished= inCourse.get(id);
+
+        if(finished==null){
+            finished= new ArrayList<List<Item>>();
+            inCourse.put(id, finished);
+        }
+
+        finished.add(shardResults);
+
+        if(finished.size()>=totalShards){
+            //System.out.println("Finishing query ["+id+"]");
+            List<Item> finalResult= joinSortAndCut(finished);
+            collector.emit(new Values(origin, requestId, su.toByteArray(finalResult)));
+            inCourse.remove(id);
+        }
+    }
+
+    // Merge all shard results, sort by price (descending) and keep the top 5.
+    private List<Item> joinSortAndCut(List<List<Item>> finished) {
+        List<Item> finalList= new ArrayList<Item>();
+        for (List<Item> list : finished) {
+            finalList.addAll(list);
+        }
+        Collections.sort(finalList, new Comparator<Item>() {
+            @Override
+            public int compare(Item o1, Item o2) {
+                if(o1.price>o2.price){
+                    return -1;
+                } else if(o1.price<o2.price){
+                    return 1;
+                }
+                return 0;
+            }
+        });
+        if(finalList.size()>5)
+            finalList= new ArrayList<Item>(finalList.subList(0, 5));
+        return finalList;
+    }
+
+    @Override
+    public void cleanup() {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("origin", "requestId", "finalResult"));
+    }
+}
diff --git a/src/search/QueriesSpout.java b/src/search/QueriesSpout.java
new file mode 100644
index 0000000..466550d
--- /dev/null
+++ b/src/search/QueriesSpout.java
@@ -0,0 +1,132 @@
+package search;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.SingleClientConnManager;
+
+import search.model.ItemsDao;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class QueriesSpout implements IRichSpout {
+
+    Map conf;
+    TopologyContext context;
+    SpoutOutputCollector collector;
+    String server;
+    int max;
+    HttpClient httpclient;
+    HttpGet httpget;
+
+    /**
+     * Each spout task long-polls the configured DRPC server for pending tasks.
+     */
+    @Override
+    public void open(Map conf, TopologyContext context,
+            SpoutOutputCollector collector) {
+        this.conf= conf;
+        this.context= context;
+        this.collector= collector;
+        this.server= (String) conf.get("server");
+        try{
+            this.max= Integer.parseInt((String)conf.get("max"));
+        } catch(Exception ex){
+            this.max= 1;
+        }
+        reconnect();
+    }
+
+    private void reconnect() {
+        httpclient = new DefaultHttpClient(new SingleClientConnManager());
+        httpget = new HttpGet("http://"+server+"/?max="+max);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    int id=0;
+
+    public static final int TIMEOUT= 1000;
+
+    @Override
+    public void nextTuple() {
+        HttpResponse response;
+        BufferedReader reader= null;
+        try {
+            response = httpclient.execute(httpget);
+            HttpEntity entity = response.getEntity();
+            reader= new BufferedReader(new InputStreamReader(entity.getContent()));
+            // The DRPC server answers: origin IP, then (id, query) line pairs.
+            String origin= reader.readLine();
+            while(true) {
+                String id= reader.readLine();
+                String query= reader.readLine();
+                if(id==null || query==null)
+                    break;
+                else
+                    collector.emit(new Values(origin, id, query));
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            reconnect();
+        } finally {
+            if(reader!=null)
+                try {
+                    reader.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+        }
+    }
+
+    // Handy for generating random load; not used when polling the DRPC server.
+    private String getRandomSearchQuery() {
+        int idx= (int)(ItemsDao.possibleWords.length*Math.random());
+        return ItemsDao.possibleWords[idx];
+    }
+
+    @Override
+    public void ack(Object msgId) {
+    }
+
+    @Override
+    public void fail(Object msgId) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("origin", "requestId", "query"));
+    }
+
+    @Override
+    public boolean isDistributed() {
+        return true;
+    }
+}
"+query+":"+results.size()); + + // Send data to next step: Merger + collector.emit(new Values(origin, requestId, query, su.toByteArray(results))); + } + + + + private List executeLocalQuery(String query) { + ArrayList list= new ArrayList(); + int size= (int)(Math.random()*10); + for (int i = 0; i < size; i++) { + list.add(new Item(base_id++, ItemsDao.getRandomTitle(), Math.random()*1000)); + } + return list; + } + + @Override + public void cleanup() { + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("origin", "requestId", "query", "shardMatches")); + } +} diff --git a/src/search/SearchEngineTopologyStarter.java b/src/search/SearchEngineTopologyStarter.java new file mode 100644 index 0000000..d05583c --- /dev/null +++ b/src/search/SearchEngineTopologyStarter.java @@ -0,0 +1,35 @@ +package search; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; + +public class SearchEngineTopologyStarter { + public static void main(String[] args) { + TopologyBuilder builder= new TopologyBuilder(); + + builder.setSpout("queries-generator", new QueriesSpout(), 6); + builder.setBolt("queries-processor", new SearchBucketBolt(), 10).allGrouping("queries-generator"); + builder.setBolt("join-sort", new JoinSortBolt(), 10).fieldsGrouping("queries-processor", new Fields("origin", "requestId")); + builder.setBolt("sender", new AnswerBolt(), 10).fieldsGrouping("join-sort", new Fields("origin")); + + Config conf= new Config(); + conf.put("server", "localhost:8081"); + conf.put("max", "100"); + + if(args!=null && args.length > 0) { + conf.setNumWorkers(20); + + try { + StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); + } catch (Exception e) { + e.printStackTrace(); + } + } else { + LocalCluster cluster= new LocalCluster(); + cluster.submitTopology("TheSearchEngine", conf, builder.createTopology()); + } + } +} diff --git a/src/search/SerializationUtils.java b/src/search/SerializationUtils.java new file mode 100644 index 0000000..2465483 --- /dev/null +++ b/src/search/SerializationUtils.java @@ -0,0 +1,71 @@ +package search; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.msgpack.MessagePack; +import org.msgpack.packer.Packer; +import org.msgpack.unpacker.Unpacker; + +import search.model.Item; + + +public class SerializationUtils { + + public byte[] toByteArray(List list) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try{ + Packer packer = msgpack.createPacker(out); + packer.write(list.size()); + + for (Item i : list) { + packer.write(i.price); + packer.write(i.name); + packer.write(i.id); + } + return out.toByteArray(); + } catch (Exception ex) { + ex.printStackTrace(); + return null; + } finally { + try { + out.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + MessagePack msgpack = new MessagePack(); + + public List fromByteArray(byte[] binary) { + ByteArrayInputStream in = new ByteArrayInputStream(binary); + try { + Unpacker unpacker = msgpack.createUnpacker(in); + ArrayList list= new ArrayList(); + int size= unpacker.readInt(); + Item i= null; + for (int j = 0; j < size; j++) { + i= new Item(); + i.price= unpacker.readDouble(); + i.name= unpacker.readString(); + i.id= unpacker.readLong(); + list.add(i); + } + return 
diff --git a/src/search/model/Item.java b/src/search/model/Item.java
new file mode 100644
index 0000000..ce31d4e
--- /dev/null
+++ b/src/search/model/Item.java
@@ -0,0 +1,29 @@
+package search.model;
+
+import java.io.Serializable;
+
+import org.msgpack.annotation.Message;
+
+@Message
+public class Item implements Serializable {
+
+    public Item() {
+    }
+
+    public Item(long id, String title, double price) {
+        this.id= id;
+        this.name= title;
+        this.price= price;
+    }
+
+    public long id;
+    public String name;
+    public double price;
+
+    @Override
+    public String toString() {
+        return "id:"+id+ " title: "+name+ " price:"+price;
+    }
+}
diff --git a/src/search/model/ItemsDao.java b/src/search/model/ItemsDao.java
new file mode 100644
index 0000000..3f12111
--- /dev/null
+++ b/src/search/model/ItemsDao.java
@@ -0,0 +1,94 @@
+package search.model;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ItemsDao {
+
+    public static final long TOTAL_ITEMS= 100;
+    public static final long BASE_ID= 0;
+
+    int totalShards, currentShard;
+
+    public static String[] possibleWords= new String[] {
+        "dvd", "laptop", "usb", "home", "theater", "sound",
+        "notebook", "17", "15", "14", "ipod", "iphone",
+        "mac", "pava", "mate", "bombilla",
+        "aire", "acondicionado", "departamento",
+        "casa", "cochera", "silla", "sillon", "mesa", "cable",
+        "cortina", "lavarropa", "lavavajilla",
+        "televisor", "led", "lcd", "ambientes",
+        "cuadro", "decoracion", "pintura", "jarron", "escultura",
+        "ventana", "vidrio", "aluminio", "pvc",
+        "nokia", "1100", "blackberry", "curve",
+        "android", "samsung", "galaxy", "sII", "windows", "mobile",
+        "aeromodelismo", "automodelismo", "bateria",
+        "motor", "control", "remoto", "alas", "avion", "pilas",
+        "combustible", "autos", "peugeot", "206", "207",
+        "307", "308", "407", "408", "fiat", "uno", "palio",
+        "siena", "linea", "stilo", "idea",
+        "chevrolet", "corsa", "agile", "aveo", "vecra",
+        "astra", "cruze", "captiva", "volkswagen", "gol", "trend",
+        "power", "fox", "suran", "bora", "vento", "passat", "cc",
+        "tiguan", "touareg", "ford", "fiesta", "ka", "kinetic",
+        "design", "focus", "mondeo", "ecosport", "kuga"
+    };
+
+    public static int MAX_WORDS= 10;
+
+    public static String getRandomTitle(){
+        int cantWords= (int)(Math.random()*(MAX_WORDS-1))+1;
+        String res= "";
+        for(int i= 0; i< cantWords; i++){
+            int ind= (int)(Math.random()*(possibleWords.length-1));
+            res+=possibleWords[ind]+" ";
+        }
+        return res;
+    }
+
+    public ItemsDao(int totalShards, int currentShard){
+        this.totalShards= totalShards;
+        this.currentShard= currentShard;
+    }
+
+    // Generate this shard's slice of the catalog, keyed by item id.
+    public Map<Long, Item> getItems(){
+        int shardSize= (int)(TOTAL_ITEMS/totalShards);
+        HashMap<Long, Item> items= new HashMap<Long, Item>(shardSize);
+
+        for(long i=0; i<shardSize; i++){
+            long id= BASE_ID + (i*totalShards) + currentShard;
+            items.put(id, new Item(id, getRandomTitle(), Math.random()*1000));
+        }
+        return items;
+    }
+
+    public static void main(String[] args) {
+        int totalShards= 4;
+        for(int shard= 0; shard<totalShards; shard++){
+            ItemsDao dao= new ItemsDao(totalShards, shard);
+            Map<Long, Item> map= dao.getItems();
+
+            for (Long itemId : map.keySet()) {
+                System.out.println(map.get(itemId));
+            }
+        }
+    }
+}
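ItemsDao generates a per-shard slice of the catalog, but SearchBucketBolt.executeLocalQuery above still fabricates random hits. A sketch of a DAO-backed version, meant to slot into SearchBucketBolt (whose imports already cover it); the shard field and the naive substring match are assumptions, not part of this patch:

// Inside SearchBucketBolt (hypothetical): load the shard once in prepare()
//   shard = new ItemsDao(totalShards, currentShard).getItems();
// then scan it per query:
private Map<Long, Item> shard;

private List<Item> executeLocalQuery(String query) {
    List<Item> hits = new ArrayList<Item>();
    for (Item item : shard.values()) {
        if (item.name.contains(query)) { // naive substring match (assumption)
            hits.add(item);
        }
    }
    return hits;
}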
diff --git a/src/smalltest/AlgoBolt.java b/src/smalltest/AlgoBolt.java
new file mode 100644
index 0000000..12bddf0
--- /dev/null
+++ b/src/smalltest/AlgoBolt.java
@@ -0,0 +1,62 @@
+package smalltest;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import search.model.Item;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+public class AlgoBolt implements IRichBolt {
+
+    Map stormConf;
+    TopologyContext context;
+    OutputCollector collector;
+    int currentShard;
+    int totalShards;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context,
+            OutputCollector collector) {
+        this.stormConf= stormConf;
+        this.context= context;
+        System.out.println("Preparing bolt! ["+this+"]");
+        currentShard = context.getThisTaskIndex();
+        totalShards = context.getRawTopology().get_bolts().get(context.getThisComponentId()).get_common().get_parallelism_hint();
+
+        System.out.println("Loading data for shard ["+currentShard+" of "+totalShards+"]...");
+        this.collector= collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        System.out.println("======== Execute: ["+input.getInteger(0)+"] ==============");
+        try {
+            // Deserialize the Java-serialized item list sent by AlgoSpout.
+            ObjectInputStream ois= new ObjectInputStream(new ByteArrayInputStream(input.getBinary(1)));
+            ArrayList<Item> list= (ArrayList<Item>)ois.readObject();
+            for (int i = 0; i < list.size(); i++) {
+                System.out.print(list.get(i));
+                System.out.print(", ");
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void cleanup() {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+}
diff --git a/src/smalltest/AlgoSpout.java b/src/smalltest/AlgoSpout.java
new file mode 100644
index 0000000..acbea56
--- /dev/null
+++ b/src/smalltest/AlgoSpout.java
@@ -0,0 +1,94 @@
+package smalltest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import search.model.Item;
+import search.model.ItemsDao;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class AlgoSpout implements IRichSpout {
+    Map conf;
+    TopologyContext context;
+    SpoutOutputCollector collector;
+
+    @Override
+    public void open(Map conf, TopologyContext context,
+            SpoutOutputCollector collector) {
+        this.conf = conf;
+        this.context = context;
+        this.collector = collector;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    // Tuple id; also the fieldsGrouping key downstream.
+    int i= 0;
+
+    @Override
+    public void nextTuple() {
+        i++;
+        //collector.emit(new Values(i, "Dario"));
+
+        int size= (int)(Math.random()*10);
+
+        ArrayList<Item> list= new ArrayList<Item>();
+
+        for (int j = 0; j < size; j++) {
+            list.add(new Item(j, ItemsDao.getRandomTitle(), 100));
+        }
+
+        System.out.println("List has ["+list.size()+"] elements");
+
+        try{
+            // Java-serialize the list so it travels as a single binary field.
+            ByteArrayOutputStream baos= new ByteArrayOutputStream();
+            ObjectOutputStream oos= new ObjectOutputStream(baos);
+            oos.writeObject(list);
+            oos.close();
+            baos.close();
+            byte[] array= baos.toByteArray();
+            System.out.println("Sending...:"+array.length);
+
+            collector.emit(new Values(i, array));
+        } catch (Exception ex){
+            ex.printStackTrace();
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+    }
+
+    @Override
+    public void fail(Object msgId) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("id", "value"));
+    }
+
+    @Override
+    public boolean isDistributed() {
+        return false;
+    }
+}
false; + } + +} diff --git a/src/smalltest/AlgoTopology.java b/src/smalltest/AlgoTopology.java new file mode 100644 index 0000000..8a267f0 --- /dev/null +++ b/src/smalltest/AlgoTopology.java @@ -0,0 +1,25 @@ +package smalltest; + +import java.util.HashMap; + +import backtype.storm.LocalCluster; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; + +public class AlgoTopology { + public static void main(String[] args) throws InterruptedException { + TopologyBuilder builder= new TopologyBuilder(); + builder.setSpout("MainSpout", new AlgoSpout(), 2); + builder.setBolt("MainBolt", new AlgoBolt(), 7).fieldsGrouping("MainSpout", new Fields("id")); + builder.setBolt("MainBolt2", new AlgoBolt(), 3).fieldsGrouping("MainSpout", new Fields("id")); + + + LocalCluster cluster= new LocalCluster(); + cluster.submitTopology("TheTopology", new HashMap(), builder.createTopology()); + + Thread.sleep(10000); + + cluster.shutdown(); + + } +} diff --git a/stop_server.sh b/stop_server.sh new file mode 100755 index 0000000..f68ab95 --- /dev/null +++ b/stop_server.sh @@ -0,0 +1,4 @@ +#!/bin/bash +echo Stopping... +pkill -9 node +echo DONE! diff --git a/test-drpc-server.sh b/test-drpc-server.sh new file mode 100755 index 0000000..8dd0b74 --- /dev/null +++ b/test-drpc-server.sh @@ -0,0 +1,23 @@ +curl localhost:8080/buick & +curl localhost:8080/gps & +curl localhost:8080/ipod & +curl localhost:8081/ +curl localhost:8081/ +curl localhost:8081/ +curl -X POST --data "Very nice buick USD1000" localhost:8082/?id=0 +curl -X POST --data "Very precise GPS, USD100" localhost:8082/?id=1 +curl -X POST --data "Quite useful ipod, USD150" localhost:8082/?id=2 +curl localhost:8081/ & +sleep 1 +curl localhost:8080/nokia & +sleep 1 +curl -X POST --data "Windows mobile nokia.., USD150" localhost:8082/?id=3 +curl localhost:8080/led & +curl localhost:8080/tv & +curl localhost:8080/dvd & +echo "" +sleep 1 +curl localhost:8081/?max=20 +curl -X POST --data "Led TV Full HD" localhost:8082/?id=4 +curl -X POST --data "CRT TV" localhost:8082/?id=5 +curl -X POST --data "DVD HD UpScaling" localhost:8082/?id=6