This module implements an asynchronous job queueing system with the WorkingQueue class. Jobs are processed concurrently up to a given degree of "concurrency"; if this degree is 1, jobs are processed sequentially.
The basic use case for this module is performing a large number (hundreds or thousands) of I/O-related tasks, for instance HTTP requests. To play nice with others and make sure the I/O stack (file descriptors, TCP/IP stack, etc.) is not swamped by thousands of concurrent requests, you have to put an artificial limit on the number of requests that run concurrently. When this limit is reached, new jobs have to be queued until some other job is finished. That's exactly what WorkingQueue is for!
capisce is published as an npm module, so all you have to do to use the latest official release of capisce is:
npm install capisce
Or list capisce as a dependency in your package.json file and let npm perform its magic.
The WorkingQueue class is instantiated with a single parameter: the concurrency level of the queue. Again, if the concurrency level is 1, all jobs will be processed sequentially.
var WorkingQueue = require('capisce').WorkingQueue;
var queue = new WorkingQueue(16);
You can then launch jobs using the perform() method. If the concurrency limit has not been reached yet, the job is scheduled immediately. Otherwise, it is queued for later execution.
Jobs are simple functions that receive a very important parameter: the over() function. The job MUST call over() at the end of its processing to signal to the WorkingQueue that it is, well, over.
queue.perform(function(over) {
console.log("Hello, world !");
over();
});
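To make the perform()/over() contract concrete, here is a minimal sketch of a concurrency-limited queue. MiniQueue and its running/pending fields are hypothetical names chosen for illustration; this is not how capisce is implemented internally, only one way to model the behaviour described above.

```javascript
// Hypothetical, simplified model of the perform()/over() contract.
// It is NOT capisce's implementation; it only illustrates the idea.
function MiniQueue(concurrency) {
  this.concurrency = concurrency; // maximum number of jobs running at once
  this.running = 0;               // jobs started whose over() was not called yet
  this.pending = [];              // jobs waiting for a free slot
}

// Queue the job, then start it right away if a slot is free.
MiniQueue.prototype.perform = function (job) {
  this.pending.push(job);
  this._next();
  return this;
};

// Start pending jobs until the concurrency limit is reached.
MiniQueue.prototype._next = function () {
  while (this.running < this.concurrency && this.pending.length > 0) {
    this._start(this.pending.shift());
  }
};

// Run one job; the over() it receives frees the slot and starts the next job.
MiniQueue.prototype._start = function (job) {
  var self = this;
  self.running += 1;
  job(function over() {
    self.running -= 1;
    self._next();
  });
};

// Usage: five jobs, at most two at a time. Each job keeps its over()
// so that we can decide later when it is finished.
var queue = new MiniQueue(2);
var overs = [];
for (var i = 0; i < 5; i++) {
  queue.perform(function (over) { overs.push(over); });
}
// Now: queue.running is 2 and queue.pending.length is 3
overs[0](); // the first job is over: its slot goes to the third job
// Now: queue.running is 2, queue.pending.length is 2, overs.length is 3
```

The key point of the model is that a slot is only released when over() is called, which is why a job that never calls over() permanently occupies one unit of concurrency.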
The over() function can be passed around inside your job. In fact, that is the only way to do anything interesting: since I/O is asynchronous, you have to call over() once the I/O request has completed, that is to say in an event handler or completion callback.
var fs = require('fs');
queue.perform(function(over) {
console.log("Reading file...");
fs.readFile('README.md', function(err, result) {
console.log("Over !");
if(err) {
console.error(err);
} else {
process.stdout.write(result);
}
over();
});
});
Of course, you can name the over function any way you want. Other similar libraries like to call it done.
Before capisce 0.4.0, jobs were functions that could only take one parameter, the over() function. This forced any job parameters to be passed through closures, which may have undesirable memory or performance downsides.
Since capisce 0.4.0, you can pass additional arguments to the perform() call and they will be passed right along to your job, before the over function. Internally, the data stored in the queue is [job, arg1, arg2...], so there are no surprises regarding memory usage.
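A minimal sketch of what storing [job, arg1, arg2...] can look like, assuming one plain array per queue entry. enqueue() and runEntry() are hypothetical helpers, not part of capisce:

```javascript
// Hypothetical sketch of the [job, arg1, arg2, ...] storage described above:
// extra perform() arguments are kept in a plain array, without a closure.
function enqueue(queue /*, job, arg1, arg2, ... */) {
  queue.push(Array.prototype.slice.call(arguments, 1)); // [job, arg1, arg2, ...]
}

// When a slot is free, the job is called with its arguments, over() last.
function runEntry(entry, over) {
  var job = entry[0];
  var args = entry.slice(1).concat([over]); // [arg1, arg2, ..., over]
  return job.apply(null, args);
}

var entries = [];
var lines = [];
function myJob(word1, word2, over) {
  lines.push(word1 + ', ' + word2 + ' !');
  over();
}
enqueue(entries, myJob, 'Hello', 'world');
enqueue(entries, myJob, 'Howdy', 'pardner');
// entries[0] is [myJob, 'Hello', 'world']
entries.forEach(function (entry) {
  runEntry(entry, function over() {});
});
// lines is ['Hello, world !', 'Howdy, pardner !']
```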
Here is an example of parameter passing:
// Note how the over function is passed as the last parameter
function myJob(word1, word2, over) {
console.log('' + word1 + ', ' + word2 + ' !');
over();
}
queue.perform(myJob, 'Hello', 'world');
queue.perform(myJob, 'Howdy', 'pardner');
As a shortcut, since capisce 0.6.0, you can process all the elements of a collection using processList and processObject:
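// assert is Node's built-in assert module; done() stands for the test
// framework's completion callback (e.g. mocha's), as in a unit test.
var assert = require('assert');
var queue = new WorkingQueue(16);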
var result = 0;
var list = [1, 1, 2, 3, 5, 8, 13];
function adder(index, element, k, over) {
if(index % 2 == 0) {
result += element + k;
}
over();
}
queue.processList(adder, list, 5);
queue.onceDone(function() {
assert.equal((1 + 5) + (2 + 5) + (5 + 5) + (13 + 5), result);
done();
});
var queue = new WorkingQueue(16);
var list = {'Nicolas':30, 'Loïc':3, 'Lachlan':0.75};
function Mean() {
var total = 0.0;
var count = 0;
this.process = function(key, value, f, over) {
total += value + f(value);
count += 1;
over();
};
this.result = function() {
return total / count;
};
}
var mean = new Mean();
queue.processObject(mean.process, list, function(value) { return value + 5; });
queue.onceDone(function() {
assert.equal(((30 + 30 + 5) + (3 + 3 + 5) + (0.75 + 0.75 + 5)) / 3, mean.result());
done();
});
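Both methods can be thought of as a loop of perform() calls. The sketch below assumes exactly that; processList, processObject and performNow here are hypothetical free functions that take the perform function as their first argument, not capisce's methods:

```javascript
// Hypothetical sketch: processList()/processObject() expressed on top of a
// perform(job, arg1, arg2, ...) function. This is NOT capisce's source code.
function processList(perform, fn, list /*, extra1, extra2, ... */) {
  var extra = Array.prototype.slice.call(arguments, 3);
  for (var i = 0; i < list.length; i++) {
    // fn will be called as fn(index, element, extra1, ..., over)
    perform.apply(null, [fn, i, list[i]].concat(extra));
  }
}

function processObject(perform, fn, obj /*, extra1, extra2, ... */) {
  var extra = Array.prototype.slice.call(arguments, 3);
  Object.keys(obj).forEach(function (key) {
    // fn will be called as fn(key, value, extra1, ..., over)
    perform.apply(null, [fn, key, obj[key]].concat(extra));
  });
}

// Trivial synchronous stand-in for a queue's perform(), for illustration only.
function performNow(job /*, arg1, arg2, ... */) {
  var args = Array.prototype.slice.call(arguments, 1);
  job.apply(null, args.concat([function over() {}]));
}

// Same computation as the processList sample above.
var result = 0;
processList(performNow, function (index, element, k, over) {
  if (index % 2 === 0) {
    result += element + k;
  }
  over();
}, [1, 1, 2, 3, 5, 8, 13], 5);
// result is (1 + 5) + (2 + 5) + (5 + 5) + (13 + 5) = 41

var seen = [];
processObject(performNow, function (key, value, f, over) {
  seen.push(key + '=' + f(value));
  over();
}, { a: 1, b: 2 }, function (value) { return value * 10; });
// seen is ['a=10', 'b=20']
```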
When queuing a bunch of jobs, you often need to wait for all of them to complete before continuing. For that, use the onceDone() method:
function myJob(name, over) {
var duration = Math.floor(Math.random() * 1000);
console.log("Starting "+name+" for "+duration+"ms");
setTimeout(function() {
console.log(name + " over, duration="+duration+"ms");
over();
}, duration);
}
var i;
for(i=0;i<1000;i++) {
queue.perform(myJob, "job-"+i);
}
queue.onceDone(function() {
console.log("All done !");
});
The onceDone() method can be called multiple times to register multiple handlers; the handlers will be called in the order they were added. Maybe I should have used the EventEmitter pattern for this.
Since 0.4.1, the conditions under which the onceDone callbacks (named whenDone before 0.5.0) are called are more precisely defined.
The onceDone callbacks will be called:
- when the last job in the queue is over;
- when you call doneAddingJobs() and no job was performed since the last onceDone situation.
This is required to handle the case where a queue may or may not receive jobs, and you want cleanup callbacks to be called in both situations. In this case you would do:
function myJob(name, over) {
var duration = Math.floor(Math.random() * 1000);
console.log("Starting "+name+" for "+duration+"ms");
setTimeout(function() {
console.log(name + " over, duration="+duration+"ms");
over();
}, duration);
}
var i;
// In some cases no job is queued at all: pick between 0 and 9 jobs at random
var count = Math.floor(Math.random() * 10);
for(i=0;i<count;i++) {
queue.perform(myJob, "job-"+i);
}
queue.onceDone(function() {
console.log("All done with "+i+" jobs !");
});
// This does nothing if jobs were added to the queue.
// If no jobs were added, the onceDone callbacks are called.
queue.doneAddingJobs();
Calling doneAddingJobs() is not mandatory: it's only needed if you want to make sure the onceDone callbacks are called even if no job was actually performed.
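The two triggers listed above can be modelled with a small counter. DoneTracker and its methods are hypothetical and only track job counts; they illustrate the rules, not capisce's code:

```javascript
// Hypothetical model of the two onceDone triggers; it tracks job counts only
// and does not run jobs itself. Not capisce's implementation.
function DoneTracker() {
  this.active = 0;                 // jobs performed whose over() was not called yet
  this.performedSinceDone = false; // was any job performed since the last done?
  this.callbacks = [];             // registered onceDone handlers
}
DoneTracker.prototype.onceDone = function (callback) {
  this.callbacks.push(callback);
};
DoneTracker.prototype.jobPerformed = function () {
  this.active += 1;
  this.performedSinceDone = true;
};
DoneTracker.prototype.jobOver = function () {
  this.active -= 1;
  if (this.active === 0) {
    this._fire();                  // trigger 1: the last job is over
  }
};
DoneTracker.prototype.doneAddingJobs = function () {
  if (!this.performedSinceDone && this.active === 0) {
    this._fire();                  // trigger 2: no job since the last done situation
  }
};
DoneTracker.prototype._fire = function () {
  var callbacks = this.callbacks;
  this.callbacks = [];             // callbacks are forgotten once called
  this.performedSinceDone = false;
  callbacks.forEach(function (callback) { callback(); });
};

// No job at all: only doneAddingJobs() can trigger the callbacks.
var empty = new DoneTracker();
var emptyCalls = 0;
empty.onceDone(function () { emptyCalls += 1; });
empty.doneAddingJobs();            // emptyCalls is now 1

// With a job: doneAddingJobs() does nothing, the end of the last job triggers.
var busy = new DoneTracker();
var busyCalls = 0;
busy.onceDone(function () { busyCalls += 1; });
busy.jobPerformed();
busy.doneAddingJobs();             // busyCalls is still 0
busy.jobOver();                    // busyCalls is now 1
```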
As the name implies, onceDone callbacks are called only once, and forgotten immediately afterwards. This is an important change in capisce 0.5.0; previously, those callbacks were kept and called again whenever the done situation was encountered, which created a lot of opportunities for memory leaks. If you want to reproduce the old behaviour, re-register your callback using onceDone from within the callback itself; capisce supports that.
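A sketch of the "call once, then forget" rule, assuming the handler list is swapped out before the handlers run, which is what makes re-registering from within a callback safe in this model. OnceList is a hypothetical name:

```javascript
// Hypothetical sketch of "call once, then forget" handlers.
function OnceList() {
  this.handlers = [];
}
OnceList.prototype.add = function (handler) {
  this.handlers.push(handler);
};
OnceList.prototype.fire = function () {
  var current = this.handlers;
  this.handlers = [];               // forget the handlers before calling them
  for (var i = 0; i < current.length; i++) {
    current[i]();                   // registration order is preserved
  }
};

var doneHandlers = new OnceList();
var log = [];
doneHandlers.add(function () { log.push('first'); });
doneHandlers.add(function again() {
  log.push('second');
  doneHandlers.add(again);          // re-register: emulates the pre-0.5.0 behaviour
});
doneHandlers.fire();                // log is ['first', 'second']
doneHandlers.fire();                // log is ['first', 'second', 'second']
```

Because the list is replaced before iterating, a handler added during fire() lands in the new list and only runs on the next done situation, instead of being picked up by the loop that is currently running.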
Since capisce 0.2.0, if you want to fill the queue first and launch jobs later, you can use the hold() and go() methods:
queue.hold(); // Hold queue processing
for(i=0;i<1000;i++) {
queue.perform(myJob, "job-"+i);
}
queue.go(); // Resume queue processing
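A sketch of how hold()/go() can gate scheduling, assuming a simple "held" flag; HoldableQueue is hypothetical. Note that go() starts as many jobs as the concurrency level allows; the 0.4.3 changelog entry mentions a bug where only a single worker was launched after a hold()/go() sequence.

```javascript
// Hypothetical sketch of hold()/go(): while held, perform() only queues jobs;
// go() then starts as many jobs as the concurrency level allows.
function HoldableQueue(concurrency) {
  this.concurrency = concurrency;
  this.running = 0;
  this.pending = [];
  this.held = false;
}
HoldableQueue.prototype.hold = function () {
  this.held = true;
  return this;
};
HoldableQueue.prototype.go = function () {
  this.held = false;
  this._next();                     // fill every free slot, not just one
  return this;
};
HoldableQueue.prototype.perform = function (job) {
  this.pending.push(job);
  this._next();
  return this;
};
HoldableQueue.prototype._next = function () {
  while (!this.held && this.running < this.concurrency && this.pending.length > 0) {
    this._start(this.pending.shift());
  }
};
HoldableQueue.prototype._start = function (job) {
  var self = this;
  self.running += 1;
  job(function over() {
    self.running -= 1;
    self._next();
  });
};

var queue = new HoldableQueue(3);
var overs = [];
queue.hold();
for (var i = 0; i < 10; i++) {
  queue.perform(function (over) { overs.push(over); });
}
// Nothing started yet: queue.running is 0, queue.pending.length is 10
queue.go();
// Three jobs started: queue.running is 3, queue.pending.length is 7
```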
Since capisce 0.3.0, you can use the wait() method as a convenient way to insert delays into job execution.
var queue = new WorkingQueue(1); // Basically, a sequence
queue.perform(function(over) {
console.log("Waiting 5 seconds...");
over();
}).wait(5000).perform(function(over) {
console.log("done !");
over();
});
Of course, the above example would not make much sense on a queue with some concurrency. If you want concurrency, you can pass a job as a second parameter to wait():
var queue = new WorkingQueue(16);
queue.perform(function(over) {
console.log("First job done");
over();
}).wait(5000, function(over) {
console.log("Second job started after 5 seconds");
over();
});
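One way to think about wait() is as an ordinary job whose over() is deferred by a timer. The sketch below assumes that model; delayJob(), delayedJob() and the injectable schedule parameter are hypothetical, and capisce's actual wait() may behave differently, for instance regarding whether the delay occupies a concurrency slot.

```javascript
// Hypothetical sketch: a delay expressed as an ordinary job. `schedule`
// defaults to setTimeout and can be replaced, e.g. by a fake timer in tests.
// This is an assumed model of wait(), not capisce's code.
function delayJob(ms, schedule) {
  schedule = schedule || setTimeout;
  // The job only calls over() once the timer fires, so on a concurrency-1
  // queue the next job cannot start before `ms` milliseconds have elapsed.
  return function (over) {
    schedule(over, ms);
  };
}

// Variant for concurrent queues: start `job` itself after the delay.
function delayedJob(ms, job, schedule) {
  schedule = schedule || setTimeout;
  return function (over) {
    schedule(function () { job(over); }, ms);
  };
}

// Usage with a fake scheduler that records timers instead of waiting.
var timers = [];
function fakeSchedule(callback, ms) {
  timers.push({ callback: callback, ms: ms });
}
var log = [];
delayJob(5000, fakeSchedule)(function over() { log.push('delay over'); });
delayedJob(5000, function (over) { log.push('second job'); over(); }, fakeSchedule)(
  function over() { log.push('second job over'); }
);
// Nothing has run yet: two 5000 ms timers are pending.
timers.forEach(function (timer) { timer.callback(); });
// log is ['delay over', 'second job', 'second job over']
```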
CollectingWorkingQueue is just a wrapper around WorkingQueue that performs the very common task of collecting the result of each job. When using CollectingWorkingQueue, the over function takes the err and result of the job as parameters, and the onceDone handler receives the array of job results (as [jobId, err, result] sub-arrays). It is up to you to sort this array if you want the results in the same order the jobs were submitted.
Note: before version 0.4.5, the sample below had a bug.
var queue = new CollectingWorkingQueue(16);
function myJob(name) {
return function(over) {
var duration = Math.floor(Math.random() * 1000);
console.log("Starting "+name+" for "+duration+"ms");
setTimeout(function() {
console.log(name + " over, duration="+duration+"ms");
over(null, "result-"+name);
}, duration);
};
}
var i;
for(i=0;i<1000;i++) {
queue.perform(myJob("job-"+i));
}
queue.onceDone(function(results) {
console.log("All done !");
console.log("Before sorting : ")
console.log(results[0]);
console.log(results[999]);
results.sort()
console.log("After sorting : ")
console.log(results[0]);
console.log(results[999]);
});
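The collection mechanism and the sort can be sketched as follows, assuming jobId is the numeric submission index. makeCollector() and makeJob() are hypothetical helpers in the spirit of CollectingWorkingQueue, not its actual code:

```javascript
// Hypothetical sketch of result collection; jobId is assumed to be the
// submission index of the job.
function makeCollector() {
  var results = [];
  var nextId = 0;
  return {
    results: results,
    // Wrap a job whose over() takes (err, result) into a plain job whose
    // over() takes no argument, recording [jobId, err, result] on completion.
    wrap: function (job) {
      var jobId = nextId++;
      return function (over) {
        job(function (err, result) {
          results.push([jobId, err, result]);
          over();
        });
      };
    }
  };
}

// Jobs keep a "finish" function so that we can complete them out of order.
function makeJob(name, finishers) {
  return function (over) {
    finishers.push(function () { over(null, 'result-' + name); });
  };
}

var collector = makeCollector();
var finishers = [];
for (var n = 0; n < 3; n++) {
  collector.wrap(makeJob('job-' + n, finishers))(function over() {});
}
finishers.reverse().forEach(function (finish) { finish(); });
// Completion order: jobId 2, then 1, then 0

// Sort by jobId with a numeric comparator.
collector.results.sort(function (a, b) { return a[0] - b[0]; });
// [[0, null, 'result-job-0'], [1, null, 'result-job-1'], [2, null, 'result-job-2']]
```

A numeric comparator is used because sort() without arguments compares elements as strings, which would put jobId 10 before jobId 2.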
Since capisce 0.4.5, you can pass parameters to jobs, just as you would with a standard WorkingQueue. Once again, this saves you from using function builders (though closures are still built behind the scenes).
var queue = new CollectingWorkingQueue(16);
function myJob(name, over) {
var duration = Math.floor(Math.random() * 1000);
console.log("Starting "+name+" for "+duration+"ms");
setTimeout(function() {
console.log(name + " over, duration="+duration+"ms");
over(null, "result-"+name);
}, duration);
}
var i;
for(i=0;i<1000;i++) {
queue.perform(myJob, "job-"+i);
}
queue.onceDone(function(results) {
console.log("All done !");
console.log("Before sorting : ")
console.log(results[0]);
console.log(results[999]);
results.sort()
console.log("After sorting : ")
console.log(results[0]);
console.log(results[999]);
});
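The closures mentioned above can be sketched like this: the extra arguments are captured once and replayed when the job runs. bindJob() is a hypothetical helper, shown only to contrast with the [job, arg1, arg2...] array storage described for WorkingQueue:

```javascript
// Hypothetical sketch: turning perform(job, arg1, arg2, ...) into a
// one-argument job by capturing the arguments in a closure.
function bindJob(job /*, arg1, arg2, ... */) {
  var args = Array.prototype.slice.call(arguments, 1);
  return function (over) {
    // Calls job(arg1, arg2, ..., over)
    return job.apply(null, args.concat([over]));
  };
}

function myJob(name, over) {
  over(null, 'result-' + name);
}

var collected = [];
var bound = bindJob(myJob, 'job-7');   // the closure holds ['job-7']
bound(function (err, result) {
  collected.push([err, result]);
});
// collected is [[null, 'result-job-7']]
```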
Also since capisce 0.4.5, you can call CollectingWorkingQueue.hold() and CollectingWorkingQueue.go() just like with WorkingQueue.
capisce.sequence() can be used as a shortcut:
var capisce = require('capisce');
// These three code blocks are equivalent:
// Basic version
var queue = new capisce.WorkingQueue(1);
queue.perform(job1);
queue.perform(job2);
queue.perform(job3);
// Using capisce.sequence() :
capisce.sequence().perform(job1).then(job2).then(job3);
// capisce.sequence() accepts a job as parameter :
capisce.sequence(job1).then(job2).then(job3);
// capisce.sequence() also accepts job parameters.
// By the way, perform is good to use, too
capisce.sequence(job1, param1).then(job2).perform(job3, param2);
For now, then() doesn't accept job parameters the way perform() does. This is due to a feature I'd rather remove in the near future, which allows then() to create concurrent blocks within a sequence.
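To illustrate the chaining, here is a sketch of a concurrency-1 sequence with perform(job, ...args) and a parameterless then(job). sequence() here is a hypothetical stand-alone function, not capisce.sequence(), and it does not model the concurrent blocks that then() can create:

```javascript
// Hypothetical sketch of a chainable sequence, i.e. a queue with concurrency 1.
// Only perform(job, ...args) and a parameterless then(job) are modelled.
function sequence(/* job, arg1, arg2, ... (all optional) */) {
  var pending = [];   // entries of the form [job, arg1, arg2, ...]
  var busy = false;   // true while a job is running

  function next() {
    if (busy || pending.length === 0) {
      return;
    }
    busy = true;
    var entry = pending.shift();
    entry[0].apply(null, entry.slice(1).concat([function over() {
      busy = false;
      next();         // start the following job, if any
    }]));
  }

  var seq = {
    perform: function (/* job, arg1, arg2, ... */) {
      pending.push(Array.prototype.slice.call(arguments));
      next();
      return seq;     // allows chaining
    },
    then: function (job) {
      return seq.perform(job);
    }
  };

  if (arguments.length > 0) {
    seq.perform.apply(null, arguments);
  }
  return seq;
}

var order = [];
function step(label, over) {
  order.push(label);
  over();
}
sequence(step, 'one')
  .then(function (over) { order.push('two'); over(); })
  .perform(step, 'three');
// order is ['one', 'two', 'three']

// A job that has not called over() yet blocks the next one.
var log = [];
var release;
sequence(function (over) { log.push('a'); release = over; })
  .then(function (over) { log.push('b'); over(); });
// log is ['a']
release();
// log is ['a', 'b']
```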
- 0.6.2 (2012-11-02): Fixed a bug that broke onceDone semantics when calling processList or processObject on empty lists / objects.
- 0.6.1 (2012-10-29): processList and processDictionary support extra callback arguments.
- 0.6.0 (2012-10-29): Added processList and processDictionary.
- 0.5.0 (2012-10-29): whenDone is deprecated, use onceDone instead. onceDone callbacks are called only once.
- 0.4.5 (2012-08-28): CollectingWorkingQueue.perform() now accepts parameters for the job, just like WorkingQueue.perform(). Added WorkingQueue.hold() and WorkingQueue.go(). Fixed a bug wherein the (optional) job passed to sequence() was not scheduled.
- 0.4.4 (2012-08-28): Wrote proper unit tests using mocha (npm test to launch them).
- 0.4.3 (2012-05-03): With the help of @penartur, fixed a problem where a single worker was launched after a WorkingQueue.hold() / WorkingQueue.go() sequence.
- 0.4.2 (2012-03-16): Fixed a problem with WorkingQueue.whenDone().
- 0.4.1 (2012-03-15): Clarified the behavior of WorkingQueue.whenDone() and added WorkingQueue.doneAddingJobs().
- 0.4.0 (2012-02-15): WorkingQueue.perform() now accepts extra parameters that are passed to the job when it is scheduled.
- 0.3.1 (2012-02-12): New behavior for WorkingQueue.whenDone(), not so satisfying.
- 0.3.0 (2012-02-10): Added the WorkingQueue.wait() method.
- 0.2.0 (2012-02-10): Added the WorkingQueue.hold() and WorkingQueue.go() methods.
- 0.1.0 (2012-02-09): Initial version.