diff --git a/README.md b/README.md index 3b029f039..1012c26ba 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,8 @@ __Options__ through this pub/sub adapter. Defaults to `ShareDB.MemoryPubSub()`. * `options.milestoneDb` _(instance of ShareDB.MilestoneDB`)_ Store snapshots of documents at a specified interval of versions +* `options.presence` _boolean_ + Enable presence functionality. Off by default. Note that this feature is not optimized for large numbers of clients and could cause fan-out issues #### Database Adapters * `ShareDB.MemoryDB`, backed by a non-persistent database with no queries @@ -158,6 +160,7 @@ Register a new middleware. the database. * `'receive'`: Received a message from a client * `'reply'`: About to send a non-error reply to a client message + * `'sendPresence'`: About to send presence information to a client * `fn` _(Function(context, callback))_ Call this function at the time specified by `action`. * `context` will always have the following properties: @@ -307,6 +310,20 @@ Get a read-only snapshot of a document at the requested version. } ``` +`connection.getPresence(channel): Presence;` +Get a [`Presence`](#class-sharedbpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence. + +* `channel` _(String)_ + Presence channel to subscribe to + +`connection.getDocPresence(collection, id): DocPresence;` +Get a special [`DocPresence`](#class-sharedbdocpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence. This is tied to a `Doc`, and all presence will be automatically transformed against ops to keep presence current. Note that the `Doc` must be of a type that supports presence. + +* `collection` _(String)_ + Document collection +* `id` _(String)_ + Document ID + ### Class: `ShareDB.Doc` `doc.type` _(String_) @@ -640,6 +657,109 @@ const connectionInfo = getUserPermissions(); const connection = backend.connect(null, connectionInfo); ``` +### Class: `ShareDB.Presence` + +Representation of the presence data associated with a given channel. + +#### `subscribe` + +```javascript +presence.subscribe(callback): void; +``` + +Subscribe to presence updates from other clients. Note that presence can be submitted without subscribing, but remote clients will not be able to re-request presence from you if you are not subscribed. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +#### `unsubscribe` + +```javascript +presence.unsubscribe(callback): void; +``` + +Unsubscribe from presence updates from remote clients. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +#### `on` + +```javascript +presence.on('receive', callback): void; +``` + +An update from a remote presence client has been received. + +* `callback` _Function_: callback for handling the received presence: `function (presenceId, presenceValue): void;` + +```javascript +presence.on('error', callback): void; +``` + +A presence-related error has occurred. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +#### `create` + +```javascript +presence.create(presenceId): LocalPresence; +``` + +Create an instance of [`LocalPresence`](#class-sharedblocalpresence), which can be used to represent local presence. Many or none such local presences may exist on a `Presence` instance. + +* `presenceId` _string (optional)_: a unique ID representing the local presence. Remember - depending on use-case - the same client might have multiple presences, so this might not necessarily be a user or client ID. If one is not provided, a random ID will be assigned for you. + +#### `destroy` + +```javascript +presence.destroy(callback); +``` + +Updates all remote clients with a `null` presence, and removes it from the `Connection` cache, so that it can be garbage-collected. This should be called when you are done with a presence, and no longer need to use it to fire updates. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +### Class: `ShareDB.DocPresence` + +Specialised case of [`Presence`](#class-sharedbpresence), which is tied to a specific [`Doc`](#class-sharedbdoc). When using presence with an associated `Doc`, any ops applied to the `Doc` will automatically be used to transform associated presence. On destroy, the `DocPresence` will unregister its listeners from the `Doc`. + +See [`Presence`](#class-sharedbpresence) for available methods. + +### Class: `ShareDB.LocalPresence` + +`LocalPresence` represents the presence of the local client in a given `Doc`. For example, this might be the position of a caret in a text document; which field has been highlighted in a complex JSON object; etc. Multiple presences may exist per `Doc` even on the same client. + +#### `submit` + +```javascript +localPresence.submit(presence, callback): void; +``` + +Update the local representation of presence, and broadcast that presence to any other document presence subscribers. + +* `presence` _Object_: the presence object to broadcast. The structure of this will depend on the OT type +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +#### `send` + +```javascript +localPresence.send(callback): void; +``` + +Send presence like `submit`, but without updating the value. Can be useful if local presences expire periodically. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +#### `destroy` + +```javascript +localPresence.destroy(callback): void; +``` + +Informs all remote clients that this presence is now `null`, and deletes itself for garbage collection. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + ### Logging By default, ShareDB logs to `console`. This can be overridden if you wish to silence logs, or to log to your own logging driver or alert service. diff --git a/examples/rich-text-presence/.gitignore b/examples/rich-text-presence/.gitignore new file mode 100644 index 000000000..eab5b2c0f --- /dev/null +++ b/examples/rich-text-presence/.gitignore @@ -0,0 +1 @@ +static/dist/ diff --git a/examples/rich-text-presence/README.md b/examples/rich-text-presence/README.md new file mode 100644 index 000000000..25fcf2f08 --- /dev/null +++ b/examples/rich-text-presence/README.md @@ -0,0 +1,20 @@ +# Collaborative Rich Text Editor with ShareDB + +This is a collaborative rich text editor using [Quill](https://github.com/quilljs/quill) and the [rich-text OT type](https://github.com/ottypes/rich-text). + +In this demo, data is not persisted. To persist data, run a Mongo +server and initialize ShareDB with the +[ShareDBMongo](https://github.com/share/sharedb-mongo) database adapter. + +## Install dependencies +``` +npm install +``` + +## Build JavaScript bundle and run server +``` +npm run build && npm start +``` + +## Run app in browser +Load [http://localhost:8080](http://localhost:8080) diff --git a/examples/rich-text-presence/client.js b/examples/rich-text-presence/client.js new file mode 100644 index 000000000..ee4a44427 --- /dev/null +++ b/examples/rich-text-presence/client.js @@ -0,0 +1,101 @@ +var ReconnectingWebSocket = require('reconnecting-websocket'); +var sharedb = require('sharedb/lib/client'); +var richText = require('./rich-text'); +var Quill = require('quill'); +var QuillCursors = require('quill-cursors'); +var tinycolor = require('tinycolor2'); +var ObjectID = require('bson-objectid'); + +sharedb.types.register(richText.type); +Quill.register('modules/cursors', QuillCursors); + +var connectionButton = document.getElementById('client-connection'); +connectionButton.addEventListener('click', function() { + toggleConnection(connectionButton); +}); + +var nameInput = document.getElementById('name'); + +var colors = {}; + +var collection = 'examples'; +var id = 'richtext'; +var presenceId = new ObjectID().toString(); + +var socket = new ReconnectingWebSocket('ws://' + window.location.host); +var connection = new sharedb.Connection(socket); +var doc = connection.get(collection, id); + +doc.subscribe(function(err) { + if (err) throw err; + initialiseQuill(doc); +}); + +function initialiseQuill(doc) { + var quill = new Quill('#editor', { + theme: 'bubble', + modules: {cursors: true} + }); + var cursors = quill.getModule('cursors'); + + quill.setContents(doc.data); + + quill.on('text-change', function(delta, oldDelta, source) { + if (source !== 'user') return; + doc.submitOp(delta); + }); + + doc.on('op', function(op, source) { + if (source) return; + quill.updateContents(op); + }); + + var presence = doc.connection.getDocPresence(collection, id); + presence.subscribe(function(error) { + if (error) throw error; + }); + var localPresence = presence.create(presenceId); + + quill.on('selection-change', function(range) { + // Ignore blurring, so that we can see lots of users in the + // same window. In real use, you may want to clear the cursor. + if (!range) return; + // In this particular instance, we can send extra information + // on the presence object. This ability will vary depending on + // type. + range.name = nameInput.value; + localPresence.submit(range, function(error) { + if (error) throw error; + }); + }); + + presence.on('receive', function(id, range) { + colors[id] = colors[id] || tinycolor.random().toHexString(); + var name = (range && range.name) || 'Anonymous'; + cursors.createCursor(id, name, colors[id]); + cursors.moveCursor(id, range); + }); + + return quill; +} + +function toggleConnection(button) { + if (button.classList.contains('connected')) { + button.classList.remove('connected'); + button.textContent = 'Connect'; + disconnect(); + } else { + button.classList.add('connected'); + button.textContent = 'Disconnect'; + connect(); + } +} + +function disconnect() { + doc.connection.close(); +} + +function connect() { + var socket = new ReconnectingWebSocket('ws://' + window.location.host); + doc.connection.bindToSocket(socket); +} diff --git a/examples/rich-text-presence/package.json b/examples/rich-text-presence/package.json new file mode 100644 index 000000000..6747a9a32 --- /dev/null +++ b/examples/rich-text-presence/package.json @@ -0,0 +1,32 @@ +{ + "name": "sharedb-example-rich-text-presence", + "version": "1.0.0", + "description": "An example of presence using ShareDB and Quill", + "main": "server.js", + "scripts": { + "build": "mkdir -p static/dist/ && ./node_modules/.bin/browserify client.js -o static/dist/bundle.js", + "test": "echo \"Error: no test specified\" && exit 1", + "start": "node server.js" + }, + "author": "Nate Smith", + "contributors": [ + "Avital Oliver (https://aoliver.org/)", + "Alec Gibson " + ], + "license": "MIT", + "dependencies": { + "@teamwork/websocket-json-stream": "^2.0.0", + "bson-objectid": "^1.3.0", + "express": "^4.17.1", + "quill": "^1.3.7", + "quill-cursors": "^2.2.1", + "reconnecting-websocket": "^4.2.0", + "rich-text": "^4.0.0", + "sharedb": "file:../../", + "tinycolor2": "^1.4.1", + "ws": "^7.2.0" + }, + "devDependencies": { + "browserify": "^16.5.0" + } +} diff --git a/examples/rich-text-presence/rich-text.js b/examples/rich-text-presence/rich-text.js new file mode 100644 index 000000000..653ae0592 --- /dev/null +++ b/examples/rich-text-presence/rich-text.js @@ -0,0 +1,20 @@ +var richText = require('rich-text'); + +richText.type.transformPresence = function(presence, op, isOwnOp) { + if (!presence) { + return null; + } + + var start = presence.index; + var end = presence.index + presence.length; + var delta = new richText.Delta(op); + start = delta.transformPosition(start, !isOwnOp); + end = delta.transformPosition(end, !isOwnOp); + + return Object.assign({}, presence, { + index: start, + length: end - start + }); +}; + +module.exports = richText; diff --git a/examples/rich-text-presence/server.js b/examples/rich-text-presence/server.js new file mode 100644 index 000000000..0bcf7f821 --- /dev/null +++ b/examples/rich-text-presence/server.js @@ -0,0 +1,42 @@ +var http = require('http'); +var express = require('express'); +var ShareDB = require('sharedb'); +var richText = require('./rich-text'); +var WebSocket = require('ws'); +var WebSocketJSONStream = require('@teamwork/websocket-json-stream'); + +ShareDB.types.register(richText.type); +var backend = new ShareDB({presence: true}); +createDoc(startServer); + +// Create initial document then fire callback +function createDoc(callback) { + var connection = backend.connect(); + var doc = connection.get('examples', 'richtext'); + doc.fetch(function(err) { + if (err) throw err; + if (doc.type === null) { + doc.create([{insert: 'Hi!'}], 'rich-text', callback); + return; + } + callback(); + }); +} + +function startServer() { + // Create a web server to serve files and listen to WebSocket connections + var app = express(); + app.use(express.static('static')); + app.use(express.static('node_modules/quill/dist')); + var server = http.createServer(app); + + // Connect any incoming WebSocket connection to ShareDB + var wss = new WebSocket.Server({server: server}); + wss.on('connection', function(ws) { + var stream = new WebSocketJSONStream(ws); + backend.listen(stream); + }); + + server.listen(8080); + console.log('Listening on http://localhost:8080'); +} diff --git a/examples/rich-text-presence/static/index.html b/examples/rich-text-presence/static/index.html new file mode 100644 index 000000000..01f27be0f --- /dev/null +++ b/examples/rich-text-presence/static/index.html @@ -0,0 +1,19 @@ + + +ShareDB Rich Text + + + + +
+ + +
+ +
+ Open a new window to see another client! +
+ +
+ + diff --git a/examples/rich-text-presence/static/style.css b/examples/rich-text-presence/static/style.css new file mode 100644 index 000000000..fb9858eb2 --- /dev/null +++ b/examples/rich-text-presence/static/style.css @@ -0,0 +1,27 @@ +body { + font-family: sans-serif; +} + +input, button { + font-size: 16px; + margin-right: 10px; +} + +.controls { + width: 100%; + text-align: center; + margin: 20px; +} + +.ql-container { + padding: 10px; +} + +.ql-editor { + border: 1px solid grey; +} + +/* Keep the example simple by hiding the toolbar */ +.ql-tooltip { + display: none; +} diff --git a/lib/agent.js b/lib/agent.js index 466d2516e..a8ab4a3bb 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -35,6 +35,19 @@ function Agent(backend, stream) { // Map from queryId -> emitter this.subscribedQueries = {}; + // Track which documents are subscribed to presence by the client. This is a + // map of channel -> stream + this.subscribedPresences = {}; + // Highest seq received for a subscription request. Any seq lower than this + // value is stale, and should be ignored. Used for keeping the subscription + // state in sync with the client's desired state. Map of channel -> seq + this.presenceSubscriptionSeq = {}; + // Keep track of the last request that has been sent by each local presence + // belonging to this agent. This is used to generate a new disconnection + // request if the client disconnects ungracefully. This is a + // map of channel -> id -> request + this.presenceRequests = {}; + // We need to track this manually to make sure we don't reply to messages // after the stream was closed. this.closed = false; @@ -78,6 +91,11 @@ Agent.prototype._cleanup = function() { } this.subscribedDocs = {}; + for (var channel in this.subscribedPresences) { + this.subscribedPresences[channel].destroy(); + } + this.subscribedPresences = {}; + // Clean up query subscription streams for (var id in this.subscribedQueries) { var emitter = this.subscribedQueries[id]; @@ -121,6 +139,34 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { }); }; +Agent.prototype._subscribeToPresenceStream = function(channel, stream) { + if (this.closed) return stream.destroy(); + var agent = this; + + stream.on('data', function(data) { + if (data.error) { + logger.error('Presence subscription stream error', channel, data.error); + } + agent._handlePresenceData(data); + }); + + stream.on('end', function() { + var requests = agent.presenceRequests[channel] || {}; + for (var id in requests) { + var request = agent.presenceRequests[channel][id]; + request.seq++; + request.p = null; + agent._broadcastPresence(request, function(error) { + if (error) logger.error('Error broadcasting disconnect presence', channel, error); + }); + } + if (agent.subscribedPresences[channel] === stream) { + delete agent.subscribedPresences[channel]; + } + delete agent.presenceRequests[channel]; + }); +}; + Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) { var previous = this.subscribedQueries[queryId]; if (previous) previous.destroy(); @@ -311,14 +357,18 @@ Agent.prototype._checkRequest = function(request) { if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') { // Query messages need an ID property. if (typeof request.id !== 'number') return 'Missing query ID'; - } else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u') { + } else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') { // Doc-based request. if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (request.d != null && typeof request.d !== 'string') return 'Invalid id'; - if (request.a === 'op') { + if (request.a === 'op' || request.a === 'p') { if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version'; } + + if (request.a === 'p') { + if (typeof request.id !== 'string') return 'Missing presence ID'; + } } else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') { // Bulk request if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; @@ -369,6 +419,21 @@ Agent.prototype._handleMessage = function(request, callback) { return this._fetchSnapshot(request.c, request.d, request.v, callback); case 'nt': return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback); + case 'p': + if (!this.backend.presenceEnabled) return; + var presence = this._createPresence(request); + if (presence.t && !util.supportsPresence(types.map[presence.t])) { + return callback({ + code: ERROR_CODE.ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE, + message: 'Type does not support presence: ' + presence.t + }); + } + return this._broadcastPresence(presence, callback); + case 'ps': + if (!this.backend.presenceEnabled) return; + return this._subscribePresence(request.ch, request.seq, callback); + case 'pu': + return this._unsubscribePresence(request.ch, request.seq, callback); default: callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); } @@ -669,6 +734,92 @@ Agent.prototype._src = function() { return this.src || this.clientId; }; +Agent.prototype._broadcastPresence = function(presence, callback) { + var agent = this; + var requests = this.presenceRequests[presence.ch] || (this.presenceRequests[presence.ch] = {}); + var previousRequest = requests[presence.id]; + if (!previousRequest || previousRequest.pv < presence.pv) { + this.presenceRequests[presence.ch][presence.id] = presence; + } + this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) { + if (error) return callback(error); + var channel = agent._getPresenceChannel(presence.ch); + agent.backend.pubsub.publish([channel], presence, function(error) { + if (error) return callback(error); + callback(null, presence); + }); + }); +}; + +Agent.prototype._createPresence = function(request) { + return { + a: 'p', + ch: request.ch, + src: this._src(), + id: request.id, // Presence ID, not Doc ID (which is 'd') + p: request.p, + pv: request.pv, + // The c,d,v,t fields are only set for DocPresence + c: request.c, + d: request.d, + v: request.v, + t: request.t + }; +}; + +Agent.prototype._subscribePresence = function(channel, seq, callback) { + var agent = this; + var presenceChannel = this._getPresenceChannel(channel); + this.backend.pubsub.subscribe(presenceChannel, function(error, stream) { + if (error) return callback(error); + if (seq < agent.presenceSubscriptionSeq[channel]) { + stream.destroy(); + return callback(null, {ch: channel, seq: seq}); + } + agent.presenceSubscriptionSeq[channel] = seq; + agent.subscribedPresences[channel] = stream; + agent._subscribeToPresenceStream(channel, stream); + agent._requestPresence(channel, function(error) { + callback(error, {ch: channel, seq: seq}); + }); + }); +}; + +Agent.prototype._unsubscribePresence = function(channel, seq, callback) { + if (seq < this.presenceSubscriptionSeq[channel]) return; + this.presenceSubscriptionSeq[channel] = seq; + var stream = this.subscribedPresences[channel]; + if (stream) stream.destroy(); + callback(null, {ch: channel, seq: seq}); +}; + +Agent.prototype._getPresenceChannel = function(channel) { + return '$presence.' + channel; +}; + +Agent.prototype._requestPresence = function(channel, callback) { + var presenceChannel = this._getPresenceChannel(channel); + this.backend.pubsub.publish([presenceChannel], {ch: channel, r: true, src: this.clientId}, callback); +}; + +Agent.prototype._handlePresenceData = function(presence) { + if (presence.src === this._src()) return; + + if (presence.r) return this.send({a: 'pr', ch: presence.ch}); + + var backend = this.backend; + var context = { + collection: presence.c, + presence: presence + }; + var agent = this; + backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) { + if (error) { + return agent.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); + } + agent.send(presence); + }); +}; function createClientOp(request, clientId) { // src can be provided if it is not the same as the current agent, diff --git a/lib/backend.js b/lib/backend.js index f1178d730..8330546e4 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -32,6 +32,7 @@ function Backend(options) { this.suppressPublish = !!options.suppressPublish; this.maxSubmitRetries = options.maxSubmitRetries || null; + this.presenceEnabled = !!options.presence; // Map from event name to a list of middleware this.middleware = {}; @@ -66,6 +67,8 @@ Backend.prototype.MIDDLEWARE_ACTIONS = { // by design, changing existing reply properties can cause weird bugs, since // the rest of ShareDB would be unaware of those changes. reply: 'reply', + // About to send presence information to a client + sendPresence: 'sendPresence', // An operation is about to be submitted to the database submit: 'submit' }; @@ -822,6 +825,22 @@ Backend.prototype._buildSnapshotFromOps = function(id, startingSnapshot, ops, ca callback(error, snapshot); }; +Backend.prototype.transformPresenceToLatestVersion = function(agent, presence, callback) { + if (!presence.c || !presence.d) return callback(null, presence); + this.getOps(agent, presence.c, presence.d, presence.v, null, function(error, ops) { + if (error) return callback(error); + for (var i = 0; i < ops.length; i++) { + var op = ops[i]; + var isOwnOp = op.src === presence.src; + var transformError = ot.transformPresence(presence, op, isOwnOp); + if (transformError) { + return callback(transformError); + } + } + callback(null, presence); + }); +}; + function pluckIds(snapshots) { var ids = []; for (var i = 0; i < snapshots.length; i++) { diff --git a/lib/client/connection.js b/lib/client/connection.js index 6fe67b7c9..81cb28149 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -1,5 +1,7 @@ var Doc = require('./doc'); var Query = require('./query'); +var Presence = require('./presence/presence'); +var DocPresence = require('./presence/doc-presence'); var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-request'); var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request'); var emitter = require('../emitter'); @@ -46,6 +48,9 @@ function Connection(socket) { // Map from query ID -> query object. this.queries = {}; + // Maps from channel -> presence objects + this._presences = {}; + // Map from snapshot request ID -> snapshot request this._snapshotRequests = {}; @@ -236,6 +241,14 @@ Connection.prototype.handleMessage = function(message) { var doc = this.getExisting(message.c, message.d); if (doc) doc._handleOp(err, message); return; + case 'p': + return this._handlePresence(err, message); + case 'ps': + return this._handlePresenceSubscribe(err, message); + case 'pu': + return this._handlePresenceUnsubscribe(err, message); + case 'pr': + return this._handlePresenceRequest(err, message); default: logger.warn('Ignoring unrecognized message', message); @@ -339,6 +352,10 @@ Connection.prototype._setState = function(newState, reason) { docs[id]._onConnectionStateChanged(); } } + // Emit the event to all Presences + for (var channel in this._presences) { + this._presences[channel]._onConnectionStateChanged(); + } // Emit the event to all snapshots for (var id in this._snapshotRequests) { var snapshotRequest = this._snapshotRequests[id]; @@ -493,17 +510,7 @@ Connection.prototype.get = function(collection, id) { * @private */ Connection.prototype._destroyDoc = function(doc) { - var docs = this.collections[doc.collection]; - if (!docs) return; - - delete docs[doc.id]; - - // Delete the collection container if its empty. This could be a source of - // memory leaks if you slowly make a billion collections, which you probably - // won't do anyway, but whatever. - if (!util.hasKeys(docs)) { - delete this.collections[doc.collection]; - } + util.digAndRemove(this.collections, doc.collection, doc.id); }; Connection.prototype._addDoc = function(doc) { @@ -733,3 +740,52 @@ Connection.prototype._initialize = function(message) { this._setState('connected'); }; + +Connection.prototype.getPresence = function(channel) { + var connection = this; + return util.digOrCreate(this._presences, channel, function() { + return new Presence(connection, channel); + }); +}; + +Connection.prototype.getDocPresence = function(collection, id) { + var channel = DocPresence.channel(collection, id); + var connection = this; + return util.digOrCreate(this._presences, channel, function() { + return new DocPresence(connection, collection, id); + }); +}; + +Connection.prototype._sendPresenceAction = function(action, seq, presence) { + // Ensure the presence is registered so that it receives the reply message + this._addPresence(presence); + var message = {a: action, ch: presence.channel, seq: seq}; + this.send(message); + return message.seq; +}; + +Connection.prototype._addPresence = function(presence) { + util.digOrCreate(this._presences, presence.channel, function() { + return presence; + }); +}; + +Connection.prototype._handlePresenceSubscribe = function(error, message) { + var presence = util.dig(this._presences, message.ch); + if (presence) presence._handleSubscribe(error, message.seq); +}; + +Connection.prototype._handlePresenceUnsubscribe = function(error, message) { + var presence = util.dig(this._presences, message.ch); + if (presence) presence._handleUnsubscribe(error, message.seq); +}; + +Connection.prototype._handlePresence = function(error, message) { + var presence = util.dig(this._presences, message.ch); + if (presence) presence._receiveUpdate(error, message); +}; + +Connection.prototype._handlePresenceRequest = function(error, message) { + var presence = util.dig(this._presences, message.ch); + if (presence) presence._broadcastAllLocalPresence(error, message); +}; diff --git a/lib/client/doc.js b/lib/client/doc.js index da7a7a050..a49b717d0 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -115,10 +115,12 @@ Doc.prototype.destroy = function(callback) { return doc.emit('error', err); } doc.connection._destroyDoc(doc); + doc.emit('destroy'); if (callback) callback(); }); } else { doc.connection._destroyDoc(doc); + doc.emit('destroy'); if (callback) callback(); } }); @@ -569,9 +571,9 @@ Doc.prototype._otApply = function(op, source) { if (transformErr) return this._hardRollback(transformErr); } // Apply the individual op component - this.emit('before op', componentOp.op, source); + this.emit('before op', componentOp.op, source, op.src); this.data = this.type.apply(this.data, componentOp.op); - this.emit('op', componentOp.op, source); + this.emit('op', componentOp.op, source, op.src); } // Pop whatever was submitted since we started applying this op this._popApplyStack(stackLength); @@ -580,7 +582,7 @@ Doc.prototype._otApply = function(op, source) { // The 'before op' event enables clients to pull any necessary data out of // the snapshot before it gets changed - this.emit('before op', op.op, source); + this.emit('before op', op.op, source, op.src); // Apply the operation to the local data, mutating it in place this.data = this.type.apply(this.data, op.op); // Emit an 'op' event once the local data includes the changes from the @@ -588,7 +590,7 @@ Doc.prototype._otApply = function(op, source) { // submission and before the server or other clients have received the op. // For ops from other clients, this will be after the op has been // committed to the database and published - this.emit('op', op.op, source); + this.emit('op', op.op, source, op.src); return; } @@ -861,7 +863,6 @@ Doc.prototype.resume = function() { this.flush(); }; - // *** Receiving operations // This is called when the server acknowledges an operation from the client. diff --git a/lib/client/presence/doc-presence.js b/lib/client/presence/doc-presence.js new file mode 100644 index 000000000..612893c59 --- /dev/null +++ b/lib/client/presence/doc-presence.js @@ -0,0 +1,26 @@ +var Presence = require('./presence'); +var LocalDocPresence = require('./local-doc-presence'); +var RemoteDocPresence = require('./remote-doc-presence'); + +function DocPresence(connection, collection, id) { + var channel = DocPresence.channel(collection, id); + Presence.call(this, connection, channel); + + this.collection = collection; + this.id = id; +} +module.exports = DocPresence; + +DocPresence.prototype = Object.create(Presence.prototype); + +DocPresence.channel = function(collection, id) { + return collection + '.' + id; +}; + +DocPresence.prototype._createLocalPresence = function(id) { + return new LocalDocPresence(this, id); +}; + +DocPresence.prototype._createRemotePresence = function(id) { + return new RemoteDocPresence(this, id); +}; diff --git a/lib/client/presence/local-doc-presence.js b/lib/client/presence/local-doc-presence.js new file mode 100644 index 000000000..fc525b0d6 --- /dev/null +++ b/lib/client/presence/local-doc-presence.js @@ -0,0 +1,110 @@ +var LocalPresence = require('./local-presence'); +var ShareDBError = require('../../error'); +var ERROR_CODE = ShareDBError.CODES; + +module.exports = LocalDocPresence; +function LocalDocPresence(presence, presenceId) { + LocalPresence.call(this, presence, presenceId); + + this.collection = this.presence.collection; + this.id = this.presence.id; + + this._doc = this.connection.get(this.collection, this.id); + this._isSending = false; + + this._opHandler = this._transformAgainstOp.bind(this); + this._createOrDelHandler = this._handleCreateOrDel.bind(this); + this._loadHandler = this._handleLoad.bind(this); + this._destroyHandler = this.destroy.bind(this); + this._registerWithDoc(); +} + +LocalDocPresence.prototype = Object.create(LocalPresence.prototype); + +LocalDocPresence.prototype.submit = function(value, callback) { + if (!this._doc.type) { + var error = { + code: ERROR_CODE.ERR_DOC_DOES_NOT_EXIST, + message: 'Cannot submit presence. Document has not been created' + }; + return this._callbackOrEmit(error, callback); + }; + + LocalPresence.prototype.submit.call(this, value, callback); +}; + +LocalDocPresence.prototype.destroy = function(callback) { + this._doc.removeListener('op', this._opHandler); + this._doc.removeListener('create', this._createOrDelHandler); + this._doc.removeListener('del', this._createOrDelHandler); + this._doc.removeListener('load', this._loadHandler); + this._doc.removeListener('destroy', this._destroyHandler); + + LocalPresence.prototype.destroy.call(this, callback); +}; + +LocalDocPresence.prototype._sendPending = function() { + if (this._isSending) return; + this._isSending = true; + var presence = this; + this._doc.whenNothingPending(function() { + presence._isSending = false; + if (!presence.connection.canSend) return; + + presence._pendingMessages.forEach(function(message) { + message.t = presence._doc.type.uri; + message.v = presence._doc.version; + presence.connection.send(message); + }); + + presence._pendingMessages = []; + }); +}; + +LocalDocPresence.prototype._registerWithDoc = function() { + this._doc.on('op', this._opHandler); + this._doc.on('create', this._createOrDelHandler); + this._doc.on('del', this._createOrDelHandler); + this._doc.on('load', this._loadHandler); + this._doc.on('destroy', this._destroyHandler); +}; + +LocalDocPresence.prototype._transformAgainstOp = function(op, source) { + var presence = this; + this._pendingMessages.forEach(function(message) { + try { + message.p = presence._doc.type.transformPresence(message.p, op, source); + } catch (error) { + var callback = presence._getCallback(message.pv); + presence._callbackOrEmit(error, callback); + } + }); + + try { + this.value = this._doc.type.transformPresence(this.value, op, source); + } catch (error) { + this.emit('error', error); + } +}; + +LocalDocPresence.prototype._handleCreateOrDel = function() { + this._pendingMessages.forEach(function(message) { + message.p = null; + }); + + this.value = null; +}; + +LocalDocPresence.prototype._handleLoad = function() { + this.value = null; + this._pendingMessages = []; +}; + +LocalDocPresence.prototype._message = function() { + var message = LocalPresence.prototype._message.call(this); + message.c = this.collection, + message.d = this.id, + message.v = null; + message.t = null; + return message; +}; diff --git a/lib/client/presence/local-presence.js b/lib/client/presence/local-presence.js new file mode 100644 index 000000000..9ea764b42 --- /dev/null +++ b/lib/client/presence/local-presence.js @@ -0,0 +1,78 @@ +var emitter = require('../../emitter'); + +module.exports = LocalPresence; +function LocalPresence(presence, presenceId) { + emitter.EventEmitter.call(this); + + if (!presenceId || typeof presenceId !== 'string') { + throw new Error('LocalPresence presenceId must be a string'); + } + + this.presence = presence; + this.presenceId = presenceId; + this.connection = presence.connection; + this.presenceVersion = 0; + + this.value = null; + + this._pendingMessages = []; + this._callbacksByPresenceVersion = {}; +} +emitter.mixin(LocalPresence); + +LocalPresence.prototype.submit = function(value, callback) { + this.value = value; + this.send(callback); +}; + +LocalPresence.prototype.send = function(callback) { + var message = this._message(); + this._pendingMessages.push(message); + this._callbacksByPresenceVersion[message.pv] = callback; + this._sendPending(); +}; + +LocalPresence.prototype.destroy = function(callback) { + var presence = this; + this.submit(null, function(error) { + if (error) return presence._callbackOrEmit(error, callback); + delete presence.presence.localPresences[presence.presenceId]; + if (callback) callback(); + }); +}; + +LocalPresence.prototype._sendPending = function() { + if (!this.connection.canSend) return; + var presence = this; + this._pendingMessages.forEach(function(message) { + presence.connection.send(message); + }); + + this._pendingMessages = []; +}; + +LocalPresence.prototype._ack = function(error, presenceVersion) { + var callback = this._getCallback(presenceVersion); + this._callbackOrEmit(error, callback); +}; + +LocalPresence.prototype._message = function() { + return { + a: 'p', + ch: this.presence.channel, + id: this.presenceId, + p: this.value, + pv: this.presenceVersion++ + }; +}; + +LocalPresence.prototype._getCallback = function(presenceVersion) { + var callback = this._callbacksByPresenceVersion[presenceVersion]; + delete this._callbacksByPresenceVersion[presenceVersion]; + return callback; +}; + +LocalPresence.prototype._callbackOrEmit = function(error, callback) { + if (callback) return process.nextTick(callback, error); + if (error) this.emit('error', error); +}; diff --git a/lib/client/presence/presence.js b/lib/client/presence/presence.js new file mode 100644 index 000000000..f7c81e227 --- /dev/null +++ b/lib/client/presence/presence.js @@ -0,0 +1,177 @@ +var emitter = require('../../emitter'); +var LocalPresence = require('./local-presence'); +var RemotePresence = require('./remote-presence'); +var util = require('../../util'); +var async = require('async'); +var hat = require('hat'); + +module.exports = Presence; +function Presence(connection, channel) { + emitter.EventEmitter.call(this); + + if (!channel || typeof channel !== 'string') { + throw new Error('Presence channel must be provided'); + } + + this.connection = connection; + this.channel = channel; + + this.wantSubscribe = false; + this.subscribed = false; + this.remotePresences = {}; + this.localPresences = {}; + this.seq = 1; + + this._remotePresenceInstances = {}; + this._subscriptionCallbacksBySeq = {}; +} +emitter.mixin(Presence); + +Presence.prototype.subscribe = function(callback) { + this._sendSubscriptionAction(true, callback); +}; + +Presence.prototype.unsubscribe = function(callback) { + this._sendSubscriptionAction(false, callback); +}; + +Presence.prototype.create = function(id) { + id = id || hat(); + var localPresence = this._createLocalPresence(id); + this.localPresences[id] = localPresence; + return localPresence; +}; + +Presence.prototype.destroy = function(callback) { + var presence = this; + this.unsubscribe(function(error) { + if (error) return presence._callbackOrEmit(error, callback); + var localIds = Object.keys(presence.localPresences); + var remoteIds = Object.keys(presence._remotePresenceInstances); + async.parallel( + [ + function(next) { + async.each(localIds, function(presenceId, next) { + presence.localPresences[presenceId].destroy(next); + }, next); + }, + function(next) { + async.each(remoteIds, function(presenceId, next) { + presence._remotePresenceInstances[presenceId].destroy(next); + }, next); + } + ], + function(error) { + delete presence.connection._presences[presence.channel]; + presence._callbackOrEmit(error, callback); + } + ); + }); +}; + +Presence.prototype._sendSubscriptionAction = function(wantSubscribe, callback) { + this.wantSubscribe = !!wantSubscribe; + var action = this.wantSubscribe ? 'ps' : 'pu'; + var seq = this.seq++; + this._subscriptionCallbacksBySeq[seq] = callback; + if (this.connection.canSend) { + this.connection._sendPresenceAction(action, seq, this); + } +}; + +Presence.prototype._handleSubscribe = function(error, seq) { + if (this.wantSubscribe) this.subscribed = true; + var callback = this._subscriptionCallback(seq); + this._callbackOrEmit(error, callback); +}; + +Presence.prototype._handleUnsubscribe = function(error, seq) { + this.subscribed = false; + var callback = this._subscriptionCallback(seq); + this._callbackOrEmit(error, callback); +}; + +Presence.prototype._receiveUpdate = function(error, message) { + var localPresence = util.dig(this.localPresences, message.id); + if (localPresence) return localPresence._ack(error, message.pv); + + if (error) return this.emit('error', error); + var presence = this; + var remotePresence = util.digOrCreate(this._remotePresenceInstances, message.id, function() { + return presence._createRemotePresence(message.id); + }); + + remotePresence.receiveUpdate(message); +}; + +Presence.prototype._updateRemotePresence = function(remotePresence) { + this.remotePresences[remotePresence.presenceId] = remotePresence.value; + if (remotePresence.value === null) this._removeRemotePresence(remotePresence.presenceId); + this.emit('receive', remotePresence.presenceId, remotePresence.value); +}; + +Presence.prototype._broadcastAllLocalPresence = function(error) { + if (error) return this.emit('error', error); + for (var id in this.localPresences) { + var localPresence = this.localPresences[id]; + if (localPresence.value !== null) localPresence.send(); + } +}; + +Presence.prototype._removeRemotePresence = function(id) { + this._remotePresenceInstances[id].destroy(); + delete this._remotePresenceInstances[id]; + delete this.remotePresences[id]; +}; + +Presence.prototype._onConnectionStateChanged = function() { + if (!this.connection.canSend) return; + this._resubscribe(); + for (var id in this.localPresences) { + this.localPresences[id]._sendPending(); + } +}; + +Presence.prototype._resubscribe = function() { + var callbacks = []; + for (var seq in this._subscriptionCallbacksBySeq) { + var callback = this._subscriptionCallback(seq); + callbacks.push(callback); + } + + if (!this.wantSubscribe) return this._callEachOrEmit(null, callbacks); + + var presence = this; + this.subscribe(function(error) { + presence._callEachOrEmit(error, callbacks); + }); +}; + +Presence.prototype._subscriptionCallback = function(seq) { + var callback = this._subscriptionCallbacksBySeq[seq]; + delete this._subscriptionCallbacksBySeq[seq]; + return callback; +}; + +Presence.prototype._callbackOrEmit = function(error, callback) { + if (callback) return process.nextTick(callback, error); + if (error) this.emit('error', error); +}; + +Presence.prototype._createLocalPresence = function(id) { + return new LocalPresence(this, id); +}; + +Presence.prototype._createRemotePresence = function(id) { + return new RemotePresence(this, id); +}; + +Presence.prototype._callEachOrEmit = function(error, callbacks) { + if (callbacks && callbacks.length) { + return callbacks.forEach(function(callback) { + process.nextTick(callback, error); + }); + } + + if (error) this.emit('error', error); +}; diff --git a/lib/client/presence/remote-doc-presence.js b/lib/client/presence/remote-doc-presence.js new file mode 100644 index 000000000..2fef91ffa --- /dev/null +++ b/lib/client/presence/remote-doc-presence.js @@ -0,0 +1,151 @@ +var RemotePresence = require('./remote-presence'); +var ot = require('../../ot'); + +module.exports = RemoteDocPresence; +function RemoteDocPresence(presence, presenceId) { + RemotePresence.call(this, presence, presenceId); + + this.collection = this.presence.collection; + this.id = this.presence.id; + this.src = null; + this.presenceVersion = null; + + this._doc = this.connection.get(this.collection, this.id); + this._pending = null; + this._opCache = null; + this._pendingSetPending = false; + + this._opHandler = this._handleOp.bind(this); + this._createDelHandler = this._handleCreateDel.bind(this); + this._loadHandler = this._handleLoad.bind(this); + this._registerWithDoc(); +} + +RemoteDocPresence.prototype = Object.create(RemotePresence.prototype); + +RemoteDocPresence.prototype.receiveUpdate = function(message) { + if (this._pending && message.pv < this._pending.pv) return; + this.src = message.src; + this._pending = message; + this._setPendingPresence(); +}; + +RemoteDocPresence.prototype.destroy = function(callback) { + this._doc.removeListener('op', this._opHandler); + this._doc.removeListener('create', this._createDelHandler); + this._doc.removeListener('del', this._createDelHandler); + this._doc.removeListener('load', this._loadHandler); + + RemotePresence.prototype.destroy.call(this, callback); +}; + +RemoteDocPresence.prototype._registerWithDoc = function() { + this._doc.on('op', this._opHandler); + this._doc.on('create', this._createDelHandler); + this._doc.on('del', this._createDelHandler); + this._doc.on('load', this._loadHandler); +}; + +RemoteDocPresence.prototype._setPendingPresence = function() { + if (this._pendingSetPending) return; + this._pendingSetPending = true; + var presence = this; + this._doc.whenNothingPending(function() { + presence._pendingSetPending = false; + if (!presence._pending) return; + if (presence._pending.pv < presence.presenceVersion) return presence._pending = null; + + if (presence._pending.v > presence._doc.version) { + return presence._doc.fetch(); + } + + if (!presence._catchUpStalePresence()) return; + + presence.value = presence._pending.p; + presence.presenceVersion = presence._pending.pv; + presence._pending = null; + presence.presence._updateRemotePresence(presence); + }); +}; + +RemoteDocPresence.prototype._handleOp = function(op, source, connectionId) { + var isOwnOp = connectionId === this.src; + this._transformAgainstOp(op, isOwnOp); + this._cacheOp(op, isOwnOp); + this._setPendingPresence(); +}; + +RemotePresence.prototype._handleCreateDel = function() { + this._cacheOp(null); + this._setPendingPresence(); +}; + +RemotePresence.prototype._handleLoad = function() { + this.value = null; + this._pending = null; + this._opCache = null; + this.presence._updateRemotePresence(this); +}; + +RemoteDocPresence.prototype._transformAgainstOp = function(op, isOwnOp) { + if (!this.value) return; + + try { + this.value = this._doc.type.transformPresence(this.value, op, isOwnOp); + } catch (error) { + return this.presence.emit('error', error); + } + this.presence._updateRemotePresence(this); +}; + +RemoteDocPresence.prototype._catchUpStalePresence = function() { + if (this._pending.v >= this._doc.version) return true; + + if (!this._opCache) { + this._startCachingOps(); + this._doc.fetch(); + // We're already subscribed, but we send another subscribe message + // to force presence updates from other clients + this.presence.subscribe(); + return false; + } + + while (this._opCache[this._pending.v]) { + var item = this._opCache[this._pending.v]; + var op = item.op; + var isOwnOp = item.isOwnOp; + // We use a null op to signify a create or a delete operation. In both + // cases we just want to reset the presence (which doesn't make sense + // in a new document), so just set the presence to null. + if (op === null) { + this._pending.p = null; + this._pending.v++; + } else { + ot.transformPresence(this._pending, op, isOwnOp); + } + } + + var hasCaughtUp = this._pending.v >= this._doc.version; + if (hasCaughtUp) { + this._stopCachingOps(); + } + + return hasCaughtUp; +}; + +RemoteDocPresence.prototype._startCachingOps = function() { + this._opCache = []; +}; + +RemoteDocPresence.prototype._stopCachingOps = function() { + this._opCache = null; +}; + +RemoteDocPresence.prototype._cacheOp = function(op, isOwnOp) { + if (this._opCache) { + op = op ? {op: op} : null; + // Subtract 1 from the current doc version, because an op with v3 + // should be read as the op that takes a doc from v3 -> v4 + this._opCache[this._doc.version - 1] = {op: op, isOwnOp: isOwnOp}; + } +}; diff --git a/lib/client/presence/remote-presence.js b/lib/client/presence/remote-presence.js new file mode 100644 index 000000000..6a2188782 --- /dev/null +++ b/lib/client/presence/remote-presence.js @@ -0,0 +1,22 @@ +module.exports = RemotePresence; +function RemotePresence(presence, presenceId) { + this.presence = presence; + this.presenceId = presenceId; + this.connection = this.presence.connection; + + this.value = null; + this.presenceVersion = 0; +} + +RemotePresence.prototype.receiveUpdate = function(message) { + if (message.pv < this.presenceVersion) return; + this.value = message.p; + this.presenceVersion = message.pv; + this.presence._updateRemotePresence(this); +}; + +RemotePresence.prototype.destroy = function(callback) { + delete this.presence._remotePresenceInstances[this.presenceId]; + delete this.presence.remotePresences[this.presenceId]; + if (callback) process.nextTick(callback); +}; diff --git a/lib/error.js b/lib/error.js index 4cd4f770f..437a1d446 100644 --- a/lib/error.js +++ b/lib/error.js @@ -40,6 +40,7 @@ ShareDBError.CODES = { ERR_OP_VERSION_NEWER_THAN_CURRENT_SNAPSHOT: 'ERR_OP_VERSION_NEWER_THAN_CURRENT_SNAPSHOT', ERR_OT_OP_BADLY_FORMED: 'ERR_OT_OP_BADLY_FORMED', ERR_OT_OP_NOT_PROVIDED: 'ERR_OT_OP_NOT_PROVIDED', + ERR_PRESENCE_TRANSFORM_FAILED: 'ERR_PRESENCE_TRANSFORM_FAILED', ERR_PROTOCOL_VERSION_NOT_SUPPORTED: 'ERR_PROTOCOL_VERSION_NOT_SUPPORTED', ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED: 'ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED', /** @@ -63,6 +64,7 @@ ShareDBError.CODES = { ERR_SNAPSHOT_READS_REJECTED: 'ERR_SNAPSHOT_READS_REJECTED', ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND: 'ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND', ERR_TYPE_CANNOT_BE_PROJECTED: 'ERR_TYPE_CANNOT_BE_PROJECTED', + ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE: 'ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE', ERR_UNKNOWN_ERROR: 'ERR_UNKNOWN_ERROR' }; diff --git a/lib/ot.js b/lib/ot.js index 1de04ef64..e6e96f755 100644 --- a/lib/ot.js +++ b/lib/ot.js @@ -5,6 +5,7 @@ var types = require('./types').map; var ShareDBError = require('./error'); +var util = require('./util'); var ERROR_CODE = ShareDBError.CODES; @@ -185,3 +186,33 @@ exports.applyOps = function(snapshot, ops) { } } }; + +exports.transformPresence = function(presence, op, isOwnOp) { + var opError = this.checkOp(op); + if (opError) return opError; + + var type = presence.t; + if (typeof type === 'string') { + type = types[type]; + } + if (!type) return {code: ERROR_CODE.ERR_DOC_TYPE_NOT_RECOGNIZED, message: 'Unknown type'}; + if (!util.supportsPresence(type)) { + return {code: ERROR_CODE.ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE, message: 'Type does not support presence'}; + } + + if (op.create || op.del) { + presence.p = null; + presence.v++; + return; + } + + try { + presence.p = presence.p === null ? + null : + type.transformPresence(presence.p, op.op, isOwnOp); + } catch (error) { + return {code: ERROR_CODE.ERR_PRESENCE_TRANSFORM_FAILED, message: error.message || error}; + } + + presence.v++; +}; diff --git a/lib/util.js b/lib/util.js index 601e98cb8..a036aab02 100644 --- a/lib/util.js +++ b/lib/util.js @@ -24,3 +24,45 @@ exports.isValidTimestamp = function(timestamp) { }; exports.MAX_SAFE_INTEGER = 9007199254740991; + +exports.dig = function() { + var obj = arguments[0]; + for (var i = 1; i < arguments.length; i++) { + var key = arguments[i]; + obj = obj[key] || (i === arguments.length - 1 ? undefined : {}); + } + return obj; +}; + +exports.digOrCreate = function() { + var obj = arguments[0]; + var createCallback = arguments[arguments.length - 1]; + for (var i = 1; i < arguments.length - 1; i++) { + var key = arguments[i]; + obj = obj[key] || + (obj[key] = i === arguments.length - 2 ? createCallback() : {}); + } + return obj; +}; + +exports.digAndRemove = function() { + var obj = arguments[0]; + var objects = [obj]; + for (var i = 1; i < arguments.length - 1; i++) { + var key = arguments[i]; + if (!obj.hasOwnProperty(key)) break; + obj = obj[key]; + objects.push(obj); + }; + + for (var i = objects.length - 1; i >= 0; i--) { + var parent = objects[i]; + var key = arguments[i + 1]; + var child = parent[key]; + if (i === objects.length - 1 || !exports.hasKeys(child)) delete parent[key]; + } +}; + +exports.supportsPresence = function(type) { + return type && typeof type.transformPresence === 'function'; +}; diff --git a/test/client/presence/doc-presence.js b/test/client/presence/doc-presence.js new file mode 100644 index 000000000..4fb433547 --- /dev/null +++ b/test/client/presence/doc-presence.js @@ -0,0 +1,948 @@ +var Backend = require('../../../lib/backend'); +var expect = require('chai').expect; +var async = require('async'); +var types = require('../../../lib/types'); +var presenceTestType = require('./presence-test-type'); +var errorHandler = require('../../util').errorHandler; +var PresencePauser = require('./presence-pauser'); +types.register(presenceTestType.type); + +describe('DocPresence', function() { + var backend; + var connection1; + var connection2; + var doc1; + var doc2; + var presence1; + var presence2; + var presencePauser; + + beforeEach(function(done) { + backend = new Backend({presence: true}); + connection1 = backend.connect(); + connection2 = backend.connect(); + + presencePauser = new PresencePauser(); + + backend.use(backend.MIDDLEWARE_ACTIONS.sendPresence, function(request, callback) { + presencePauser.sendPresence(request, callback); + }); + + doc1 = connection1.get('books', 'northern-lights'); + doc2 = connection2.get('books', 'northern-lights'); + + async.series([ + doc1.create.bind(doc1, 'North Lights', presenceTestType.type.name), + doc1.subscribe.bind(doc1), + doc2.subscribe.bind(doc2), + function(next) { + presence1 = connection1.getDocPresence('books', 'northern-lights'); + presence2 = connection2.getDocPresence('books', 'northern-lights'); + next(); + } + ], done); + }); + + afterEach(function(done) { + delete presenceTestType.type.invert; + connection1.close(); + connection2.close(); + backend.close(done); + }); + + it('subscribes to presence from another client', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.eql({index: 1}); + next(); + }); + } + ], done); + }); + + it('transforms existing remote presence when a new local op is applied', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 7}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 7}); + next(); + }); + }, + function(next) { + presence2.once('receive', function(id, presence) { + expect(doc2.data).to.eql('Northern Lights'); + expect(presence).to.eql({index: 10}); + expect(presence2.remotePresences).to.eql({ + 'presence-1': {index: 10} + }); + next(); + }); + + doc2.submitOp({index: 5, value: 'ern'}); + } + ], done); + }); + + it('transforms existing local presence when a new local op is applied', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + localPresence1.submit.bind(localPresence1, {index: 7}), + doc1.submitOp.bind(doc1, {index: 5, value: 'ern'}), + function(next) { + expect(localPresence1.value).to.eql({index: 10}); + next(); + } + ], done); + }); + + it('progresses another client\'s presence when they send an op at their index', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + localPresence2.submit({index: 5}, errorHandler(done)); + presence1.once('receive', function() { + next(); + }); + }, + function(next) { + doc2.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 8}); + next(); + }); + } + ], done); + }); + + it('does not progress another client\'s index when inserting a local op at their index', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + localPresence2.submit({index: 5}, errorHandler(done)); + presence1.once('receive', function() { + next(); + }); + }, + function(next) { + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 5}); + next(); + }); + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + } + ], done); + }); + + it('waits for pending ops before submitting presence', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + doc1.submitOp({index: 12, value: ': His Dark Materials'}, errorHandler(done)); + localPresence1.submit({index: 20}, errorHandler(done)); + + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 20}); + expect(doc2.version).to.eql(2); + next(); + }); + } + ], done); + }); + + it('queues two updates immediately after one another', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 4}, errorHandler(done)); + localPresence1.submit({index: 5}, errorHandler(done)); + + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 4}); + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 5}); + next(); + }); + }); + } + ], done); + }); + + it('transforms pending presence by another op submitted before a flush', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + doc1.submitOp({index: 12, value: ': His Dark Materials'}, errorHandler(done)); + localPresence1.submit({index: 20}, errorHandler(done)); + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + + presence2.once('receive', function(id, presence) { + expect(doc2.version).to.eql(3); + expect(doc2.data).to.eql('Northern Lights: His Dark Materials'); + expect(presence).to.eql({index: 23}); + next(); + }); + } + ], done); + }); + + it('updates the document when the presence version is ahead', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + doc1.unsubscribe.bind(doc1), + doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}), + function(next) { + expect(doc1.version).to.eql(1); + expect(doc2.version).to.eql(2); + + localPresence2.submit({index: 12}, errorHandler(done)); + + presence1.once('receive', function(id, presence) { + expect(doc1.version).to.eql(2); + expect(presence).to.eql({index: 12}); + next(); + }); + } + ], done); + }); + + it('transforms old presence when its version is behind the latest doc', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + doc1.unsubscribe.bind(doc1), + doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}), + function(next) { + expect(doc1.version).to.eql(1); + expect(doc2.version).to.eql(2); + + localPresence1.submit({index: 12}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(doc2.version).to.eql(2); + expect(presence).to.eql({index: 15}); + next(); + }); + } + ], done); + }); + + it('returns errors when failing to transform old presence to the latest doc', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + doc1.unsubscribe.bind(doc1), + doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}), + function(next) { + expect(doc1.version).to.eql(1); + expect(doc2.version).to.eql(2); + + localPresence1.submit({badProp: 'foo'}, function(error) { + expect(error.code).to.equal('ERR_PRESENCE_TRANSFORM_FAILED'); + next(); + }); + } + ], done); + }); + + it('transforms old presence when it arrives later than a new op', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + presencePauser.pause(); + presencePauser.onPause = function() { + next(); + }; + localPresence1.submit({index: 12}, errorHandler(done)); + }, + function(next) { + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + + doc2.once('op', function() { + presencePauser.resume(); + }); + + presence2.once('receive', function(id, presence) { + expect(doc2.version).to.eql(2); + expect(presence).to.eql({index: 15}); + next(); + }); + } + ], done); + }); + + // This test case attempts to force us into a tight race condition corner case: + // 1. doc1 sends presence, as well as submits an op + // 2. doc2 receives the op first, followed by the presence, which is now out-of-date + // 3. doc2 re-requests doc1's presence again + // 4. doc1 sends *another* op, which *again* beats the presence update (this could + // in theory happen many times in a row) + it('transforms old presence when new ops keep beating the presence responses', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + // Pause presence just before sending it back to the clients. It's already been + // transformed by the server to what the server knows as the latest version + presencePauser.pause(); + presencePauser.onPause = function() { + next(); + }; + + localPresence1.submit({index: 12}, errorHandler(done)); + }, + function(next) { + // Now we submit another op, while the presence is still paused. We wait until + // doc2 has received this op, so we know that when we finally receive our + // presence, it will be stale + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + doc2.once('op', function() { + next(); + }); + }, + function(next) { + // At this point in the test, both docs are up-to-date on v2, but doc2 still + // hasn't received doc1's v1 presence + expect(doc1.version).to.eql(2); + expect(doc2.version).to.eql(2); + + // Resume presence broadcasts so that doc2 receives v1's stale presence + presencePauser.resume(); + // However, now immediately pause again. Set a conditional pause, which + // will allow doc2 to request presence from doc1, but will pause doc1's + // presence response, making it stale again + presencePauser.pause(function(request) { + return request.presence.id === 'presence-1'; + }); + presencePauser.onPause = function() { + presencePauser.onPause = null; + + // When we capture doc1's response, doc1 also submits some ops, which + // will make its response stale again. + doc1.submitOp({index: 0, value: 'The'}, function(error) { + if (error) return done(error); + doc1.submitOp({index: 3, value: ' '}, errorHandler(done)); + doc2.on('op', function() { + // This will get fired for v3 and then v4, so check for the later one + if (doc1.version === 4 && doc2.version === 4) { + // Only once doc2 has received the ops, should we resume our + // broadcasts, ensuring that the update is stale again. + presencePauser.resume(); + // Despite the second reply being stale, we expect to have transformed it + // up to the current version. + presence2.once('receive', function(id, presence) { + expect(doc2.version).to.eql(4); + expect(presence).to.eql({index: 19}); + next(); + }); + } + }); + }); + }; + } + ], done); + }); + + // This test is for a similar case to the above test case, but ensures that our + // op cache correctly handles deletion and creation ops + it('transforms old presence when a doc is deleted and then created', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 3}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + localPresence1.submit({index: 12}, errorHandler(done)); + presencePauser.pause(); + presencePauser.onPause = function() { + presencePauser.onPause = null; + next(); + }; + }, + function(next) { + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + doc2.once('op', function() { + next(); + }); + }, + function(next) { + expect(doc1.version).to.eql(2); + expect(doc2.version).to.eql(2); + + presencePauser.resume(); + presencePauser.pause(function(request) { + return request.presence.id === 'presence-1'; + }); + presencePauser.onPause = function() { + presencePauser.onPause = null; + + async.series([ + doc1.del.bind(doc1), + doc1.create.bind(doc1, 'Subtle Knife', presenceTestType.type.name), + doc1.submitOp.bind(doc1, {index: 0, value: 'The '}) + ], errorHandler(done)); + }; + + doc2.on('op', function() { + if (doc2.version !== 5) return; + presencePauser.resume(); + presence2.once('receive', function(id, presence) { + expect(doc2.version).to.eql(5); + expect(presence).to.be.null; + next(); + }); + }); + } + ], done); + }); + + it('transforms local presence when a doc is deleted and created locally', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + localPresence1.submit.bind(localPresence1, {index: 3}), + doc1.del.bind(doc1), + doc1.create.bind(doc1, 'Subtle Knife', presenceTestType.type.uri), + function(next) { + expect(localPresence1.value).to.be.null; + next(); + } + ], done); + }); + + it('transforms pending presence by a re-creation submitted before a flush', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 2}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + doc1.submitOp({index: 12, value: ': His Dark Materials'}, errorHandler(done)); + localPresence1.submit({index: 20}, errorHandler(done)); + doc1.del(errorHandler(done)); + doc1.create('Subtle Knife', presenceTestType.type.uri, errorHandler(done)); + + presence2.on('receive', function(id, presence) { + if (doc2.version !== 4) return; + expect(doc2.data).to.eql('Subtle Knife'); + expect(presence).to.be.null; + next(); + }); + } + ], done); + }); + + it('ignores presence that arrives out of order', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + var hasPaused = false; + // Catch the first presence update, but then allow later ones + presencePauser.pause(function() { + if (hasPaused) return false; + hasPaused = true; + return true; + }); + + localPresence1.submit({index: 2}, next); + }, + function(next) { + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 3}); + + presence2.once('receive', function() { + done(new Error('should not get another presence event')); + }); + + presencePauser.resume(); + next(); + }); + + localPresence1.submit({index: 3}, errorHandler(done)); + } + ], done); + }); + + it('ignores pending presence that arrives out of order', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + doc1.unsubscribe.bind(doc1), + doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}), + function(next) { + var pauseCount = 0; + presencePauser.pause(); + presencePauser.onPause = function() { + pauseCount++; + if (pauseCount === 2) { + expect(this._pendingBroadcasts[0][0].presence.p).to.eql({index: 2}); + expect(this._pendingBroadcasts[1][0].presence.p).to.eql({index: 4}); + expect(this._pendingBroadcasts[0][0].presence.pv) + .to.be.lessThan(this._pendingBroadcasts[1][0].presence.pv); + + // Fire the broadcasts in the reverse order + this._pendingBroadcasts[1][1](); + this._pendingBroadcasts[0][1](); + } + }; + + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 4}); + next(); + }); + + localPresence2.submit({index: 2}, errorHandler(done)); + localPresence2.submit({index: 4}, errorHandler(done)); + } + ], done); + }); + + it('rejects a presence message with a numeric collection', function(done) { + var localPresence1 = presence1.create('presence-1'); + localPresence1.on('error', function(error) { + expect(error.code).to.eql('ERR_MESSAGE_BADLY_FORMED'); + done(); + }); + + var message = localPresence1._message(); + message.c = 1; + message.v = 1; + message.t = presenceTestType.type.uri; + connection1.send(message); + }); + + it('rejects a presence message with an invalid version', function(done) { + var localPresence1 = presence1.create('presence-1'); + localPresence1.on('error', function(error) { + expect(error.code).to.eql('ERR_MESSAGE_BADLY_FORMED'); + done(); + }); + + var message = localPresence1._message(); + message.v = -1; + message.t = presenceTestType.type.uri; + connection1.send(message); + }); + + it('rejects a presence message without an ID', function(done) { + var localPresence1 = presence1.create('presence-1'); + // Have to catch the error on the Presence instance, because obviously + // we won't find the LocalPresence without the ID + presence1.on('error', function(error) { + expect(error.code).to.eql('ERR_MESSAGE_BADLY_FORMED'); + done(); + }); + + var message = localPresence1._message(); + message.id = null; + message.v = 1; + message.t = presenceTestType.type.uri; + connection1.send(message); + }); + + it('only sends presence responses for the associated doc', function(done) { + var localPresence1 = presence1.create('presence-1'); + var localPresence2 = presence2.create('presence-2'); + var otherDoc1 = connection1.get('books', 'subtle-knife'); + var otherDoc2 = connection2.get('books', 'subtle-knife'); + var otherPresence1 = connection1.getDocPresence('books', 'subtle-knife'); + var otherPresence2 = connection2.getDocPresence('books', 'subtle-knife'); + var localOtherPresence1 = otherPresence1.create('other-presence-1'); + + async.series([ + otherDoc1.create.bind(otherDoc1, 'Subtle Knife', presenceTestType.type.uri), + otherDoc2.subscribe.bind(otherDoc2), + otherPresence2.subscribe.bind(otherPresence2), + function(next) { + localOtherPresence1.submit({index: 0}, errorHandler(done)); + otherPresence2.once('receive', function() { + next(); + }); + }, + localPresence1.submit.bind(localPresence1, {index: 3}), + function(next) { + localPresence2.submit({index: 5}, next); + otherPresence1.on('receive', function() { + done(new Error('Other document should not have had presence sent')); + }); + otherPresence2.on('receive', function() { + done(new Error('Other document should not have had presence sent')); + }); + } + ], done); + }); + + it('sends the presence data once the connection can send', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + connection2._setState('disconnected'); + localPresence2.submit({index: 1}, errorHandler(done)); + + doc2.whenNothingPending(function() { + // The connection tests whether we can send just before sending on + // nothing pending, so let's also wait to reset the connection. + connection2._setState('connecting'); + connection2._setState('connected'); + }); + + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 1}); + next(); + }); + } + ], done); + }); + + it('re-requests presence when reconnecting', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + connection1._setState('disconnected'); + next(); + }, + function(next) { + localPresence2.submit({index: 0}, errorHandler(done)); + // We've not _actually_ disconnected the connection, so this + // event will still fire. + presence1.once('receive', function() { + next(); + }); + }, + function(next) { + connection1._setState('connecting'); + connection1._setState('connected'); + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 0}); + next(); + }); + } + ], done); + }); + + it('un-transforms presence after a soft rollback', function(done) { + // Mock invert so that we can trigger a soft rollback instead of a hard rollback + presenceTestType.type.invert = function() { + return {index: 5, del: 3}; + }; + + var localPresence1 = presence1.create('presence-1'); + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + localPresence1.submit.bind(localPresence1, {index: 7}), + function(next) { + localPresence2.submit({index: 8}, errorHandler(done)); + presence1.once('receive', function() { + next(); + }); + }, + function(next) { + backend.use(backend.MIDDLEWARE_ACTIONS.apply, function(request, callback) { + callback({code: 'ERR_OP_SUBMIT_REJECTED'}); + }); + + presence1.once('receive', function() { + expect(localPresence1.value).to.eql({index: 10}); + expect(presence1.remotePresences).to.eql({ + 'presence-2': {index: 11} + }); + + presence1.once('receive', function() { + expect(localPresence1.value).to.eql({index: 7}); + expect(presence1.remotePresences).to.eql({ + 'presence-2': {index: 8} + }); + next(); + }); + }); + + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + } + ], done); + }); + + it('performs a hard reset on presence when the doc is hard rolled back', function(done) { + var localPresence1 = presence1.create('presence-1'); + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + localPresence1.submit.bind(localPresence1, {index: 7}), + function(next) { + localPresence2.submit({index: 8}, errorHandler(done)); + presence1.once('receive', function() { + next(); + }); + }, + function(next) { + backend.use(backend.MIDDLEWARE_ACTIONS.apply, function(request, callback) { + callback({code: 'ERR_OP_SUBMIT_REJECTED'}); + }); + + presence1.once('receive', function() { + expect(localPresence1.value).to.eql({index: 10}); + expect(presence1.remotePresences).to.eql({ + 'presence-2': {index: 11} + }); + + presence1.once('receive', function() { + expect(localPresence1.value).to.be.null; + expect(presence1.remotePresences).to.eql({}); + next(); + }); + }); + + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + } + ], done); + }); + + it('can receive presence before performing the first fetch on a document', function(done) { + var connection3 = backend.connect(); + var doc3 = connection3.get('books', 'northern-lights'); + var presence3 = connection3.getDocPresence('books', 'northern-lights'); + var localPresence3 = presence3.create('presence-3'); + + async.series([ + presence1.subscribe.bind(presence1), + doc3.fetch.bind(doc3), + function(next) { + localPresence3.submit({index: 1}, errorHandler(done)); + presence1.once('receive', function(id, presence) { + expect(id).to.eql('presence-3'); + expect(presence).to.eql({index: 1}); + next(); + }); + } + ], done); + }); + + it('errors when submitting presence on a document that has not been created', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + doc1.del.bind(doc1), + function(next) { + localPresence1.submit({index: 2}, function(error) { + expect(error.code).to.eql('ERR_DOC_DOES_NOT_EXIST'); + next(); + }); + } + ], done); + }); + + it('errors when trying to submit presence on a type that does not support it', function(done) { + var jsonDoc = connection1.get('books', 'snuff'); + var jsonPresence = connection1.getDocPresence('books', 'snuff'); + var localJsonPresence = jsonPresence.create('json-presence'); + + async.series([ + jsonDoc.create.bind(jsonDoc, {title: 'Snuff'}, 'json0'), + function(next) { + localJsonPresence.submit({index: 1}, function(error) { + expect(error.code).to.eql('ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE'); + next(); + }); + } + ], done); + }); + + it('returns errors sent from the middleware', function(done) { + backend.use(backend.MIDDLEWARE_ACTIONS.sendPresence, function(request, callback) { + callback('some error'); + }); + + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + localPresence2.submit({index: 0}, errorHandler(done)); + presence1.once('error', function(error) { + expect(error.message).to.equal('some error'); + next(); + }); + } + ], done); + }); + + it('removes doc event listeners when destroying presence', function(done) { + var localPresence1 = presence1.create('presence-1'); + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence2.subscribe.bind(presence2), + localPresence2.submit.bind(localPresence2, {index: 2}), + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + presence2.destroy.bind(presence2), + function(next) { + expect(doc2._eventsCount).to.equal(0); + next(); + } + ], done); + }); + + it('destroys remote presence when it is updated with null', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)), + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + localPresence1.submit(null, errorHandler(done)), + presence2.once('receive', function(id, presence) { + expect(presence).to.be.null; + expect(doc2._eventsCount).to.equal(0); + next(); + }); + } + ], done); + }); + + it('waits for local pending ops before accepting remote presence', function(done) { + var localPresence2 = presence2.create('presence-2'); + + var triggerApply; + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + backend.use(backend.MIDDLEWARE_ACTIONS.apply, function(request, callback) { + triggerApply = callback; + expect(doc1.inflightOp).to.be.ok; + expect(doc1.pendingOps).to.be.empty; + next(); + }); + + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + }, + localPresence2.submit.bind(localPresence2, {index: 10}), + function(next) { + triggerApply(); + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 13}); + next(); + }); + } + ], done); + }); + + it('emits an error when trying to transform bad local presence against an op', function(done) { + var localPresence1 = presence1.create('presence-1'); + + localPresence1.submit({badProp: 'foo'}, function(error) { + expect(error).to.be.ok; + }); + + localPresence1.once('error', function() { + done(); + }); + + doc1.submitOp({index: 5, value: 'ern'}); + }); + + it('emits an error when trying to transform bad remote presence against an op', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + localPresence2.submit({badProp: 'foo'}, errorHandler(done)); + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({badProp: 'foo'}); + next(); + }); + }, + function(next) { + localPresence2.once('error', function() { + // Ignore the local error + }); + presence1.once('error', function() { + next(); + }); + doc1.submitOp({index: 5, value: 'ern'}); + } + ], done); + }); + + it('sends null presence when the doc is destroyed', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 2}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + doc1.destroy(errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.be.null; + next(); + }); + } + ], done); + }); +}); diff --git a/test/client/presence/presence-pauser.js b/test/client/presence/presence-pauser.js new file mode 100644 index 000000000..09d8f5a52 --- /dev/null +++ b/test/client/presence/presence-pauser.js @@ -0,0 +1,43 @@ +// Helper middleware for precise control over when clients receive +// presence updates +module.exports = PresencePauser; +function PresencePauser() { + // Handler that can be set to be called when a message + // is paused + this.onPause = null; + this._shouldPause = false; + this._pendingBroadcasts = []; + + // Main middleware method + this.sendPresence = function(request, callback) { + if (!this._isPaused(request)) return callback(); + this._pendingBroadcasts.push([request, callback]); + if (typeof this.onPause === 'function') { + this.onPause(request); + } + }; + + // If called without an argument, will pause all broadcasts. + // If called with a function, the returned result will determine + // whether the request is paused + this.pause = function(predicate) { + this._shouldPause = typeof predicate === 'function' ? predicate : true; + }; + + // Send all paused broadcasts, and unpause. Also unsets the onPause + // handler + this.resume = function() { + this._shouldPause = false; + this._pendingBroadcasts.forEach(function(broadcast) { + var callback = broadcast[1]; + callback(); + }); + this._pendingBroadcasts = []; + this.onPause = null; + }; + + this._isPaused = function(request) { + return this._shouldPause === true || + typeof this._shouldPause === 'function' && this._shouldPause(request); + }; +} diff --git a/test/client/presence/presence-test-type.js b/test/client/presence/presence-test-type.js new file mode 100644 index 000000000..d1c1d42e3 --- /dev/null +++ b/test/client/presence/presence-test-type.js @@ -0,0 +1,37 @@ +exports.type = { + name: 'presence-test-type', + uri: 'http://sharejs.org/types/presence-test-type', + create: create, + apply: apply, + transformPresence: transformPresence +}; + +function create(data) { + return typeof data === 'string' ? data : ''; +} + +function apply(snapshot, op) { + if (op.value) { + return snapshot.substring(0, op.index) + op.value + snapshot.substring(op.index); + } else if (op.del) { + return snapshot.substring(0, op.index) + snapshot.substring(op.index + op.del); + } + + throw new Error('Invalid op'); +} + +function transformPresence(presence, op, isOwnOperation) { + if (!presence || presence.index < op.index || (presence.index === op.index && !isOwnOperation)) { + return presence; + } + + if (typeof presence.index !== 'number') throw new Error('Presence index is not a number'); + + if (op.value) { + return {index: presence.index + op.value.length}; + } else if (op.del) { + return {index: presence.index - op.del}; + } + + throw new Error('Invalid op'); +} diff --git a/test/client/presence/presence.js b/test/client/presence/presence.js new file mode 100644 index 000000000..d95aa0a0f --- /dev/null +++ b/test/client/presence/presence.js @@ -0,0 +1,444 @@ +var Backend = require('../../../lib/backend'); +var PresencePauser = require('./presence-pauser'); +var expect = require('chai').expect; +var async = require('async'); +var errorHandler = require('../../util').errorHandler; +var sinon = require('sinon'); + +describe('Presence', function() { + var backend; + var connection1; + var connection2; + var presence1; + var presence2; + var presencePauser; + + beforeEach(function(done) { + backend = new Backend({presence: true}); + var connectedCount = 0; + connection1 = backend.connect(); + connection2 = backend.connect(); + + var checkConnections = function() { + connectedCount++; + if (connectedCount === 2) done(); + }; + + connection1.on('connected', checkConnections); + connection2.on('connected', checkConnections); + + presencePauser = new PresencePauser(); + + backend.use(backend.MIDDLEWARE_ACTIONS.sendPresence, function(request, callback) { + presencePauser.sendPresence(request, callback); + }); + + presence1 = connection1.getPresence('test-channel'); + presence2 = connection2.getPresence('test-channel'); + }); + + afterEach(function(done) { + sinon.restore(); + connection1.close(); + connection2.close(); + backend.close(done); + }); + + it('can subscribe to updates from other clients', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 5}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.eql({index: 5}); + next(); + }); + } + ], done); + }); + + it('can unsubscribe from updates to other clients', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + presence2.unsubscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 5}, errorHandler(done)); + presence2.once('receive', function() { + done(new Error('Should not have received presence update')); + }); + next(); + } + ], done); + }); + + it('requests existing presence from other subscribed clients when subscribing', function(done) { + var localPresence1 = presence1.create('presence-1'); + async.series([ + presence1.subscribe.bind(presence1), + localPresence1.submit.bind(localPresence1, {index: 2}), + function(next) { + presence2.subscribe(errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.eql({index: 2}); + next(); + }); + } + ], done); + }); + + it('removes remote presence when it is set to null', function(done) { + var localPresence1 = presence1.create('presence-1'); + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 3}, errorHandler(done)); + presence2.once('receive', function() { + expect(presence2.remotePresences).to.eql({ + 'presence-1': {index: 3} + }); + next(); + }); + }, + function(next) { + localPresence1.submit(null, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(presence).to.be.null; + expect(presence2.remotePresences).to.eql({}); + next(); + }); + } + ], done); + }); + + it('does not broadcast null local presence when requested', function(done) { + var localPresence1 = presence1.create('presence-1'); + async.series([ + presence1.subscribe.bind(presence1), + localPresence1.submit.bind(localPresence1, null), + function(next) { + presence2.subscribe(errorHandler(done)); + presence2.once('receive', function() { + done(new Error('should not have received presence')); + }); + next(); + } + ], done); + }); + + it('destroys its connection reference, unsubscribes and nulls its local presences', function(done) { + var localPresence1 = presence1.create('presence-1'); + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + localPresence2.submit({index: 2}, errorHandler(done)); + presence1.once('receive', function() { + next(); + }); + }, + presence1.destroy.bind(presence1), + function(next) { + expect(presence1.localPresences).to.eql({}); + expect(presence2.remotePresences).to.eql({}); + expect(connection1._presences).to.eql({}); + next(); + } + ], done); + }); + + it('supports multiple local presences on a single connection', function(done) { + var localPresence1a = presence1.create('presence-1a'); + var localPresence1b = presence1.create('presence-1b'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1a.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1a'); + expect(presence).to.eql({index: 1}); + next(); + }); + }, + function(next) { + localPresence1b.submit({index: 2}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1b'); + expect(presence).to.eql({index: 2}); + expect(Object.keys(presence1.localPresences)).to.eql(['presence-1a', 'presence-1b']); + expect(presence2.remotePresences).to.eql({ + 'presence-1a': {index: 1}, + 'presence-1b': {index: 2} + }); + next(); + }); + } + ], done); + }); + + it('subscribes once the connection can send', function(done) { + var localPresence1 = presence1.create('presence-1'); + + connection2._setState('disconnected'); + expect(connection2.canSend).to.be.false; + async.series([ + function(next) { + presence2.subscribe(next); + connection2._setState('connecting'); + connection2._setState('connected'); + }, + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + } + ], done); + }); + + it('sends local presence once the connection can send', function(done) { + var localPresence1 = presence1.create('presence-1'); + + connection1._setState('disconnected'); + expect(connection1.canSend).to.be.false; + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + connection1._setState('connecting'); + connection1._setState('connected'); + } + ], done); + }); + + it('re-requests remote presence when reconnecting', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + connection2._setState('disconnected'); + expect(connection2.canSend).to.be.false; + next(); + }, + localPresence1.submit.bind(localPresence1, {index: 1}), + function(next) { + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.eql({index: 1}); + next(); + }); + connection2._setState('connecting'); + connection2._setState('connected'); + } + ], done); + }); + + it('calls multiple callbacks if subscribing multiple times in series', function(done) { + var callbacksCalled = 0; + + var callback = function(error) { + if (error) return done(error); + callbacksCalled++; + if (callbacksCalled === 3) done(); + }; + + presence1.subscribe(callback); + presence1.subscribe(callback); + presence1.subscribe(callback); + }); + + it('finishes unsubscribed if calling immediately after subscribe', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + function(next) { + var callbackCount = 0; + var callback = function(error) { + if (error) return done(error); + callbackCount++; + if (callbackCount === 2) next(); + }; + + presence2.subscribe(callback); + presence2.unsubscribe(callback); + }, + function(next) { + expect(presence2.wantSubscribe).to.be.false; + expect(presence2.subscribed).to.be.false; + localPresence1.submit({index: 1}, next); + presence2.on('receive', function() { + done(new Error('Should not have received presence')); + }); + } + ], done); + }); + + it('throws an error when trying to create a presence with a non-string ID', function() { + expect(function() { + presence1.create(123); + }).to.throw(); + }); + + it('assigns an ID if one is not provided', function() { + var localPresence = presence1.create(); + expect(localPresence.presenceId).to.be.ok; + }); + + it('returns the error if a local presence cannot be destroyed because of a bad submit', function(done) { + var localPresence1 = presence1.create('presence-1'); + sinon.stub(localPresence1, 'submit').callsFake(function(value, callback) { + callback(new Error('bad')); + }); + + localPresence1.destroy(function(error) { + expect(error).to.be.ok; + done(); + }); + }); + + it('throws an error if a presence is created with a non-string channel', function() { + expect(function() { + connection1.getPresence(123); + }).to.throw(); + }); + + it('throws an error if a presence is created with an empty string channel', function() { + expect(function() { + connection1.getPresence(''); + }).to.throw(); + }); + + it('returns unsubscribe errors when trying to destroy presence', function(done) { + sinon.stub(presence1, 'unsubscribe').callsFake(function(callback) { + callback(new Error('bad')); + }); + + presence1.destroy(function(error) { + expect(error).to.be.ok; + done(); + }); + }); + + it('emits unsubscribe errors when trying to destroy presence', function(done) { + sinon.stub(presence1, 'unsubscribe').callsFake(function(callback) { + callback(new Error('bad')); + }); + + presence1.once('error', function() { + done(); + }); + presence1.destroy(); + }); + + it('emits an error when trying to broadcast all presence with an error', function(done) { + presence1.once('error', function() { + done(); + }); + + presence1._broadcastAllLocalPresence(new Error('bad')); + }); + + it('emits a subscribe error on reconnection', function(done) { + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + sinon.stub(presence1, 'subscribe').callsFake(function(callback) { + callback(new Error('bad')); + }); + + presence1.once('error', function() { + next(); + }); + + connection1._setState('disconnected'); + connection1._setState('connecting'); + connection1._setState('connected'); + } + ], done); + }); + + it('ignores presence that arrives out of order', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + var hasPaused = false; + // Catch the first presence update, but then allow later ones + presencePauser.pause(function() { + if (hasPaused) return false; + hasPaused = true; + return true; + }); + + localPresence1.submit({index: 2}, next); + }, + function(next) { + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 3}); + + presence2.once('receive', function() { + done(new Error('should not get another presence event')); + }); + + presencePauser.resume(); + next(); + }); + + localPresence1.submit({index: 3}, errorHandler(done)); + } + ], done); + }); + + it('adds itself back onto the connection after a destroy and a resubscribe', function(done) { + async.series([ + presence1.destroy.bind(presence1), + presence1.subscribe.bind(presence1), + function(next) { + expect(connection1._presences[presence1.channel]).to.equal(presence1); + next(); + } + ], done); + }); + + it('broadcasts a null presence when the connection is disconnected', function(done) { + var localPresence1 = presence1.create('presence-1'); + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 3}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.be.null; + next(); + }); + connection1.close(); + } + ], done); + }); +}); diff --git a/test/ot.js b/test/ot.js index 379d017da..ed952d8ce 100644 --- a/test/ot.js +++ b/test/ot.js @@ -1,6 +1,10 @@ var expect = require('chai').expect; var ot = require('../lib/ot'); -var type = require('../lib/types').defaultType; +var types = require('../lib/types'); +var type = types.defaultType; +var presenceType = require('./client/presence/presence-test-type').type; + +types.register(presenceType); describe('ot', function() { describe('checkOp', function() { @@ -237,4 +241,132 @@ describe('ot', function() { expect(op).eql({}); }); }); + + describe('transformPresence', function() { + it('transforms a presence by an op', function() { + var presence = { + p: {index: 5}, + t: presenceType.uri, + v: 1 + }; + + var op = { + op: {index: 2, value: 'foo'} + }; + var error = ot.transformPresence(presence, op); + + expect(error).to.be.undefined; + expect(presence).to.eql({ + p: {index: 8}, + t: presenceType.uri, + v: 2 + }); + }); + + it('nulls presence for a create op', function() { + var presence = { + p: {index: 5}, + t: presenceType.uri, + v: 1 + }; + + var op = { + create: {type: presenceType.uri, data: 'foo'} + }; + var error = ot.transformPresence(presence, op); + + expect(error).to.be.undefined; + expect(presence).to.eql({ + p: null, + t: presenceType.uri, + v: 2 + }); + }); + + it('nulls presence for a delete op', function() { + var presence = { + p: {index: 5}, + t: presenceType.uri, + v: 1 + }; + + var op = {del: true}; + var error = ot.transformPresence(presence, op); + + expect(error).to.be.undefined; + expect(presence).to.eql({ + p: null, + t: presenceType.uri, + v: 2 + }); + }); + + it('returns an error for an invalid op', function() { + var presence = { + p: {index: 5}, + t: presenceType.uri, + v: 1 + }; + + var op = {}; + var error = ot.transformPresence(presence, op); + + expect(error.code).to.eql('ERR_OT_OP_BADLY_FORMED'); + }); + + it('considers isOwnOp', function() { + var presence = { + p: {index: 5}, + t: presenceType.uri, + v: 1 + }; + + var op = { + op: {index: 5, value: 'foo'} + }; + var error = ot.transformPresence(presence, op, true); + + expect(error).to.be.undefined; + expect(presence).to.eql({ + p: {index: 8}, + t: presenceType.uri, + v: 2 + }); + }); + + it('checks that the type supports presence', function() { + var presence = { + p: {index: 5}, + t: type.uri, + v: 1 + }; + + var op = { + op: {index: 5, value: 'foo'} + }; + var error = ot.transformPresence(presence, op); + + expect(error.code).to.eql('ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE'); + }); + + it('leaves a null presence untransformed', function() { + var presence = { + p: null, + t: presenceType.uri, + v: 2 + }; + + var op = { + op: {index: 5, value: 'foo'} + }; + var error = ot.transformPresence(presence, op); + + expect(error).to.be.undefined; + expect(presence).to.eql({ + p: null, + t: presenceType.uri, + v: 3 + }); + }); + }); }); diff --git a/test/util.js b/test/util.js index 578c92a11..17fa6500f 100644 --- a/test/util.js +++ b/test/util.js @@ -45,3 +45,9 @@ exports.errorHandler = function(callback) { if (error) callback(error); }; }; + +exports.errorHandler = function(callback) { + return function(error) { + if (error) callback(error); + }; +};