Skip to content

Commit 9449050

Browse files
committedDec 24, 2010
initial changes to implement version 0.9.1 of the AMQP specification
1 parent ca1c1b3 commit 9449050

7 files changed

+2941
-3961
lines changed
 

‎README.md

+13-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
IMPORTANT: This module only works with node v0.1.90 and later.
44

55
This is a client for RabbitMQ (and maybe other servers?). It partially
6-
implements the 0.8 version of the AMQP protocol.
6+
implements the 0.9.1 version of the AMQP protocol.
77

88

99
## Synopsis
@@ -35,7 +35,7 @@ An example of connecting to a server and listening on a queue.
3535
## Connection
3636

3737
`new amqp.Connection()` Instantiates a new connection. Use
38-
`connection.connect()` to connect to a server.
38+
`connection.connect()` to connect to a server.
3939

4040
`amqp.createConnection()` returns an instance of `amqp.Connection`, which is
4141
a subclass of `net.Stream`. All the event and methods which work on
@@ -145,11 +145,21 @@ bound it will not receive any messages.
145145

146146
If the `exchange` argument is left out `'amq.topic'` will be used.
147147

148+
### queue.bind_headers([exchange,] routing)
149+
150+
This method binds a queue to an exchange. Until a queue is
151+
bound it will not receive any messages.
152+
153+
This method is to be used on an "headers"-type exchange. The routing
154+
argument must contain the routing keys and the `x-match` value (`all` or `any`).
155+
156+
If the `exchange` argument is left out `'amq.headers'` will be used.
157+
148158

149159
### queue.destroy(options)
150160

151161
Delete the queue. Without options, the queue will be deleted even if it has
152-
pending messages or attached consumers. If +options.ifUnused+ is true, then
162+
pending messages or attached consumers. If +options.ifUnused+ is true, then
153163
the queue will only be deleted if there are no consumers. If
154164
+options.ifEmpty+ is true, the queue will only be deleted if it has no
155165
messages.

‎amqp-0-9-1.xml

+2,843
Large diffs are not rendered by default.

‎amqp-0.8.xml

-3,908
This file was deleted.

‎amqp-definitions-0-8.js

-2
This file was deleted.

‎amqp-definitions-0-9-1.js

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎amqp.js

+81-46
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
var events = require('events'),
22
sys = require('sys'),
33
net = require('net'),
4-
protocol = require('./amqp-definitions-0-8'),
4+
protocol,
55
Buffer = require('buffer').Buffer,
66
Promise = require('./promise').Promise;
77

@@ -80,32 +80,6 @@ var methods = {};
8080
// classes keyed on their index
8181
var classes = {};
8282

83-
(function () { // anon scope for init
84-
//debug("initializing amqp methods...");
85-
for (var i = 0; i < protocol.classes.length; i++) {
86-
var classInfo = protocol.classes[i];
87-
classes[classInfo.index] = classInfo;
88-
for (var j = 0; j < classInfo.methods.length; j++) {
89-
var methodInfo = classInfo.methods[j];
90-
91-
var name = classInfo.name
92-
+ methodInfo.name[0].toUpperCase()
93-
+ methodInfo.name.slice(1);
94-
//debug(name);
95-
96-
var method = { name: name
97-
, fields: methodInfo.fields
98-
, methodIndex: methodInfo.index
99-
, classIndex: classInfo.index
100-
};
101-
102-
if (!methodTable[classInfo.index]) methodTable[classInfo.index] = {};
103-
methodTable[classInfo.index][methodInfo.index] = method;
104-
methods[name] = method;
105-
}
106-
}
107-
})(); // end anon scope
108-
10983

11084

11185
// parser
@@ -117,7 +91,7 @@ var maxFrameBuffer = 131072; // same as rabbitmq
11791
// An interruptible AMQP parser.
11892
//
11993
// type is either 'server' or 'client'
120-
// version is '0-8' or '0-9-1'. Currently only supporting '0-8'.
94+
// version is '0-9-1'.
12195
//
12296
// Instances of this class have several callbacks
12397
// - onMethod(channel, method, args);
@@ -132,7 +106,35 @@ function AMQPParser (version, type) {
132106
this.isClient = (type == 'client');
133107
this.state = this.isClient ? 'frameHeader' : 'protocolHeader';
134108

135-
if (version != '0-8') throw new Error("Unsupported protocol version");
109+
if (version != '0-9-1') throw new Error("Unsupported protocol version");
110+
111+
protocol = require('./amqp-definitions-'+version);
112+
113+
(function () { // anon scope for init
114+
//debug("initializing amqp methods...");
115+
for (var i = 0; i < protocol.classes.length; i++) {
116+
var classInfo = protocol.classes[i];
117+
classes[classInfo.index] = classInfo;
118+
for (var j = 0; j < classInfo.methods.length; j++) {
119+
var methodInfo = classInfo.methods[j];
120+
121+
var name = classInfo.name
122+
+ methodInfo.name[0].toUpperCase()
123+
+ methodInfo.name.slice(1);
124+
//debug(name);
125+
126+
var method = { name: name
127+
, fields: methodInfo.fields
128+
, methodIndex: methodInfo.index
129+
, classIndex: classInfo.index
130+
};
131+
132+
if (!methodTable[classInfo.index]) methodTable[classInfo.index] = {};
133+
methodTable[classInfo.index][methodInfo.index] = method;
134+
methods[name] = method;
135+
}
136+
}
137+
})(); // end anon scope
136138

137139
this.frameHeader = new Buffer(7);
138140
this.frameHeader.used = 0;
@@ -619,7 +621,7 @@ function serializeFields (buffer, fields, args, strict) {
619621
buffer[buffer.used++] = bitField;
620622
bitField = 0;
621623
bitIndex = 0;
622-
}
624+
}
623625
break;
624626

625627
case 'octet':
@@ -692,7 +694,7 @@ function Connection (options) {
692694
self.queues = {};
693695
self.exchanges = {};
694696

695-
parser = new AMQPParser('0-8', 'client');
697+
parser = new AMQPParser('0-9-1', 'client');
696698

697699
parser.onMethod = function (channel, method, args) {
698700
self._onMethod(channel, method, args);
@@ -723,7 +725,7 @@ function Connection (options) {
723725
//debug("connected...");
724726
// Time to start the AMQP 7-way connection initialization handshake!
725727
// 1. The client sends the server a version string
726-
self.write("AMQP" + String.fromCharCode(1,1,8,0));
728+
self.write("AMQP" + String.fromCharCode(0,0,9,1));
727729
state = 'handshake';
728730
});
729731

@@ -791,8 +793,8 @@ Connection.prototype._onMethod = function (channel, method, args) {
791793
// 2. The server responds, after the version string, with the
792794
// 'connectionStart' method (contains various useless information)
793795
case methods.connectionStart:
794-
// We check that they're serving us AMQP 0-8
795-
if (args.versionMajor != 8 && args.versionMinor != 0) {
796+
// We check that they're serving us AMQP 0-9
797+
if (args.versionMajor != 0 && args.versionMinor != 9) {
796798
this.end();
797799
this.emit('error', new Error("Bad server version"));
798800
return;
@@ -825,8 +827,10 @@ Connection.prototype._onMethod = function (channel, method, args) {
825827
// 6. Then we have to send a connectionOpen request
826828
this._sendMethod(0, methods.connectionOpen,
827829
{ virtualHost: this.options.vhost
828-
, capabilities: ''
829-
, insist: true
830+
// , capabilities: ''
831+
// , insist: true
832+
, reserved1: ''
833+
, reserved2: true
830834
});
831835
break;
832836

@@ -1140,7 +1144,7 @@ function Channel (connection, channel) {
11401144
this.connection = connection;
11411145
this._tasks = [];
11421146

1143-
this.connection._sendMethod(channel, methods.channelOpen, {outOfBand: ""});
1147+
this.connection._sendMethod(channel, methods.channelOpen, {reserved1: ""});
11441148
}
11451149
sys.inherits(Channel, events.EventEmitter);
11461150

@@ -1212,13 +1216,13 @@ Queue.prototype.subscribeRaw = function (/* options, messageListener */) {
12121216

12131217
return this._taskPush(methods.basicConsumeOk, function () {
12141218
self.connection._sendMethod(self.channel, methods.basicConsume,
1215-
{ ticket: 0
1219+
{ reserved1: 0
12161220
, queue: self.name
12171221
, consumerTag: "."
12181222
, noLocal: options.noLocal ? true : false
12191223
, noAck: options.noAck ? true : false
12201224
, exclusive: options.exclusive ? true : false
1221-
, nowait: false
1225+
, noWait: false
12221226
, "arguments": {}
12231227
});
12241228
});
@@ -1306,31 +1310,62 @@ Queue.prototype.bind = function (/* [exchange,] routingKey */) {
13061310
// The first argument, exchange is optional.
13071311
// If not supplied the connection will use the default 'amq.topic'
13081312
// exchange.
1309-
1313+
13101314
var exchange, routingKey;
13111315

13121316
if (arguments.length == 2) {
13131317
exchange = arguments[0];
13141318
routingKey = arguments[1];
13151319
} else {
1316-
exchange = 'amq.topic';
1320+
exchange = 'amq.topic';
13171321
routingKey = arguments[0];
13181322
}
13191323

13201324

13211325
return this._taskPush(methods.queueBindOk, function () {
13221326
var exchangeName = exchange instanceof Exchange ? exchange.name : exchange;
13231327
self.connection._sendMethod(self.channel, methods.queueBind,
1324-
{ ticket: 0
1328+
{ reserved1: 0
13251329
, queue: self.name
13261330
, exchange: exchangeName
13271331
, routingKey: routingKey
1328-
, nowait: false
1332+
, noWait: false
13291333
, "arguments": {}
13301334
});
13311335
});
13321336
};
13331337

1338+
Queue.prototype.bind_headers = function (/* [exchange,] matchingPairs */) {
1339+
var self = this;
1340+
1341+
// The first argument, exchange is optional.
1342+
// If not supplied the connection will use the default 'amq.headers'
1343+
// exchange.
1344+
1345+
var exchange, matchingPairs;
1346+
1347+
if (arguments.length == 2) {
1348+
exchange = arguments[0];
1349+
matchingPairs = arguments[1];
1350+
} else {
1351+
exchange = 'amq.headers';
1352+
matchingPairs = arguments[0];
1353+
}
1354+
1355+
1356+
return this._taskPush(methods.queueBindOk, function () {
1357+
var exchangeName = exchange instanceof Exchange ? exchange.name : exchange;
1358+
self.connection._sendMethod(self.channel, methods.queueBind,
1359+
{ reserved1: 0
1360+
, queue: self.name
1361+
, exchange: exchangeName
1362+
, routingKey: ''
1363+
, noWait: false
1364+
, "arguments": matchingPairs
1365+
});
1366+
});
1367+
};
1368+
13341369

13351370
Queue.prototype.destroy = function (options) {
13361371
var self = this;
@@ -1354,13 +1389,13 @@ Queue.prototype._onMethod = function (channel, method, args) {
13541389
switch (method) {
13551390
case methods.channelOpenOk:
13561391
this.connection._sendMethod(channel, methods.queueDeclare,
1357-
{ ticket: 0
1392+
{ reserved1: 0
13581393
, queue: this.name
13591394
, passive: this.options.passive ? true : false
13601395
, durable: this.options.durable ? true : false
13611396
, exclusive: this.options.exclusive ? true : false
13621397
, autoDelete: this.options.autoDelete ? true : false
1363-
, nowait: false
1398+
, noWait: false
13641399
, "arguments": {}
13651400
});
13661401
this.state = "declare queue";
@@ -1516,7 +1551,7 @@ Exchange.prototype.publish = function (routingKey, data, options) {
15161551
var self = this;
15171552
return this._taskPush(null, function () {
15181553
self.connection._sendMethod(self.channel, methods.basicPublish,
1519-
{ ticket: 0
1554+
{ reserved1: 0
15201555
, exchange: self.name
15211556
, routingKey: routingKey
15221557
, mandatory: options.mandatory ? true : false

‎package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{ "name" : "amqp"
2-
, "version" : "0.0.2"
2+
, "version" : "0.0.3"
33
, "description" : "amqp bindings for RabbitMQ"
4-
, "author": "Ryan Dahl"
4+
, "author": "Ryan Dahl, Stephane Alnet"
55
, "main": "./amqp"
66
}

0 commit comments

Comments
 (0)
Please sign in to comment.