Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ldsimonassi committed Jan 21, 2012
0 parents commit b974ca6
Show file tree
Hide file tree
Showing 20 changed files with 1,147 additions and 0 deletions.
5 changes: 5 additions & 0 deletions README
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Storm Use Case
----- --- ----

Node.js DRPC Server + Map Reduce Search Engine Topology

23 changes: 23 additions & 0 deletions darioarquitectura.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAwIqONo9EOvQ/NPBv9AHeO5YyQcSQCzNcJH6EVwvk9Y5bqg9NFeAZ6CrtSX5O
D/IhCv2HGs9wV2EBrualFJ1XVnVxu+MxSHCWZFrHDJM56fHg+qLfciTdCxATW/7qP27K6BDpcHnD
+y7ZqjrwyaX+n7irUai4gbQ9eKZwOl0k5smBFN4+ni0Aw6ZKHhgpExW9OA+U6hpc0zliL8lXQFPE
ocjXAF2E2FXoVXMzvJuoXnIc8+Hj9iCOg7ERVw0nT+ZqaThCaqFk1XkoUdTn9hAk5F9Ja9CESIL/
oCp/ympO2LrWXzUh0+BGMDFN59vU/UC8Kfow4lPxXSZIZbYjhngsbwIDAQABAoIBADuAVhScT6yc
YJAeoapZjXECqINUmGMtuxL4GjPAVEBifwdd9SeGetsZsPzeUIdy/y0jTkZFxuTp8Jg5ZNirCxuH
7d7vhgJp8MAQoaTMNsCKZElwXfcrVzgc/q6WZ5O0zHXNDCcbKiqMvu1xi4n1h2uaqS7yIhrLNZf8
b4BJ2qaLI+PeKnI4ZqoldCp7jMiKAd6CjaEkwpBALwDPk4RqH9N2ISiWNIHBvZtNwbJAcHM0M4Tq
82RlDpUb+9GcW74r0y5FJex7g7VM1j+gRTYcOTM3zuDRqj/gui0lYHjwAaugUpapfc5N1qNM1AT1
rl39Z6NODCmpyDGM2lOsWsndPHECgYEA4AVaXjRpY2uS6eLE8tvaovRvieBin5KpVqWpIcQI2DQA
6odubRyvPsIqqjUw6AEqRa42cojelTTYwIEaAoEi2Dsof04aGR/WdhRKmpO8OmuYPmAlegKEhMVN
StjvFdD23PsZH+Q9sazIEfzn6yMEhLM870UgMbd5Ttlwdhnup/cCgYEA3AbOLw4TIo0Zc6xLEXUL
Kom/Qy0uUdybogHlZyyDS50xzVFxEdfcUKbSAs+G7hNT6mufOpO3Qvwr6xaJ2OWsW/2YYvZ9J+1/
dhsGtU5jjDv3Q5Bu0mJ49jlUSAnbqLkRfkOV3Wikrdrf66sfwgPZbSMqufBZdLMmQhU02P3cMUkC
gYBf7erkrEz05fvja9gqpzrYzRN2Vz/kVUlucUIb03Z2Hs7Fn3kKAF6K4VqjyGNI9jbD3/Yw1at6
+UZYKPCaYfIp5itRWICUga20orvPtbPE0I5BJ6rktG9K67JNetfm37TWrC/2GCbTDsod6c7mQfiN
WrOdQlym7Ypk2XfvGuu3wQKBgEO2kHOoyDjE5cVUi9G2jJYtyD/bQrsMwpTMMpZa+5kkqnP+kWal
YPctL8qPpX3VUuj88Abt+ONTigySZh/rJu00kVY7d273R1fIn3riwf4hYkpXw9NZXNKh+A8ngYNe
WUTbdd6q2qtqhakYg/CIkLxmqzqH/m/MxoRl1FrHXaGJAoGAFK0BSX+PcNlFO/FAtX27FyN7DZ7z
Ze2AUFva37QiOdHse7jzp4+/tyybJnWkiFBcsRjlyvnBcSBGERXCsp92BotIEbommQzzLxcIXJl5
8YUk9tbmhMGFq+3AvFPkCTRDkw6+YrGfPd0kaa9yHXndxJZR+94gK4UN/JeHRbCqq0I=
-----END RSA PRIVATE KEY-----
6 changes: 6 additions & 0 deletions deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
scp -i darioarquitectura.pem node-drpc-server.js [email protected]:./
scp -i darioarquitectura.pem node-drpc-server.js [email protected]:./
scp -i darioarquitectura.pem node-drpc-server.js [email protected]:./
scp -i darioarquitectura.pem *.sh [email protected]:./
scp -i darioarquitectura.pem *.sh [email protected]:./
scp -i darioarquitectura.pem *.sh [email protected]:./
156 changes: 156 additions & 0 deletions node-drpc-server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/**
********************************
* Node.JS -> Storm DRPC Server *
********************************
*
* This server listens in 3 ports
* ------------------------------
*
* Port 8080 Synchronous task execution requests.
* Port 8081 Pull pending tasks.
* Port 8082 Push ready tasks results.
*
**/

var http = require('http');
var parser = require('url');
var os = require('os');

/**
* Add the FIFO functionality to Array class.
**/
Array.prototype.store = function (info) {
this.push(info);
}

Array.prototype.fetch = function() {
if (this.length <= 0) { return ''; } // nothing in array
return this.shift();
}

Array.prototype.display = function() {
return this.join('\n');
}

// Metrics
var total_active_tasks = 0;
var total_requests_made = 0;
var total_requests_answered = 0;

// Tasks FIFO
var pending_tasks = new Array();

// Waiting workers FIFO
var waiting_workers = new Array();

// Current active tasks (Assigned to a worker).
var active_tasks = {};

// This RPC Server Request ID
var global_task_id = 0;

// My IP, to be sent in the origin
var local_ip= null;

function get_local_ip() {
if(local_ip==null) {
var interfaces= os.networkInterfaces();
for(var interf_name in interfaces) {
var addresses= interfaces[interf_name];
for(var addr_name in addresses) {
var addr= addresses[addr_name];
if(addr.family=="IPv4" && !addr.internal && (/en\d/.test(interf_name) || /eth\d/.test(interf_name))) {
local_ip= addr.address;
return local_ip;
}
}
}
}
return local_ip;
}

// If there is a task for a worker, make them meet each other!
function check_queues() {
if(waiting_workers.length > 0 && pending_tasks.length > 0) {
var worker = waiting_workers.fetch();
var max= worker.max;
var send = get_local_ip() +"\n";

for(var i=0; (i<max) && (pending_tasks.length > 0);i++){
var task = pending_tasks.fetch();
send = send + task.id +"\n";
send = send + task.query +"\n";
active_tasks[task.id] = task;
total_active_tasks = total_active_tasks + 1;
}
worker.response.end(send);
}
}

// Server to be used to receive search querys (tasks) and answer them in a synchronous way.
http.createServer(function (request, response) {
var query= request.url;
if(query=="/isAlive")
response.end("YES!");
else {
var task_entry = { "id":global_task_id, "query": query, "response": response };
global_task_id = global_task_id+1;
pending_tasks.store(task_entry);
total_requests_made++;
check_queues();
}
}).listen(8080);

// Server to be used for requesting pending tasks
http.createServer(function (request, response) {
var parsed_url= parser.parse(request.url, true);
var max= parsed_url.query.max;
if(max==null || typeof(max)=="undefined")
max=1;
var waiter = { "request": request, "response": response, "max": max };
waiting_workers.store(waiter);
check_queues();
}).listen(8081);

// Response receiver server
http.createServer(function (request, response) {
if(request.method=="POST") {
var parsed_url= parser.parse(request.url, true);
var id= parsed_url.query.id;
if(id==null || typeof(id)=="undefined" ||
active_tasks[id]==null || typeof(active_tasks[id])=="undefined") {
response.writeHead(404);
response.end("Error ["+id+"] is not a waiting task in this server");
} else {
var data='';
request.on("data", function(chunk) {
data += chunk;
});

request.on("end", function() {
active_tasks[id].response.end(data);
delete active_tasks[id]
total_active_tasks = total_active_tasks - 1;
response.end("OK!\n");
total_requests_answered++;
});
}
}
}).listen(8082);

// Log status information each 10 seconds interval.
setInterval(function () {
var d= new Date();
console.log("****************************");
console.log("* Date: "+ d.getFullYear() +"/"+(d.getMonth()+1)+"/"+d.getDate()+" "+d.getHours()+":"+d.getMinutes()+":"+d.getSeconds());
console.log("* Local IP: "+local_ip);
console.log("* Total requests made: "+ total_requests_made);
console.log("* Total requests answered: "+ total_requests_answered);
console.log("* Waiting workers: "+ waiting_workers.length);
console.log("* Pending tasks: "+ pending_tasks.length);
console.log("* Active threads: "+ total_active_tasks);
for(var task in active_tasks) {
console.log(task);
}
console.log("****************************");
}, 10000) ;
100 changes: 100 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>storm.search</groupId>
<artifactId>storm-search</artifactId>
<version>0.0.1</version>
<packaging>jar</packaging>

<name>storm-search</name>
<url>https://github.com/ldsimonassi/storm-search</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<repositories>
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.6.2</version>
<!-- keep storm out of the jar-with-dependencies -->
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
</dependency>

<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.5</version>
</dependency>

</dependencies>

<build>
<sourceDirectory>src/</sourceDirectory>
<plugins>
<!--
bind the maven-assembly-plugin to the package phase
this will create a jar file without the storm dependencies
suitable for deployment to a cluster.
-->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>

</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
7 changes: 7 additions & 0 deletions restart_all.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ssh -i darioarquitectura.pem [email protected] ./stop_server.sh
ssh -i darioarquitectura.pem [email protected] ./stop_server.sh
ssh -i darioarquitectura.pem [email protected] ./stop_server.sh

ssh -i darioarquitectura.pem [email protected] ./run_server.sh
ssh -i darioarquitectura.pem [email protected] ./run_server.sh
ssh -i darioarquitectura.pem [email protected] ./run_server.sh
4 changes: 4 additions & 0 deletions run_server.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
echo Starting...
nohup node ./node-drpc-server.js 1> server.log 2>server.err &
echo DONE!
Loading

0 comments on commit b974ca6

Please sign in to comment.