diff --git a/.gitignore b/.gitignore index 3005c1397..abd0d58e5 100644 --- a/.gitignore +++ b/.gitignore @@ -34,4 +34,5 @@ coverage # Dependency directories node_modules package-lock.json +yarn.lock jspm_packages diff --git a/README.md b/README.md index 8cbbfecf0..d82b92ad7 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ tracker](https://github.com/share/sharedb/issues). - Realtime synchronization of any JSON document - Concurrent multi-user collaboration +- Realtime synchronization of any ephemeral "presence" data - Synchronous editing API with asynchronous eventual consistency - Realtime query subscriptions - Simple integration with any database - [MongoDB](https://github.com/share/sharedb-mongo), [PostgresQL](https://github.com/share/sharedb-postgres) (experimental) @@ -73,6 +74,12 @@ initial data. Then you can submit editing operations on the document (using OT). Finally you can delete the document with a delete operation. By default, ShareDB stores all operations forever - nothing is truly deleted. +## User presence synchronization + +ShareDB supports synchronization of user presence data. This feature is opt-in, not enabled by default. To enable this feature, pass the `enablePresence: true` option to the ShareDB constructor (e.g. `var share = new ShareDB({ enablePresence: true })`). + +Presence data represents a user and is automatically synchronized between all clients subscribed to the same document. Its format is defined by the document's [OT Type](https://github.com/ottypes/docs), for example it may contain a user ID and a cursor position in a text document. All clients can modify their own presence data and receive a read-only version of other client's data. Presence data is automatically cleared when a client unsubscribes from the document or disconnects. It is also automatically transformed against applied operations, so that it still makes sense in the context of a modified document, for example a cursor position may be automatically advanced when a user types at the beginning of a text document. + ## Server API ### Initialization @@ -91,6 +98,8 @@ __Options__ * `options.pubsub` _(instance of `ShareDB.PubSub`)_ Notify other ShareDB processes when data changes through this pub/sub adapter. Defaults to `ShareDB.MemoryPubSub()`. +* `options.enablePresence` _(optional boolean)_ + Enable user presence synchronization. #### Database Adapters * `ShareDB.MemoryDB`, backed by a non-persistent database with no queries @@ -308,6 +317,9 @@ Unique document ID `doc.data` _(Object)_ Document contents. Available after document is fetched or subscribed to. +`doc.presence.current` _(Object)_ +Each property under `doc.presence.current` contains presence data shared by a client subscribed to this document. The property name is an empty string for this client's data and connection IDs for other clients' data. + `doc.fetch(function(err) {...})` Populate the fields on `doc` with a snapshot of the document from the server. @@ -337,6 +349,9 @@ An operation was applied to the data. `source` will be `false` for ops received `doc.on('del', function(data, source) {...})` The document was deleted. Document contents before deletion are passed in as an argument. `source` will be `false` for ops received from the server and defaults to `true` for ops generated locally. +`doc.on('presence', function(srcList, submitted) {...})` +Presence data has changed. `srcList` is an Array of `doc.presence` property names for which values have changed. `submitted` is `true`, if the event is the result of new presence data being submitted by the local or remote user, otherwise it is `false` - eg if the presence data was transformed against an operation or was cleared on unsubscribe, disconnect or roll-back. + `doc.on('error', function(err) {...})` There was an error fetching the document or applying an operation. @@ -370,6 +385,11 @@ Invokes the given callback function after Note that `whenNothingPending` does NOT wait for pending `model.query()` calls. +`doc.submitPresence(presenceData[, function(err) {...}])` +Set local presence data and publish it for other clients. +`presenceData` structure depends on the document type. +Presence is synchronized only when subscribed to the document. + ### Class: `ShareDB.Query` `query.ready` _(Boolean)_ @@ -467,6 +487,9 @@ Additional fields may be added to the error object for debugging context dependi * 4022 - Database adapter does not support queries * 4023 - Cannot project snapshots of this type * 4024 - Invalid version +* 4025 - Not subscribed to document +* 4026 - Presence data superseded +* 4027 - OT Type does not support presence ### 5000 - Internal error diff --git a/lib/agent.js b/lib/agent.js index b5cef65c1..53891b306 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -2,6 +2,7 @@ var hat = require('hat'); var util = require('./util'); var types = require('./types'); var logger = require('./logger'); +var ShareDBError = require('./error'); /** * Agent deserializes the wire protocol messages received from the stream and @@ -26,6 +27,69 @@ function Agent(backend, stream) { // Map from queryId -> emitter this.subscribedQueries = {}; + this.presence = { + agent: this, + isPresenceData: function (data) { + return data.a === 'p'; + }, + processPresenceData: function (data) { + if (data.a === 'p') { + // Send other clients' presence data + if (data.src !== this.agent.clientId) this.agent.send(data); + return true; + } + }, + // The max presence sequence number received from the client. + maxPresenceSeq: 0, + createPresence: function(collection, id, data, version, requestReply, seq) { + return { + a: 'p', + src: this.agent.clientId, + seq: seq != null ? seq : this.maxPresenceSeq, + c: collection, + d: id, + p: data, + v: version, + r: requestReply + }; + }, + subscribeToStream: function (collection, id, stream) { + var agent = this.agent; + stream.on('end', function() { + agent.backend.sendPresence(agent.presence.createPresence(collection, id)); + }); + }, + checkRequest: function (request) { + if (request.a === 'p') { + if (typeof request.c !== 'string') return 'Invalid collection'; + if (typeof request.d !== 'string') return 'Invalid id'; + if (typeof request.v !== 'number' || request.v < 0) return 'Invalid version'; + if (typeof request.seq !== 'number' || request.seq <= 0) return 'Invalid seq'; + if (typeof request.r !== 'undefined' && typeof request.r !== 'boolean') { + return 'Invalid "request reply" value'; + } + } + }, + handlePresenceMessage: function(request, callback) { + var presence = this.createPresence(request.c, request.d, request.p, request.v, request.r, request.seq); + if (presence.seq <= this.maxPresenceSeq) { + return process.nextTick(function() { + callback(new ShareDBError(4026, 'Presence data superseded')); + }); + } + this.maxPresenceSeq = presence.seq; + if (!this.agent.subscribedDocs[presence.c] || !this.agent.subscribedDocs[presence.c][presence.d]) { + return process.nextTick(function() { + callback(new ShareDBError(4025, 'Cannot send presence. Not subscribed to document: ' + presence.c + ' ' + presence.d)); + }); + } + this.agent.backend.sendPresence(presence, function(err) { + if (err) return callback(err); + callback(null, { seq: presence.seq }); + }); + } + }; + // We need to track this manually to make sure we don't reply to messages // after the stream was closed. this.closed = false; @@ -106,6 +170,10 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { logger.error('Doc subscription stream error', collection, id, data.error); return; } + if (agent.presence.isPresenceData(data)) { + agent.presence.processPresenceData(data); + return; + } if (agent._isOwnOp(collection, data)) return; agent._sendOp(collection, id, data); }); @@ -117,6 +185,7 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { if (util.hasKeys(streams)) return; delete agent.subscribedDocs[collection]; }); + this.presence.subscribeToStream(collection, id, stream); }; Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) { @@ -288,6 +357,8 @@ Agent.prototype._checkRequest = function(request) { // Bulk request if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (typeof request.b !== 'object') return 'Invalid bulk subscribe data'; + } else { + return this.presence.checkRequest(request); } }; @@ -325,6 +396,9 @@ Agent.prototype._handleMessage = function(request, callback) { case 'nt': return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback); default: + if (this.presence.isPresenceData(request)) { + return this.presence.handlePresenceMessage(request, callback); + } callback({code: 4000, message: 'Invalid or unknown message'}); } } catch (err) { diff --git a/lib/backend.js b/lib/backend.js index 442da075c..192636a76 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -12,6 +12,7 @@ var Snapshot = require('./snapshot'); var StreamSocket = require('./stream-socket'); var SubmitRequest = require('./submit-request'); var types = require('./types'); +var dummyPresence = require('./presence/dummy'); var warnDeprecatedDoc = true; var warnDeprecatedAfterSubmit = true; @@ -48,6 +49,8 @@ function Backend(options) { if (!options.disableSpaceDelimitedActions) { this._shimAfterSubmit(); } + + this.presence = options.presence || dummyPresence; } module.exports = Backend; emitter.mixin(Backend); @@ -155,6 +158,9 @@ Backend.prototype.connect = function(connection, req) { // not used internal to ShareDB, but it is handy for server-side only user // code that may cache state on the agent and read it in middleware connection.agent = agent; + + connection.DocPresence = this.presence.DocPresence; + return connection; }; @@ -720,6 +726,11 @@ Backend.prototype._buildSnapshotFromOps = function (id, startingSnapshot, ops, c callback(error, snapshot); }; +Backend.prototype.sendPresence = function(presence, callback) { + var channels = [ this.getDocChannel(presence.c, presence.d) ]; + this.pubsub.publish(channels, presence, callback); +}; + function pluckIds(snapshots) { var ids = []; for (var i = 0; i < snapshots.length; i++) { diff --git a/lib/client/connection.js b/lib/client/connection.js index cd56306b2..5be290676 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -63,6 +63,34 @@ function Connection(socket) { this.state = connectionState(socket); this.bindToSocket(socket); + + this.presence = { + connection: this, + // TODO unify with code in agent.js + isPresenceMessage: function (message) { + return message.a === 'p'; + }, + handlePresenceMessage: function (err, message) { + var doc = this.connection.getExisting(message.c, message.d); + if (doc) doc._handlePresence(err, message); + }, + sendPresence: function(doc, data, requestReply) { + // Ensure the doc is registered so that it receives the reply message + this.connection._addDoc(doc); + var message = { + a: 'p', + c: doc.collection, + d: doc.id, + p: data, + v: doc.version || 0, + seq: this.connection.seq++ + }; + if (requestReply) { + message.r = true; + } + this.connection.send(message); + } + }; } emitter.mixin(Connection); @@ -255,6 +283,9 @@ Connection.prototype.handleMessage = function(message) { return; default: + if (this.presence.isPresenceMessage(message)) { + return this.presence.handlePresenceMessage(err, message); + } logger.warn('Ignoring unrecognized message', message); } }; @@ -424,6 +455,13 @@ Connection.prototype.sendOp = function(doc, op) { this.send(message); }; +/** + * Sends presence data down the socket + */ +Connection.prototype.sendPresence = function(doc, data, requestReply) { + this.presence.sendPresence(doc, data, requestReply); +}; + /** * Sends a message down the socket diff --git a/lib/client/doc.js b/lib/client/doc.js index 39f3ea08f..86336ec7e 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -29,6 +29,14 @@ var types = require('../types'); * }) * * + * Presence + * -------- + * + * We can associate transient "presence" data with a document, eg caret position, etc. + * The presence data is synchronized on the best-effort basis between clients subscribed to the same document. + * Each client has their own presence data which is read-write. Other clients' data is read-only. + * + * * Events * ------ * @@ -43,9 +51,11 @@ var types = require('../types'); * the data is null. It is passed the data before delteion as an * arguments * - `load ()` Fired when a new snapshot is ingested from a fetch, subscribe, or query + * - `presence ([src])` Fired after the presence data has changed. */ module.exports = Doc; + function Doc(connection, collection, id) { emitter.EventEmitter.call(this); @@ -58,6 +68,8 @@ function Doc(connection, collection, id) { this.type = null; this.data = undefined; + this.presence = new connection.DocPresence(this); + // Array of callbacks or nulls as placeholders this.inflightFetch = []; this.inflightSubscribe = []; @@ -111,10 +123,12 @@ Doc.prototype.destroy = function(callback) { if (callback) return callback(err); return doc.emit('error', err); } + if (doc.presence) doc.presence.destroyPresence(); doc.connection._destroyDoc(doc); if (callback) callback(); }); } else { + if (doc.presence) doc.presence.destroyPresence(); doc.connection._destroyDoc(doc); if (callback) callback(); } @@ -195,21 +209,28 @@ Doc.prototype.ingestSnapshot = function(snapshot, callback) { if (this.version > snapshot.v) return callback && callback(); this.version = snapshot.v; + + this.presence.clearCachedOps(); + var type = (snapshot.type === undefined) ? types.defaultType : snapshot.type; this._setType(type); this.data = (this.type && this.type.deserialize) ? this.type.deserialize(snapshot.data) : snapshot.data; this.emit('load'); + this.presence.processAllReceivedPresence(); callback && callback(); }; Doc.prototype.whenNothingPending = function(callback) { - if (this.hasPending()) { - this.once('nothing pending', callback); - return; - } - callback(); + var doc = this; + process.nextTick(function() { + if (doc.hasPending()) { + doc.once('nothing pending', callback); + return; + } + callback(); + }); }; Doc.prototype.hasPending = function() { @@ -219,7 +240,8 @@ Doc.prototype.hasPending = function() { this.inflightFetch.length || this.inflightSubscribe.length || this.inflightUnsubscribe.length || - this.pendingFetch.length + this.pendingFetch.length || + this.presence.hasPendingPresence() ); }; @@ -266,6 +288,7 @@ Doc.prototype._handleSubscribe = function(err, snapshot) { if (this.wantSubscribe) this.subscribed = true; this.ingestSnapshot(snapshot, callback); this._emitNothingPending(); + this.flush(); }; Doc.prototype._handleUnsubscribe = function(err) { @@ -327,13 +350,23 @@ Doc.prototype._handleOp = function(err, message) { } this.version++; + this.presence.cacheOp(message); try { this._otApply(message, false); + this.presence.processAllReceivedPresence(); } catch (error) { return this._hardRollback(error); } }; +Doc.prototype._handlePresence = function (err, presence) { + this.presence.handlePresence(err, presence); +}; + +Doc.prototype.submitPresence = function (data, callback) { + this.presence.submitPresence(data, callback); +}; + // Called whenever (you guessed it!) the connection state changes. This will // happen when we get disconnected & reconnect. Doc.prototype._onConnectionStateChanged = function() { @@ -354,7 +387,10 @@ Doc.prototype._onConnectionStateChanged = function() { if (this.inflightUnsubscribe.length) { var callbacks = this.inflightUnsubscribe; this.inflightUnsubscribe = []; + this.presence.pausePresence(); callEach(callbacks); + } else { + this.presence.pausePresence(); } } }; @@ -414,8 +450,11 @@ Doc.prototype.unsubscribe = function(callback) { if (this.connection.canSend) { var isDuplicate = this.connection.sendUnsubscribe(this); pushActionCallback(this.inflightUnsubscribe, isDuplicate, callback); + + this.presence.pausePresence(); return; } + this.presence.pausePresence(); if (callback) process.nextTick(callback); }; @@ -446,6 +485,10 @@ Doc.prototype.flush = function() { if (!this.paused && this.pendingOps.length) { this._sendOp(); } + + if (this.subscribed && !this.hasWritePending()) { + this.presence.flushPresence(); + } }; // Helper function to set op to contain a no-op. @@ -550,6 +593,7 @@ Doc.prototype._otApply = function(op, source) { // Apply the individual op component this.emit('before op', componentOp.op, source); this.data = this.type.apply(this.data, componentOp.op); + this.presence.transformAllPresence(componentOp); this.emit('op', componentOp.op, source); } // Pop whatever was submitted since we started applying this op @@ -562,6 +606,7 @@ Doc.prototype._otApply = function(op, source) { this.emit('before op', op.op, source); // Apply the operation to the local data, mutating it in place this.data = this.type.apply(this.data, op.op); + this.presence.transformAllPresence(op); // Emit an 'op' event once the local data includes the changes from the // op. For locally submitted ops, this will be synchronously with // submission and before the server or other clients have received the op. @@ -578,6 +623,7 @@ Doc.prototype._otApply = function(op, source) { this.type.createDeserialized(op.create.data) : this.type.deserialize(this.type.create(op.create.data)) : this.type.create(op.create.data); + this.presence.transformAllPresence(op); this.emit('create', source); return; } @@ -585,6 +631,7 @@ Doc.prototype._otApply = function(op, source) { if (op.del) { var oldData = this.data; this._setType(null); + this.presence.transformAllPresence(op); this.emit('del', oldData, source); return; } @@ -836,7 +883,7 @@ Doc.prototype.resume = function() { Doc.prototype._opAcknowledged = function(message) { if (this.inflightOp.create) { this.version = message.v; - + this.presence.clearCachedOps(); } else if (message.v !== this.version) { // We should already be at the same version, because the server should // have sent all the ops that have happened before acknowledging our op @@ -849,7 +896,9 @@ Doc.prototype._opAcknowledged = function(message) { // The op was committed successfully. Increment the version number this.version++; + this.presence.cacheOp(this.inflightOp); this._clearInflightOp(); + this.presence.processAllReceivedPresence(); }; Doc.prototype._rollback = function(err) { @@ -897,6 +946,10 @@ Doc.prototype._hardRollback = function(err) { if (this.inflightOp) pendingOps.push(this.inflightOp); pendingOps = pendingOps.concat(this.pendingOps); + // Apply a similar technique for presence. + var pendingPresence = this.presence.getPendingPresence(); + this.presence.hardRollbackPresence(); + // Cancel all pending ops and reset if we can't invert this._setType(null); this.version = null; @@ -909,13 +962,22 @@ Doc.prototype._hardRollback = function(err) { // We want to check that no errors are swallowed, so we check that: // - there are callbacks to call, and // - that every single pending op called a callback - // If there are no ops queued, or one of them didn't handle the error, - // then we emit the error. var allOpsHadCallbacks = !!pendingOps.length; for (var i = 0; i < pendingOps.length; i++) { allOpsHadCallbacks = callEach(pendingOps[i].callbacks, err) && allOpsHadCallbacks; } - if (err && !allOpsHadCallbacks) return doc.emit('error', err); + + // Apply the same technique for presence. + var allPresenceHadCallbacks = !!pendingPresence.length; + for (var i = 0; i < pendingPresence.length; i++) { + allPresenceHadCallbacks = callEach(pendingPresence[i], err) && allPresenceHadCallbacks; + } + + // If there are no ops or presence queued, or one of them didn't handle the error, + // then we emit the error. + if (err && !allOpsHadCallbacks && !allPresenceHadCallbacks) { + doc.emit('error', err); + } }); }; @@ -943,3 +1005,6 @@ function callEach(callbacks, err) { } return called; } + +// Expose callEach to presence methods via Doc prototype. +Doc.prototype._callEach = callEach; diff --git a/lib/presence/dummy.js b/lib/presence/dummy.js new file mode 100644 index 000000000..8a4bac340 --- /dev/null +++ b/lib/presence/dummy.js @@ -0,0 +1,40 @@ +/* + * Dummy Presence + * -------------- + * + * This module provides a dummy implementation of presence that does nothing. + * Its purpose is to stand in for a real implementation, to simplify code in doc.js. + */ +var presence = require('./index'); + +function DocPresence () { } + +// Inherit from Presence to support instanceof type checking. +DocPresence.prototype = Object.create(presence.DocPresence.prototype); + +function noop () {} +function returnEmptyArray () { return []; }; +function returnFalse () { return false; }; + +Object.assign(DocPresence.prototype, { + submitPresence: noop, + handlePresence: noop, + processAllReceivedPresence: noop, + transformAllPresence: noop, + pausePresence: noop, + cacheOp: noop, + flushPresence: noop, + destroyPresence: noop, + clearCachedOps: noop, + hardRollbackPresence: returnEmptyArray, + hasPendingPresence: returnFalse, + getPendingPresence: returnEmptyArray, + _processReceivedPresence: noop, + _transformPresence: noop, + _setPresence: noop, + _emitPresence: noop +}); + +module.exports = { + DocPresence: DocPresence +}; diff --git a/lib/presence/index.js b/lib/presence/index.js new file mode 100644 index 000000000..e4de5669a --- /dev/null +++ b/lib/presence/index.js @@ -0,0 +1,5 @@ +var DocPresence = function () {}; + +module.exports = { + DocPresence: DocPresence +}; diff --git a/lib/presence/stateless.js b/lib/presence/stateless.js new file mode 100644 index 000000000..2f65ca930 --- /dev/null +++ b/lib/presence/stateless.js @@ -0,0 +1,398 @@ +/* + * Stateless Presence + * ------------------ + * + * This module provides an implementation of presence that works, + * but has some scalability problems. Each time a client joins a document, + * this implementation requests current presence information from all other clients, + * via the server. The server does not store any state at all regarding presence, + * it exists only in clients, hence the name "Doc Presence". + * + */ +var ShareDBError = require('../error'); +var presence = require('./index'); + +function DocPresence(doc) { + + // The Doc instance that this Presence is attached to. + this.doc = doc; + + // The current presence data. + // Map of src -> presence data + // Local src === '' + this.current = {}; + + // The presence objects received from the server. + // Map of src -> presence + this.received = {}; + + // The minimum amount of time to wait before removing processed presence from this.presence.received. + // The processed presence is removed to avoid leaking memory, in case peers keep connecting and disconnecting a lot. + // The processed presence is not removed immediately to enable avoiding race conditions, where messages with lower + // sequence number arrive after messages with higher sequence numbers. + this.receivedTimeout = 60000; + + // If set to true, then the next time the local presence is sent, + // all other clients will be asked to reply with their own presence data. + this.requestReply = true; + + // A list of ops sent by the server. These are needed for transforming presence data, + // if we get that presence data for an older version of the document. + this.cachedOps = []; + + // The ops are cached for at least 1 minute by default, which should be lots, considering that the presence + // data is supposed to be synced in real-time. + this.cachedOpsTimeout = 60000; + + // The sequence number of the inflight presence request. + this.inflightSeq = 0; + + // Callbacks (or null) for pending and inflight presence requests. + this.pending = null; + this.inflight = null; +} + +// Inherit from Presence to support instanceof type checking. +DocPresence.prototype = Object.create(presence.DocPresence.prototype); + +// Submit presence data to a document. +// This is the only public facing method. +// All the others are marked as internal with a leading "_". +DocPresence.prototype.submitPresence = function (data, callback) { + if (data != null) { + if (!this.doc.type) { + var doc = this.doc; + return process.nextTick(function() { + var err = new ShareDBError(4015, 'Cannot submit presence. Document has not been created. ' + doc.collection + '.' + doc.id); + if (callback) return callback(err); + doc.emit('error', err); + }); + } + + if (!this.doc.type.createPresence || !this.doc.type.transformPresence) { + var doc = this.doc; + return process.nextTick(function() { + var err = new ShareDBError(4027, 'Cannot submit presence. Document\'s type does not support presence. ' + doc.collection + '.' + doc.id); + if (callback) return callback(err); + doc.emit('error', err); + }); + } + + data = this.doc.type.createPresence(data); + } + + if (this._setPresence('', data, true) || this.pending || this.inflight) { + if (!this.pending) { + this.pending = []; + } + if (callback) { + this.pending.push(callback); + } + + } else if (callback) { + process.nextTick(callback); + } + + process.nextTick(this.doc.flush.bind(this.doc)); +}; + +DocPresence.prototype.handlePresence = function (err, presence) { + if (!this.doc.subscribed) return; + + var src = presence.src; + if (!src) { + // Handle the ACK for the presence data we submitted. + // this.inflightSeq would not equal presence.seq after a hard rollback, + // when all callbacks are flushed with an error. + if (this.inflightSeq === presence.seq) { + var callbacks = this.inflight; + this.inflight = null; + this.inflightSeq = 0; + var called = callbacks && this.doc._callEach(callbacks, err); + if (err && !called) this.doc.emit('error', err); + this.doc.flush(); + this.doc._emitNothingPending(); + } + return; + } + + // This shouldn't happen but check just in case. + if (err) return this.doc.emit('error', err); + + if (presence.r && !this.pending) { + // Another client requested us to share our current presence data + this.pending = []; + this.doc.flush(); + } + + // Ignore older messages which arrived out of order + if ( + this.received[src] && ( + this.received[src].seq > presence.seq || + (this.received[src].seq === presence.seq && presence.v != null) + ) + ) return; + + this.received[src] = presence; + + if (presence.v == null) { + // null version should happen only when the server automatically sends + // null presence for an unsubscribed client + presence.processedAt = Date.now(); + return this._setPresence(src, null, true); + } + + // Get missing ops first, if necessary + if (this.doc.version == null || this.doc.version < presence.v) return this.doc.fetch(); + + this._processReceivedPresence(src, true); +}; + +// If emit is true and presence has changed, emits a presence event. +// Returns true, if presence has changed for src. Otherwise false. +DocPresence.prototype._processReceivedPresence = function (src, emit) { + if (!src) return false; + var presence = this.received[src]; + if (!presence) return false; + + if (presence.processedAt != null) { + if (Date.now() >= presence.processedAt + this.receivedTimeout) { + // Remove old received and processed presence. + delete this.received[src]; + } + return false; + } + + if (this.doc.version == null || this.doc.version < presence.v) { + // keep waiting for the missing snapshot or ops. + return false; + } + + if (presence.p == null) { + // Remove presence data as requested. + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + if (!this.doc.type || !this.doc.type.createPresence || !this.doc.type.transformPresence) { + // Remove presence data because the document is not created or its type does not support presence + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + if (this.doc.inflightOp && this.doc.inflightOp.op == null) { + // Remove presence data because presence.received can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + for (var i = 0; i < this.doc.pendingOps.length; i++) { + if (this.doc.pendingOps[i].op == null) { + // Remove presence data because presence.received can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + } + + var startIndex = this.cachedOps.length - (this.doc.version - presence.v); + if (startIndex < 0) { + // Remove presence data because we can't transform presence.received + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + for (var i = startIndex; i < this.cachedOps.length; i++) { + if (this.cachedOps[i].op == null) { + // Remove presence data because presence.received can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + } + + // Make sure the format of the data is correct + var data = this.doc.type.createPresence(presence.p); + + // Transform against past ops + for (var i = startIndex; i < this.cachedOps.length; i++) { + var op = this.cachedOps[i]; + data = this.doc.type.transformPresence(data, op.op, presence.src === op.src); + } + + // Transform against pending ops + if (this.doc.inflightOp) { + data = this.doc.type.transformPresence(data, this.doc.inflightOp.op, false); + } + + for (var i = 0; i < this.doc.pendingOps.length; i++) { + data = this.doc.type.transformPresence(data, this.doc.pendingOps[i].op, false); + } + + // Set presence data + presence.processedAt = Date.now(); + return this._setPresence(src, data, emit); +}; + +DocPresence.prototype.processAllReceivedPresence = function () { + var srcList = Object.keys(this.received); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._processReceivedPresence(src)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, true); +}; + +DocPresence.prototype._transformPresence = function (src, op) { + var presenceData = this.current[src]; + if (op.op != null) { + var isOwnOperation = src === (op.src || ''); + presenceData = this.doc.type.transformPresence(presenceData, op.op, isOwnOperation); + } else { + presenceData = null; + } + return this._setPresence(src, presenceData); +}; + +DocPresence.prototype.transformAllPresence = function (op) { + var srcList = Object.keys(this.current); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._transformPresence(src, op)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, false); +}; + +DocPresence.prototype.pausePresence = function () { + if (!this) return; + + if (this.inflight) { + this.pending = this.pending + ? this.inflight.concat(this.pending) + : this.inflight; + this.inflight = null; + this.inflightSeq = 0; + } else if (!this.pending && this.current[''] != null) { + this.pending = []; + } + this.received = {}; + this.requestReply = true; + var srcList = Object.keys(this.current); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (src && this._setPresence(src, null)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, false); +}; + +// If emit is true and presence has changed, emits a presence event. +// Returns true, if presence has changed. Otherwise false. +DocPresence.prototype._setPresence = function (src, data, emit) { + if (data == null) { + if (this.current[src] == null) return false; + delete this.current[src]; + } else { + var isPresenceEqual = + this.current[src] === data || + (this.doc.type.comparePresence && this.doc.type.comparePresence(this.current[src], data)); + if (isPresenceEqual) return false; + this.current[src] = data; + } + if (emit) this._emitPresence([ src ], true); + return true; +}; + +DocPresence.prototype._emitPresence = function (srcList, submitted) { + if (srcList && srcList.length > 0) { + var doc = this.doc; + process.nextTick(function() { + doc.emit('presence', srcList, submitted); + }); + } +}; + +DocPresence.prototype.cacheOp = function (message) { + var op = { + src: message.src, + time: Date.now(), + create: !!message.create, + op: message.op, + del: !!message.del + } + // Remove the old ops. + var oldOpTime = Date.now() - this.cachedOpsTimeout; + var i; + for (i = 0; i < this.cachedOps.length; i++) { + if (this.cachedOps[i].time >= oldOpTime) { + break; + } + } + if (i > 0) { + this.cachedOps.splice(0, i); + } + + // Cache the new op. + this.cachedOps.push(op); +}; + +// If there are no pending ops, this method sends the pending presence data, if possible. +DocPresence.prototype.flushPresence = function () { + if(!this.inflight && this.pending) { + this.inflight = this.pending; + this.inflightSeq = this.doc.connection.seq; + this.pending = null; + this.doc.connection.sendPresence(this.doc, this.current[''], this.requestReply); + this.requestReply = false; + } +}; + +DocPresence.prototype.destroyPresence = function () { + this.received = {}; + this.clearCachedOps(); +}; + +DocPresence.prototype.clearCachedOps = function () { + this.cachedOps.length = 0; +}; + +// Reset presence-related properties. +DocPresence.prototype.hardRollbackPresence = function () { + this.inflight = null; + this.inflightSeq = 0; + this.pending = null; + this.cachedOps.length = 0; + this.received = {}; + this.requestReply = true; + + var srcList = Object.keys(this.current); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._setPresence(src, null)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, false); +}; + +DocPresence.prototype.hasPendingPresence = function () { + return this.inflight || this.pending; +}; + +DocPresence.prototype.getPendingPresence = function () { + var pendingPresence = []; + if (this.inflight) pendingPresence.push(this.inflight); + if (this.pending) pendingPresence.push(this.pending); + return pendingPresence; +}; + +module.exports = { + DocPresence: DocPresence +}; diff --git a/package.json b/package.json index 668ecc7fc..76dc3f878 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "sharedb", - "version": "1.0.0-beta.21", + "version": "1.0.0-beta.22", "description": "JSON OT database backend", "main": "lib/index.js", "dependencies": { diff --git a/test/client/doc.js b/test/client/doc.js index dd7ad5396..738c3055a 100644 --- a/test/client/doc.js +++ b/test/client/doc.js @@ -15,28 +15,39 @@ describe('Doc', function() { expect(doc).equal(doc2); }); - it('calling doc.destroy unregisters it', function() { - var doc = this.connection.get('dogs', 'fido'); - expect(this.connection.getExisting('dogs', 'fido')).equal(doc); + it('calling doc.destroy unregisters it', function(done) { + var connection = this.connection; + var doc = connection.get('dogs', 'fido'); + expect(connection.getExisting('dogs', 'fido')).equal(doc); - doc.destroy(); - expect(this.connection.getExisting('dogs', 'fido')).equal(undefined); + doc.destroy(function(err) { + if (err) return done(err); + expect(connection.getExisting('dogs', 'fido')).equal(undefined); - var doc2 = this.connection.get('dogs', 'fido'); - expect(doc).not.equal(doc2); + var doc2 = connection.get('dogs', 'fido'); + expect(doc).not.equal(doc2); + done(); + }); + + // destroy is async + expect(connection.getExisting('dogs', 'fido')).equal(doc); }); - it('getting then destroying then getting returns a new doc object', function() { - var doc = this.connection.get('dogs', 'fido'); - doc.destroy(); - var doc2 = this.connection.get('dogs', 'fido'); - expect(doc).not.equal(doc2); - expect(doc).eql(doc2); + it('getting then destroying then getting returns a new doc object', function(done) { + var connection = this.connection; + var doc = connection.get('dogs', 'fido'); + doc.destroy(function(err) { + if (err) return done(err); + var doc2 = connection.get('dogs', 'fido'); + expect(doc).not.equal(doc2); + expect(doc).eql(doc2); + done(); + }); }); - it('doc.destroy() calls back', function(done) { + it('doc.destroy() works without a callback', function() { var doc = this.connection.get('dogs', 'fido'); - doc.destroy(done); + doc.destroy(); }); describe('applyStack', function() { diff --git a/test/client/presence-type.js b/test/client/presence-type.js new file mode 100644 index 000000000..6138eae7f --- /dev/null +++ b/test/client/presence-type.js @@ -0,0 +1,78 @@ +// A simple type for testing presence, where: +// +// - snapshot is a list +// - operation is { index, value } -> insert value at index in snapshot +// - presence is { index } -> an index in the snapshot +exports.type = { + name: 'wrapped-presence-no-compare', + uri: 'http://sharejs.org/types/wrapped-presence-no-compare', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence, + transformPresence: transformPresence +}; + +// The same as `exports.type` but implements `comparePresence`. +exports.type2 = { + name: 'wrapped-presence-with-compare', + uri: 'http://sharejs.org/types/wrapped-presence-with-compare', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence, + transformPresence: transformPresence, + comparePresence: comparePresence +}; + +// The same as `exports.type` but `presence.index` is unwrapped. +exports.type3 = { + name: 'unwrapped-presence', + uri: 'http://sharejs.org/types/unwrapped-presence', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence2, + transformPresence: transformPresence2 +}; + +function create(data) { + return data || []; +} + +function apply(snapshot, op) { + snapshot.splice(op.index, 0, op.value); + return snapshot; +} + +function transform(op1, op2, side) { + return op1.index < op2.index || (op1.index === op2.index && side === 'left') + ? op1 + : { index: op1.index + 1, value: op1.value }; +} + +function createPresence(data) { + return { index: (data && data.index) | 0 }; +} + +function transformPresence(presence, op, isOwnOperation) { + return presence.index < op.index || (presence.index === op.index && !isOwnOperation) + ? presence + : { index: presence.index + 1 }; +} + +function comparePresence(presence1, presence2) { + return presence1 === presence2 || + (presence1 == null && presence2 == null) || + (presence1 != null && presence2 != null && presence1.index === presence2.index); +} + +function createPresence2(data) { + return data | 0; +} + +function transformPresence2(presence, op, isOwnOperation) { + return presence < op.index || (presence === op.index && !isOwnOperation) + ? presence + : presence + 1; +} diff --git a/test/client/presence.js b/test/client/presence.js new file mode 100644 index 000000000..c7ee09d93 --- /dev/null +++ b/test/client/presence.js @@ -0,0 +1,1477 @@ +var async = require('async'); +var lolex = require('lolex'); +var util = require('../util'); +var errorHandler = util.errorHandler; +var Backend = require('../../lib/backend'); +var presence = require('../../lib/presence'); +var dummyPresence = require('../../lib/presence/dummy'); +var statelessPresence = require('../../lib/presence/stateless'); +var ShareDBError = require('../../lib/error'); +var expect = require('expect.js'); +var types = require('../../lib/types'); +var presenceType = require('./presence-type'); +types.register(presenceType.type); +types.register(presenceType.type2); +types.register(presenceType.type3); + +describe('client presence', function() { + it('should use dummyPresence.DocPresence if presence option not provided', function() { + var backend = new Backend(); + var connection = backend.connect(); + var doc = connection.get('dogs', 'fido'); + expect(doc.presence instanceof dummyPresence.DocPresence).to.be(true); + }); + + it('DummyPresence should subclass Presence', function() { + expect(dummyPresence.DocPresence.prototype instanceof presence.DocPresence).to.be(true); + }); + + it('StatelessPresence should subclass Presence', function() { + expect(statelessPresence.DocPresence.prototype instanceof presence.DocPresence).to.be(true); + }); +}); + +[ + 'wrapped-presence-no-compare', + 'wrapped-presence-with-compare', + 'unwrapped-presence' +].forEach(function(typeName) { + function p(index) { + return typeName === 'unwrapped-presence' ? index : { index: index }; + } + + describe('client presence (' + typeName + ')', function() { + beforeEach(function() { + this.backend = new Backend({ presence: statelessPresence }); + this.connection = this.backend.connect(); + this.connection2 = this.backend.connect(); + this.doc = this.connection.get('dogs', 'fido'); + this.doc2 = this.connection2.get('dogs', 'fido'); + }); + + afterEach(function(done) { + this.backend.close(done); + }); + + it('sends presence immediately', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('sends presence after pending ops', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc.submitOp({ index: 0, value: 'a' }, errorHandler(done)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('waits for pending ops before processing future presence', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + // A hack to send presence for a future version. + this.doc.version += 2; + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), function(err) { + if (err) return done(err); + this.doc.version -= 2; + this.doc.submitOp({ index: 0, value: 'a' }, errorHandler(done)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 2, value: 'c' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'c' }), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'a' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'c' ]; + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), + this.doc2.submitOp.bind(this.doc2, { index: 2, value: 'c' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'c' }), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 0, value: 'b' }), + this.doc2.submitOp.bind(this.doc2, { index: 0, value: 'a' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'c' ]; + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (transform against non-op)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.create.bind(this.doc, [], typeName), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'a' }), + this.doc.del.bind(this.doc), + this.doc.create.bind(this.doc, [ 'b' ], typeName), + function(done) { + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'b' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'b' ]); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 2; + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (no cached ops)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 2, value: 'c' }), + function(done) { + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this), + function(done) { + this.doc2.presence.cachedOps = []; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current).to.not.have.key(''); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + done(); + }.bind(this)); + this.doc.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current).to.not.have.key(''); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + done(); + }.bind(this)); + this.doc2.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local op (presence.index != op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(2)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local op (presence.index != op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(2)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local op (presence.index == op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current['']).to.eql(p(2)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local op (presence.index == op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current['']).to.eql(p(1)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(2)); + done(); + }.bind(this)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('caches local ops', function(allDone) { + var op = { index: 1, value: 'b' }; + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op), + this.doc.del.bind(this.doc), + function(done) { + expect(this.doc.presence.cachedOps.length).to.equal(3); + expect(this.doc.presence.cachedOps[0].create).to.equal(true); + expect(this.doc.presence.cachedOps[1].op).to.equal(op); + expect(this.doc.presence.cachedOps[2].del).to.equal(true); + done(); + }.bind(this) + ], allDone); + }); + + it('caches non-local ops', function(allDone) { + var op = { index: 1, value: 'b' }; + async.series([ + this.doc2.subscribe.bind(this.doc2), + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op), + this.doc.del.bind(this.doc), + setTimeout, + function(done) { + expect(this.doc2.presence.cachedOps.length).to.equal(3); + expect(this.doc2.presence.cachedOps[0].create).to.equal(true); + expect(this.doc2.presence.cachedOps[1].op).to.eql(op); + expect(this.doc2.presence.cachedOps[2].del).to.equal(true); + done(); + }.bind(this) + ], allDone); + }); + + it('expires cached ops', function(allDone) { + var clock = lolex.install(); + var op1 = { index: 1, value: 'b' }; + var op2 = { index: 2, value: 'b' }; + var op3 = { index: 3, value: 'b' }; + this.doc.presence.cachedOpsTimeout = 60; + async.series([ + // Cache 2 ops. + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op1), + function(done) { + expect(this.doc.presence.cachedOps.length).to.equal(2); + expect(this.doc.presence.cachedOps[0].create).to.equal(true); + expect(this.doc.presence.cachedOps[1].op).to.equal(op1); + done(); + }.bind(this), + + // Cache another op before the first 2 expire. + function (callback) { + setTimeout(callback, 30); + clock.next(); + }, + this.doc.submitOp.bind(this.doc, op2), + function(done) { + expect(this.doc.presence.cachedOps.length).to.equal(3); + expect(this.doc.presence.cachedOps[0].create).to.equal(true); + expect(this.doc.presence.cachedOps[1].op).to.equal(op1); + expect(this.doc.presence.cachedOps[2].op).to.equal(op2); + done(); + }.bind(this), + + // Cache another op after the first 2 expire. + function (callback) { + setTimeout(callback, 31); + clock.next(); + }, + this.doc.submitOp.bind(this.doc, op3), + function(done) { + expect(this.doc.presence.cachedOps.length).to.equal(2); + expect(this.doc.presence.cachedOps[0].op).to.equal(op2); + expect(this.doc.presence.cachedOps[1].op).to.equal(op3); + clock.uninstall(); + done(); + }.bind(this) + ], allDone); + }); + + it('requests reply presence when sending presence for the first time', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + if (srcList[0] === '') { + expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current['']).to.eql(p(1)); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + } else { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence.current['']).to.eql(p(1)); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence.requestReply).to.equal(false); + done(); + } + }.bind(this)); + this.doc2.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence for uncreated document: callback(err)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4015); + done(); + }); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence for uncreated document: emit(err)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4015); + done(); + }); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence, if type does not support presence: callback(err)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, {}), + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4027); + done(); + }); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence, if type does not support presence: emit(err)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, {}), + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4027); + done(); + }); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('submits null presence', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, null) + ], allDone); + }); + + it('sends presence once, if submitted multiple times synchronously', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(2)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(0), errorHandler(done)); + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc.submitPresence(p(2), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('buffers presence until subscribed', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + setTimeout(function() { + this.doc.subscribe(function(err) { + if (err) return done(err); + expect(this.doc2.presence.current).to.eql({}); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('buffers presence when disconnected', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.connection.close(); + this.doc.submitPresence(p(1), errorHandler(done)); + process.nextTick(function() { + this.backend.connect(this.connection); + this.doc.presence.requestReply = false; + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('submits presence without a callback', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('hasPending is true, if there is pending presence', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + this.doc.submitPresence(p(0)); + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.presence.pending).to.equal(true); + expect(!!this.doc.presence.inflight).to.equal(false); + this.doc.whenNothingPending(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + expect(!!this.doc.presence.pending).to.equal(false); + expect(!!this.doc.presence.inflight).to.equal(false); + done(); + }.bind(this) + ], allDone); + }); + + it('hasPending is true, if there is inflight presence', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + this.doc.submitPresence(p(0)); + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.presence.pending).to.equal(true); + expect(!!this.doc.presence.inflight).to.equal(false); + process.nextTick(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.presence.pending).to.equal(false); + expect(!!this.doc.presence.inflight).to.equal(true); + this.doc.whenNothingPending(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + expect(!!this.doc.presence.pending).to.equal(false); + expect(!!this.doc.presence.inflight).to.equal(false); + done(); + }.bind(this) + ], allDone); + }); + + it('receives presence after doc is deleted', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + setTimeout, + function(done) { + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `submitPresence` does not fire the event because presence is already null. + expect(submitted).to.equal(false); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on peer disconnection', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence.current['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current).to.not.have.key(connectionId); + expect(this.doc2.presence.current['']).to.eql(p(1)); + done(); + }.bind(this)); + this.connection.close(); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on own disconnection', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence.current['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(false); + expect(this.doc2.presence.current).to.not.have.key(connectionId); + expect(this.doc2.presence.current['']).to.eql(p(1)); + done(); + }.bind(this)); + this.connection2.close(); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on peer unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence.current['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current).to.not.have.key(connectionId); + expect(this.doc2.presence.current['']).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.unsubscribe(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on own unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence.current['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(false); + expect(this.doc2.presence.current).to.not.have.key(connectionId); + expect(this.doc2.presence.current['']).to.eql(p(1)); + done(); + }.bind(this)); + this.doc2.unsubscribe(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('pauses inflight and pending presence on disconnect', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + var called = 0; + function callback(err) { + if (err) return done(err); + if (++called === 2) done(); + } + this.doc.submitPresence(p(0), callback); + process.nextTick(function() { + this.doc.submitPresence(p(1), callback); + this.connection.close(); + process.nextTick(function() { + this.backend.connect(this.connection); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('pauses inflight and pending presence on unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + var called = 0; + function callback(err) { + if (err) return done(err); + if (++called === 2) done(); + } + this.doc.submitPresence(p(0), callback); + process.nextTick(function() { + this.doc.submitPresence(p(1), callback); + this.doc.unsubscribe(errorHandler(done)); + process.nextTick(function() { + this.doc.subscribe(errorHandler(done)); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('re-synchronizes presence after reconnecting', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + this.connection.close(); + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + this.backend.connect(this.connection); + process.nextTick(done); + }.bind(this), + setTimeout, // wait for re-sync + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + process.nextTick(done); + }.bind(this) + ], allDone); + }); + + it('re-synchronizes presence after resubscribing', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + this.doc.unsubscribe(errorHandler(done)); + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + this.doc.subscribe(done); + }.bind(this), + setTimeout, // wait for re-sync + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + process.nextTick(done); + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(0), errorHandler(done)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)) + this.doc2.submitOp({ index: 2, value: 'c' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.submitOp({ index: 1, value: 'c' }, errorHandler(done)) + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.submitOp({ index: 0, value: 'b' }, errorHandler(done)) + this.doc2.submitOp({ index: 0, value: 'a' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + setTimeout, + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `submitPresence` does not fire the event because presence is already null. + expect(submitted).to.equal(false); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(2), errorHandler(done)); + this.doc2.del(errorHandler(done)); + this.doc2.create([ 'c' ], typeName, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms received presence against a pending delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + setTimeout, + function(done) { + var firstCall = true; + this.doc2.on('presence', function(srcList, submitted) { + if (firstCall) return firstCall = false; + expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `submitPresence` does not fire the event because presence is already null. + expect(submitted).to.equal(false); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.submitPresence(p(2), errorHandler(done)); + this.doc2.submitOp({ index: 0, value: 'b' }, errorHandler(done)); + this.doc2.del(errorHandler(done)); + this.doc2.create([ 'c' ], typeName, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('emits the same presence only if comparePresence is not implemented (local presence)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(1)), + function(done) { + this.doc.on('presence', function(srcList, submitted) { + if (typeName === 'wrapped-presence-no-compare') { + expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(true); + expect(this.doc.presence.current['']).to.eql(p(1)); + done(); + } else { + done(new Error('Unexpected presence event')); + } + }.bind(this)); + this.doc.submitPresence(p(1), typeName === 'wrapped-presence-no-compare' ? errorHandler(done) : done); + }.bind(this) + ], allDone); + }); + + it('emits the same presence only if comparePresence is not implemented (non-local presence)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + setTimeout, + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + if (typeName === 'wrapped-presence-no-compare') { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + } else { + done(new Error('Unexpected presence event')); + } + }.bind(this)); + this.doc.submitPresence(p(1), typeName === 'wrapped-presence-no-compare' ? errorHandler(done) : done); + }.bind(this) + ], allDone); + }); + + it('returns an error when not subscribed on the server', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + this.connection.sendUnsubscribe(this.doc); + process.nextTick(done); + }.bind(this), + function(done) { + this.doc.on('error', done); + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4025); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when not subscribed on the server and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + this.connection.sendUnsubscribe(this.doc); + process.nextTick(done); + }.bind(this), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4025); + done(); + }.bind(this)); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('returns an error when the server gets an old sequence number', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', done); + this.connection.seq--; + this.doc.submitPresence(p(1), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when the server gets an old sequence number and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + }.bind(this)); + this.connection.seq--; + this.doc.submitPresence(p(1)); + }.bind(this) + ], allDone); + }); + + it('does not publish presence unnecessarily', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', done); + // Decremented sequence number would cause the server to return an error, however, + // the message won't be sent to the server at all because the presence data has not changed. + this.connection.seq--; + this.doc.submitPresence(p(0), function(err) { + if (typeName === 'wrapped-presence-no-compare') { + // The OT type does not support comparing presence. + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + } else { + expect(err).to.not.be.ok(); + } + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('does not publish presence unnecessarily when no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', function(err) { + if (typeName === 'wrapped-presence-no-compare') { + // The OT type does not support comparing presence. + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + } else { + done(err); + } + }.bind(this)); + // Decremented sequence number would cause the server to return an error, however, + // the message won't be sent to the server at all because the presence data has not changed. + this.connection.seq--; + this.doc.submitPresence(p(0)); + if (typeName !== 'wrapped-presence-no-compare') { + process.nextTick(done); + } + }.bind(this) + ], allDone); + }); + + it('returns an error when publishing presence fails', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + setTimeout, + function(done) { + var sendPresence = this.backend.sendPresence; + this.backend.sendPresence = function(presence, callback) { + if (presence.a === 'p' && presence.v != null) { + return callback(new ShareDBError(-1, 'Test publishing error')); + } + sendPresence.apply(this, arguments); + }; + this.doc.on('error', done); + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(-1); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when publishing presence fails and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + setTimeout, + function(done) { + var sendPresence = this.backend.sendPresence; + this.backend.sendPresence = function(presence, callback) { + if (presence.a === 'p' && presence.v != null) { + return callback(new ShareDBError(-1, 'Test publishing error')); + } + sendPresence.apply(this, arguments); + }; + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(-1); + done(); + }.bind(this)); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('clears presence on hard rollback and emits an error', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'b', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + setTimeout, + function(done) { + // A hack to allow testing of hard rollback of both inflight and pending presence. + var doc = this.doc; + var _handlePresence = this.doc._handlePresence; + this.doc._handlePresence = function(err, presence) { + setTimeout(function() { + _handlePresence.call(doc, err, presence); + }); + }; + process.nextTick(done); + }.bind(this), + this.doc.submitPresence.bind(this.doc, p(1)), // presence.inflight + process.nextTick, // wait for "presence" event + this.doc.submitPresence.bind(this.doc, p(2)), // presence.pending + process.nextTick, // wait for "presence" event + function(done) { + var presenceEmitted = false; + this.doc.on('presence', function(srcList, submitted) { + expect(presenceEmitted).to.equal(false); + presenceEmitted = true; + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current).to.not.have.key(''); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + }.bind(this)); + + this.doc.on('error', function(err) { + expect(presenceEmitted).to.equal(true); + expect(err).to.be.an(Error); + expect(err.code).to.equal(4000); + done(); + }.bind(this)); + + // send an invalid op + this.doc._submit({}, null); + }.bind(this) + ], allDone); + }); + + it('clears presence on hard rollback and executes all callbacks', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'b', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + setTimeout, + function(done) { + // A hack to allow testing of hard rollback of both inflight and pending presence. + var doc = this.doc; + var _handlePresence = this.doc._handlePresence; + this.doc._handlePresence = function(err, presence) { + setTimeout(function() { + _handlePresence.call(doc, err, presence); + }); + }; + process.nextTick(done); + }.bind(this), + function(done) { + var presenceEmitted = false; + var called = 0; + function callback(err) { + expect(presenceEmitted).to.equal(true); + expect(err).to.be.an(Error); + expect(err.code).to.equal(4000); + if (++called < 3) return; + done(); + } + this.doc.submitPresence(p(1), callback); // presence.inflight + process.nextTick(function() { // wait for presence event + this.doc.submitPresence(p(2), callback); // presence.pending + process.nextTick(function() { // wait for presence event + this.doc.on('presence', function(srcList, submitted) { + expect(presenceEmitted).to.equal(false); + presenceEmitted = true; + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current).to.not.have.key(''); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + }.bind(this)); + this.doc.on('error', done); + + // send an invalid op + this.doc._submit({ index: 3, value: 'b' }, null, callback); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + function testReceivedMessageExpiry(expireCache, reduceSequence) { + return function(allDone) { + var lastPresence = null; + var handleMessage = this.connection.handleMessage; + this.connection.handleMessage = function(message) { + if (message.a === 'p' && message.src) { + lastPresence = JSON.parse(JSON.stringify(message)); + } + return handleMessage.apply(this, arguments); + }; + if (expireCache) { + this.doc.presence.receivedTimeout = 0; + } + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.presence.requestReply = false; + this.doc2.submitPresence(p(0), done); + }.bind(this), + setTimeout, + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), // forces processing of all received presence + setTimeout, + function(done) { + expect(this.doc.data).to.eql([ 'a', 'b' ]); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(0)); + // Replay the `lastPresence` with modified payload. + lastPresence.p = p(1); + lastPresence.v++; // +1 to account for the op above + if (reduceSequence) { + lastPresence.seq--; + } + this.connection.handleMessage(lastPresence); + process.nextTick(done); + }.bind(this), + function(done) { + expect(this.doc.presence.current[this.connection2.id]).to.eql(expireCache ? p(1) : p(0)); + process.nextTick(done); + }.bind(this) + ], allDone); + }; + } + + it('ignores an old message (cache not expired, presence.seq === cachedPresence.seq)', testReceivedMessageExpiry(false, false)); + it('ignores an old message (cache not expired, presence.seq < cachedPresence.seq)', testReceivedMessageExpiry(false, true)); + it('processes an old message (cache expired, presence.seq === cachedPresence.seq)', testReceivedMessageExpiry(true, false)); + it('processes an old message (cache expired, presence.seq < cachedPresence.seq)', testReceivedMessageExpiry(true, true)); + + it('invokes presence.destroy inside doc.destroy', function(done) { + var presence = this.doc.presence; + presence.cachedOps = ['foo']; + presence.received = { bar: true }; + this.doc.destroy(function(err) { + expect(presence.cachedOps).to.eql([]); + expect(presence.received).to.eql({}); + done(); + }); + }); + }); +}); diff --git a/test/client/submit.js b/test/client/submit.js index 82cecbbe3..1314cfa89 100644 --- a/test/client/submit.js +++ b/test/client/submit.js @@ -1078,6 +1078,39 @@ describe('client submit', function() { }); }); + it('hasWritePending is false when create\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + + it('hasWritePending is false when submimtOp\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + doc.submitOp({p: ['age'], na: 2}, function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + }); + + it('hasWritePending is false when del\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + doc.del(function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + }); + describe('type.deserialize', function() { it('can create a new doc', function(done) { var doc = this.backend.connect().get('dogs', 'fido'); diff --git a/test/util.js b/test/util.js index 5f982ed6e..a7c58c38e 100644 --- a/test/util.js +++ b/test/util.js @@ -15,6 +15,12 @@ exports.pluck = function(docs, key) { return values; }; +exports.errorHandler = function(callback) { + return function(err) { + if (err) callback(err); + }; +}; + // Wrap a done function to call back only after a specified number of calls. // For example, `var callbackAfter = callAfter(1, callback)` means that if // `callbackAfter` is called once, it won't call back. If it is called twice