Skip to content

Commit 3952db9

Browse files
author
Alec Gibson
committed
Add Presence functionality
This change adds the ability for clients to broadcast information about "Presence" - the notion of a client's position or state in a particular document. This might be represent a cursor in a text document, or a highlighted field in a more complex JSON document, or any other transient, current information about a client that shouldn't necessarily be stored in the document's chain of ops. The main complication that this feature solves is the issue of keeping presence correctly associated with the version of a `Doc` it was created at. For example, in a "naive" implementation of presence, presence information can arrive ahead of or behind ops, which - in a text-based example - can cause the cursor to "jitter" around the change. Using the ShareDB implementation will ensure that the presence is correctly transformed against any ops, and will ensure that presence information is always consistent with the version of the document. We also locally transform existing presence, which should help to keep (static) remote presence correctly positioned, independent of latency. In order to facilitate this, the feature must be used with an OT type that supports presence. The only requirement for supporting presence is the support of a `transformPresence` method: ```javascript type.transformPresence(presence, op, isOwnOperation): presence; ``` * `presence` _Object_: the presence data being transformed. The type will define this shape to be whatever is appropriate for the type. * `op` _Op_: the operation against which to transform the presence * `isOwnOperation`: _boolean_: whether the presence and the op have the same "owner". This information can be useful for some types to break ties when transforming a presence, for example as used in [`rich-text`][1] This work is based on the [work][2] by @gkubisa and @curran, but with the following aims: - avoid modifying the existing `Doc` class as much as possible, and instead use lifecycle hooks - keep presence separate as its own conceptual entity - break the presence subscriptions apart from `Doc` subscriptions (although in practice, the two are obviously tightly coupled) - allow multiple presences on a single `Doc` on the same `Connection` [1]: https://github.com/quilljs/delta#tranformposition [2]: #288
1 parent bbdfbe1 commit 3952db9

13 files changed

+2061
-18
lines changed

README.md

+63
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ Register a new middleware.
156156
the database.
157157
* `'receive'`: Received a message from a client
158158
* `'reply'`: About to send a non-error reply to a client message
159+
* `'sendPresence'`: About to send presence information to a client
159160
* `fn` _(Function(context, callback))_
160161
Call this function at the time specified by `action`.
161162
* `context` will always have the following properties:
@@ -303,6 +304,16 @@ Get a read-only snapshot of a document at the requested version.
303304
}
304305
```
305306

307+
`connection.getPresence(collection, id, presenceId): LocalPresence;`
308+
Get a [`LocalPresence`](#class-sharedblocalpresence) instance that can be used to send presence information to other clients. Note that the `Doc` must use a type that supports presence.
309+
310+
* `collection` _(String)_
311+
Collection name of the `Doc`
312+
* `id` _(String)_
313+
ID of the `Doc`
314+
* `presenceId` _(String)_
315+
A unique presence ID that will be sent to remote clients along with the presence data
316+
306317
### Class: `ShareDB.Doc`
307318

308319
`doc.type` _(String_)
@@ -349,6 +360,9 @@ The document was deleted. Document contents before deletion are passed in as an
349360
`doc.on('error', function(err) {...})`
350361
There was an error fetching the document or applying an operation.
351362

363+
`doc.on('presence', function(id, presence) {...})`
364+
A remote client has sent presence information. `id` is an ID provided by the remote client, and `presence` is the presence data, whose structure will depend on document's OT type.
365+
352366
`doc.removeListener(eventName, listener)`
353367
Removes any listener you added with `doc.on`. `eventName` should be one of `'load'`, `'create'`, `'before op'`, `'op'`, `'del'`, or `'error'`. `listener` should be the function you passed in as the second argument to `on`. Note that both `on` and `removeListener` are inherited from [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter).
354368

@@ -379,6 +393,12 @@ Invokes the given callback function after
379393

380394
Note that `whenNothingPending` does NOT wait for pending `model.query()` calls.
381395

396+
`doc.subscribeToPresence([function(err) {...}])`
397+
Subscribes to presence updates sent by other clients, emitting `presence` events (see above).
398+
399+
`doc.unsubscribeFromPresence(function(err) {...})`
400+
Unsubscribe from presence updates sent by other clients, and stop receiving `presence` events (see above).
401+
382402
### Class: `ShareDB.Query`
383403

384404
`query.ready` _(Boolean)_
@@ -629,6 +649,49 @@ var connectionInfo = getUserPermissions();
629649
var connection = backend.connect(null, connectionInfo);
630650
```
631651

652+
### Class: `ShareDB.LocalPresence`
653+
654+
`LocalPresence` represents the presence of the local client in a given `Doc`. For example, this might be the position of a caret in a text document; which field has been highlighted in a complex JSON object; etc. Multiple presences may exist per `Doc` even on the same client.
655+
656+
#### `update`
657+
658+
```javascript
659+
localPresence.update(presence, options, callback);
660+
```
661+
662+
Update the local representation of presence, and broadcast that presence to any other document presence subscribers.
663+
664+
* `presence` _Object_: the presence object to broadcast. The structure of this will depend on the OT type
665+
* `options` _Object (optional)_
666+
* `options.subscribe` _boolean (default `true`)_: if set to `true`, will subscribe to presence updates on the document. Setting to `false` will unsubscribe.
667+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
668+
669+
#### `clear`
670+
671+
```javascript
672+
localPresence.clear(callback);
673+
```
674+
675+
Tell remote subscribers that this presence is no longer present in the `Doc`. For example, this might prompt remote clients to remove a cursor from a text document, or to clear highlighting around a selected field, etc.
676+
677+
The `LocalPresence` object will not be destroyed (see below), and can still be used to send further updates.
678+
679+
This is shorthand for `localPresence.update(null, callback)`.
680+
681+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
682+
683+
#### `destroy`
684+
685+
```javascript
686+
localPresence.destroy(callback);
687+
```
688+
689+
Updates all remote clients with a `null` presence. Then destroys the local instance of the presence by de-registering all the `Doc` hooks it listens to, and removes it from the `Connection` cache, so that it can be garbage-collected. This should be called when you are done with a presence, and no longer need to use it to fire updates.
690+
691+
This method is automatically called when calling `doc.destroy`.
692+
693+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
694+
632695
### Logging
633696

634697
By default, ShareDB logs to `console`. This can be overridden if you wish to silence logs, or to log to your own logging driver or alert service.

lib/agent.js

+116-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ function Agent(backend, stream) {
2626
// Map from queryId -> emitter
2727
this.subscribedQueries = {};
2828

29+
// Track which documents are subscribed to presence by the client. This is a
30+
// map of collection -> id -> stream
31+
this.subscribedPresences = {};
32+
2933
// We need to track this manually to make sure we don't reply to messages
3034
// after the stream was closed.
3135
this.closed = false;
@@ -74,6 +78,15 @@ Agent.prototype._cleanup = function() {
7478
}
7579
this.subscribedDocs = {};
7680

81+
for (var collection in this.subscribedPresences) {
82+
var streams = this.subscribedPresences[collection];
83+
for (var id in streams) {
84+
var stream = streams[id];
85+
stream.destroy();
86+
}
87+
}
88+
this.subscribedPresences = {};
89+
7790
// Clean up query subscription streams
7891
for (var id in this.subscribedQueries) {
7992
var emitter = this.subscribedQueries[id];
@@ -117,6 +130,25 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
117130
});
118131
};
119132

133+
Agent.prototype._subscribeToPresenceStream = function(collection, id, stream) {
134+
if (this.closed) return stream.destroy();
135+
136+
stream.on('data', function(data) {
137+
if (data.error) {
138+
logger.error('Presence subscription stream error', collection, id, data.error);
139+
}
140+
this._handlePresenceData(data);
141+
}.bind(this));
142+
143+
stream.on('end', function() {
144+
var streams = this.subscribedPresences[collection];
145+
if (!streams || !streams[id] !== stream) return;
146+
delete streams[id];
147+
if (util.hasKeys(streams)) return;
148+
delete agent.subscribedPresences[collection];
149+
}.bind(this));
150+
};
151+
120152
Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) {
121153
var previous = this.subscribedQueries[queryId];
122154
if (previous) previous.destroy();
@@ -307,14 +339,18 @@ Agent.prototype._checkRequest = function(request) {
307339
if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') {
308340
// Query messages need an ID property.
309341
if (typeof request.id !== 'number') return 'Missing query ID';
310-
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u') {
342+
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') {
311343
// Doc-based request.
312344
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
313345
if (request.d != null && typeof request.d !== 'string') return 'Invalid id';
314346

315-
if (request.a === 'op') {
347+
if (request.a === 'op' || request.a === 'p') {
316348
if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version';
317349
}
350+
351+
if (request.a === 'p') {
352+
if (typeof request.id !== 'string') return 'Missing presence ID';
353+
}
318354
} else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') {
319355
// Bulk request
320356
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
@@ -356,6 +392,12 @@ Agent.prototype._handleMessage = function(request, callback) {
356392
return this._fetchSnapshot(request.c, request.d, request.v, callback);
357393
case 'nt':
358394
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
395+
case 'p':
396+
var presence = this._createPresence(request);
397+
if (!util.supportsPresence(types.map[presence.t])) {
398+
return callback({code: 9999, message: 'Type does not support presence: ' + presence.t});
399+
}
400+
return this._broadcastPresence(request.c, request.d, presence, callback);
359401
default:
360402
callback({code: 4000, message: 'Invalid or unknown message'});
361403
}
@@ -624,6 +666,78 @@ Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp,
624666
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
625667
};
626668

669+
Agent.prototype._broadcastPresence = function(collection, id, presence, callback) {
670+
var wantsSubscribe = presence.s;
671+
this._handlePresenceSubscription(collection, id, wantsSubscribe, function(error) {
672+
if (error) return callback(error);
673+
this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) {
674+
if (error) return callback(error);
675+
var channel = this.backend.getPresenceChannel(collection, id);
676+
this.backend.pubsub.publish([channel], presence, function(error) {
677+
if (error) return callback(error);
678+
callback(null, presence);
679+
});
680+
}.bind(this));
681+
}.bind(this));
682+
};
683+
684+
Agent.prototype._handlePresenceSubscription = function(collection, id, wantsSubscribe, callback) {
685+
var streams = this.subscribedPresences[collection] || (this.subscribedPresences[collection] = {});
686+
var stream = streams[id];
687+
688+
if (stream) {
689+
if (wantsSubscribe) {
690+
return callback();
691+
}
692+
stream.destroy();
693+
return callback();
694+
}
695+
696+
if (!wantsSubscribe) return callback();
697+
698+
var channel = this.backend.getPresenceChannel(collection, id);
699+
this.backend.pubsub.subscribe(channel, function(error, stream) {
700+
if (error) return callback(error);
701+
streams[id] = stream;
702+
this._subscribeToPresenceStream(collection, id, stream);
703+
callback();
704+
}.bind(this));
705+
};
706+
707+
Agent.prototype._createPresence = function(request) {
708+
// src can be provided if it is not the same as the current agent,
709+
// such as a resubmission after a reconnect, but it usually isn't needed
710+
var src = request.src || this.clientId;
711+
return {
712+
a: 'p',
713+
src: src,
714+
seq: request.seq,
715+
c: request.c,
716+
d: request.d,
717+
id: request.id,
718+
v: request.v,
719+
p: request.p,
720+
t: request.t,
721+
r: !!request.r,
722+
s: !!request.s
723+
};
724+
};
725+
726+
Agent.prototype._handlePresenceData = function(presence) {
727+
if (presence.src !== this.clientId) {
728+
var backend = this.backend;
729+
var context = {
730+
collection: presence.c,
731+
presence: presence
732+
};
733+
backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) {
734+
if (error) {
735+
return this.send({a: 'p', c: presence.c, d: presence.d, id: presence.id, error: getReplyErrorObject(error)});
736+
}
737+
this.send(presence);
738+
}.bind(this));
739+
}
740+
};
627741

628742
function createClientOp(request, clientId) {
629743
// src can be provided if it is not the same as the current agent,

lib/backend.js

+21
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ Backend.prototype.MIDDLEWARE_ACTIONS = {
8787
// by design, changing existing reply properties can cause weird bugs, since
8888
// the rest of ShareDB would be unaware of those changes.
8989
reply: 'reply',
90+
// About to send presence information to a client
91+
sendPresence: 'sendPresence',
9092
// An operation is about to be submitted to the database
9193
submit: 'submit'
9294
};
@@ -680,6 +682,10 @@ Backend.prototype.getDocChannel = function(collection, id) {
680682
return collection + '.' + id;
681683
};
682684

685+
Backend.prototype.getPresenceChannel = function(collection, id) {
686+
return this.getDocChannel(collection, id) + '.presence';
687+
};
688+
683689
Backend.prototype.getChannels = function(collection, id) {
684690
return [
685691
this.getCollectionChannel(collection),
@@ -800,6 +806,21 @@ Backend.prototype._buildSnapshotFromOps = function(id, startingSnapshot, ops, ca
800806
callback(error, snapshot);
801807
};
802808

809+
Backend.prototype.transformPresenceToLatestVersion = function(agent, presence, callback) {
810+
this.getOps(agent, presence.c, presence.d, presence.v, null, function(error, ops) {
811+
if (error) return callback(error);
812+
for (var i = 0; i < ops.length; i++) {
813+
var op = ops[i];
814+
var isOwnOp = op.src === presence.src;
815+
var transformError = ot.transformPresence(presence, op, isOwnOp);
816+
if (transformError) {
817+
return callback(transformError);
818+
}
819+
}
820+
callback(null, presence);
821+
});
822+
};
823+
803824
function pluckIds(snapshots) {
804825
var ids = [];
805826
for (var i = 0; i < snapshots.length; i++) {

0 commit comments

Comments
 (0)