Skip to content

Commit c72c0fb

Browse files
author
Alec Gibson
committed
Add Presence functionality
This change adds the ability for clients to broadcast information about "Presence" - the notion of a client's position or state in a particular document. This might be represent a cursor in a text document, or a highlighted field in a more complex JSON document, or any other transient, current information about a client that shouldn't necessarily be stored in the document's chain of ops. The main complication that this feature solves is the issue of keeping presence correctly associated with the version of a `Doc` it was created at. For example, in a "naive" implementation of presence, presence information can arrive ahead of or behind ops, which - in a text-based example - can cause the cursor to "jitter" around the change. Using the ShareDB implementation will ensure that the presence is correctly transformed against any ops, and will ensure that presence information is always consistent with the version of the document. We also locally transform existing presence, which should help to keep (static) remote presence correctly positioned, independent of latency. In order to facilitate this, the feature must be used with an OT type that supports presence. The only requirement for supporting presence is the support of a `transformPresence` method: ```javascript type.transformPresence(presence, op, isOwnOperation): presence; ``` * `presence` _Object_: the presence data being transformed. The type will define this shape to be whatever is appropriate for the type. * `op` _Op_: the operation against which to transform the presence * `isOwnOperation`: _boolean_: whether the presence and the op have the same "owner". This information can be useful for some types to break ties when transforming a presence, for example as used in [`rich-text`][1] This work is based on the [work][2] by @gkubisa and @curran, but with the following aims: - avoid modifying the existing `Doc` class as much as possible, and instead use lifecycle hooks - keep presence separate as its own conceptual entity - break the presence subscriptions apart from `Doc` subscriptions (although in practice, the two are obviously tightly coupled) - allow multiple presences on a single `Doc` on the same `Connection` [1]: https://github.com/quilljs/delta#tranformposition [2]: #288
1 parent 960f5d1 commit c72c0fb

20 files changed

+2564
-19
lines changed

README.md

+118
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ Register a new middleware.
158158
the database.
159159
* `'receive'`: Received a message from a client
160160
* `'reply'`: About to send a non-error reply to a client message
161+
* `'sendPresence'`: About to send presence information to a client
161162
* `fn` _(Function(context, callback))_
162163
Call this function at the time specified by `action`.
163164
* `context` will always have the following properties:
@@ -307,6 +308,20 @@ Get a read-only snapshot of a document at the requested version.
307308
}
308309
```
309310

311+
`connection.getPresence(channel): Presence;`
312+
Get a [`Presence`](#class-sharedbpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence.
313+
314+
* `channel` _(String)_
315+
Presence channel to subscribe to
316+
317+
`connection.getDocPresence(collection, id): DocPresence;`
318+
Get a special [`DocPresence`](#class-sharedbdocpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence. This is tied to a `Doc`, and all presence will be automatically transformed against ops to keep presence current. Note that the `Doc` must be of a type that supports presence.
319+
320+
* `collection` _(String)_
321+
Document collection
322+
* `id` _(String)_
323+
Document ID
324+
310325
### Class: `ShareDB.Doc`
311326

312327
`doc.type` _(String_)
@@ -640,6 +655,109 @@ const connectionInfo = getUserPermissions();
640655
const connection = backend.connect(null, connectionInfo);
641656
```
642657

658+
### Class: `ShareDB.Presence`
659+
660+
Representation of the presence data associated with a given channel.
661+
662+
#### `subscribe`
663+
664+
```javascript
665+
presence.subscribe(callback): void;
666+
```
667+
668+
Subscribe to presence updates from other clients. Note that presence can be submitted without subscribing, but remote clients will not be able to re-request presence from you if you are not subscribed.
669+
670+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
671+
672+
#### `unsubscribe`
673+
674+
```javascript
675+
presence.unsubscribe(callback): void;
676+
```
677+
678+
Unsubscribe from presence updates from remote clients.
679+
680+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
681+
682+
#### `on`
683+
684+
```javascript
685+
presence.on('receive', callback): void;
686+
```
687+
688+
An update from a remote presence client has been received.
689+
690+
* `callback` _Function_: callback for handling the received presence: `function (presenceId, presenceValue): void;`
691+
692+
```javascript
693+
presence.on('error', callback): void;
694+
```
695+
696+
A presence-related error has occurred.
697+
698+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
699+
700+
#### `create`
701+
702+
```javascript
703+
presence.create(presenceId): LocalPresence;
704+
```
705+
706+
Create an instance of [`LocalPresence`](#class-sharedblocalpresence), which can be used to represent local presence. Many or none such local presences may exist on a `Presence` instance.
707+
708+
* `presenceId` _string_: a unique ID representing the local presence. Remember - depending on use-case - the same client might have multiple presences, so this might not necessarily be a user or client ID.
709+
710+
#### `destroy`
711+
712+
```javascript
713+
presence.destroy(callback);
714+
```
715+
716+
Updates all remote clients with a `null` presence, and removes it from the `Connection` cache, so that it can be garbage-collected. This should be called when you are done with a presence, and no longer need to use it to fire updates.
717+
718+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
719+
720+
### Class: `ShareDB.DocPresence`
721+
722+
Specialised case of [`Presence`](#class-sharedbpresence), which is tied to a specific [`Doc`](#class-sharedbdoc). When using presence with an associated `Doc`, any ops applied to the `Doc` will automatically be used to transform associated presence. On destroy, the `DocPresence` will unregister its listeners from the `Doc`.
723+
724+
See [`Presence`](#class-sharedbpresence) for available methods.
725+
726+
### Class: `ShareDB.LocalPresence`
727+
728+
`LocalPresence` represents the presence of the local client in a given `Doc`. For example, this might be the position of a caret in a text document; which field has been highlighted in a complex JSON object; etc. Multiple presences may exist per `Doc` even on the same client.
729+
730+
#### `submit`
731+
732+
```javascript
733+
localPresence.submit(presence, callback): void;
734+
```
735+
736+
Update the local representation of presence, and broadcast that presence to any other document presence subscribers.
737+
738+
* `presence` _Object_: the presence object to broadcast. The structure of this will depend on the OT type
739+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
740+
741+
#### `send`
742+
743+
```javascript
744+
localPresence.send(callback): void;
745+
```
746+
747+
Send presence like `submit`, but without updating the value. Can be useful if local presences expire periodically.
748+
749+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
750+
751+
#### `destroy`
752+
753+
```javascript
754+
localPresence.destroy(callback): void;
755+
```
756+
757+
Informs all remote clients that this presence is now `null`, and deletes itself for garbage collection.
758+
759+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
760+
643761
### Logging
644762

645763
By default, ShareDB logs to `console`. This can be overridden if you wish to silence logs, or to log to your own logging driver or alert service.

lib/agent.js

+141-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,19 @@ function Agent(backend, stream) {
2929
// Map from queryId -> emitter
3030
this.subscribedQueries = {};
3131

32+
// Track which documents are subscribed to presence by the client. This is a
33+
// map of channel -> stream
34+
this.subscribedPresences = {};
35+
// Highest seq received for a subscription request. Any seq lower than this
36+
// value is stale, and should be ignored. Used for keeping the subscription
37+
// state in sync with the client's desired state
38+
this.presenceSubscriptionSeq = 0;
39+
// Keep track of the last request that has been sent by each local presence
40+
// belonging to this agent. This is used to generate a new disconnection
41+
// request if the client disconnects ungracefully. This is a
42+
// map of channel -> id -> request
43+
this.presenceRequests = {};
44+
3245
// We need to track this manually to make sure we don't reply to messages
3346
// after the stream was closed.
3447
this.closed = false;
@@ -77,6 +90,11 @@ Agent.prototype._cleanup = function() {
7790
}
7891
this.subscribedDocs = {};
7992

93+
for (var channel in this.subscribedPresences) {
94+
this.subscribedPresences[channel].destroy();
95+
}
96+
this.subscribedPresences = {};
97+
8098
// Clean up query subscription streams
8199
for (var id in this.subscribedQueries) {
82100
var emitter = this.subscribedQueries[id];
@@ -120,6 +138,31 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
120138
});
121139
};
122140

141+
Agent.prototype._subscribeToPresenceStream = function(channel, stream) {
142+
if (this.closed) return stream.destroy();
143+
144+
stream.on('data', function(data) {
145+
if (data.error) {
146+
logger.error('Presence subscription stream error', channel, data.error);
147+
}
148+
this._handlePresenceData(data);
149+
}.bind(this));
150+
151+
stream.on('end', function() {
152+
var requests = this.presenceRequests[channel] || {};
153+
for (var id in requests) {
154+
var request = this.presenceRequests[channel][id];
155+
request.seq++;
156+
request.p = null;
157+
this._broadcastPresence(request, function(error) {
158+
if (error) logger.error('Error broadcasting disconnect presence', channel, error);
159+
});
160+
}
161+
delete this.subscribedPresences[channel];
162+
delete this.presenceRequests[channel];
163+
}.bind(this));
164+
};
165+
123166
Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) {
124167
var previous = this.subscribedQueries[queryId];
125168
if (previous) previous.destroy();
@@ -310,14 +353,18 @@ Agent.prototype._checkRequest = function(request) {
310353
if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') {
311354
// Query messages need an ID property.
312355
if (typeof request.id !== 'number') return 'Missing query ID';
313-
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u') {
356+
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') {
314357
// Doc-based request.
315358
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
316359
if (request.d != null && typeof request.d !== 'string') return 'Invalid id';
317360

318-
if (request.a === 'op') {
361+
if (request.a === 'op' || request.a === 'p') {
319362
if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version';
320363
}
364+
365+
if (request.a === 'p') {
366+
if (typeof request.id !== 'string') return 'Missing presence ID';
367+
}
321368
} else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') {
322369
// Bulk request
323370
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
@@ -359,6 +406,19 @@ Agent.prototype._handleMessage = function(request, callback) {
359406
return this._fetchSnapshot(request.c, request.d, request.v, callback);
360407
case 'nt':
361408
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
409+
case 'p':
410+
var presence = this._createPresence(request);
411+
if (presence.t && !util.supportsPresence(types.map[presence.t])) {
412+
return callback({
413+
code: ERROR_CODE.ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE,
414+
message: 'Type does not support presence: ' + presence.t
415+
});
416+
}
417+
return this._broadcastPresence(presence, callback);
418+
case 'ps':
419+
return this._subscribePresence(request.ch, request.seq, callback);
420+
case 'pu':
421+
return this._unsubscribePresence(request.ch, request.seq, callback);
362422
default:
363423
callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message'));
364424
}
@@ -645,6 +705,85 @@ Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp,
645705
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
646706
};
647707

708+
Agent.prototype._broadcastPresence = function(presence, callback) {
709+
var requests = this.presenceRequests[presence.ch] || (this.presenceRequests[presence.ch] = {});
710+
var previousRequest = requests[presence.id];
711+
if (!previousRequest || previousRequest.seq < presence.seq) {
712+
this.presenceRequests[presence.ch][presence.id] = presence;
713+
}
714+
this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) {
715+
if (error) return callback(error);
716+
var channel = this._getPresenceChannel(presence.ch);
717+
this.backend.pubsub.publish([channel], presence, function(error) {
718+
if (error) return callback(error);
719+
callback(null, presence);
720+
});
721+
}.bind(this));
722+
};
723+
724+
Agent.prototype._createPresence = function(request) {
725+
return {
726+
a: 'p',
727+
ch: request.ch,
728+
src: this.clientId,
729+
seq: request.seq,
730+
id: request.id,
731+
p: request.p,
732+
c: request.c,
733+
d: request.d,
734+
v: request.v,
735+
t: request.t
736+
};
737+
};
738+
739+
Agent.prototype._subscribePresence = function(channel, seq, callback) {
740+
var presenceChannel = this._getPresenceChannel(channel);
741+
this.backend.pubsub.subscribe(presenceChannel, function(error, stream) {
742+
if (error) return callback(error);
743+
if (seq < this.presenceSubscriptionSeq) return callback(null, {ch: channel, seq: seq});
744+
this.presenceSubscriptionSeq = seq;
745+
this.subscribedPresences[channel] = stream;
746+
this._subscribeToPresenceStream(channel, stream);
747+
this._requestPresence(channel, function(error) {
748+
callback(error, {ch: channel, seq: seq});
749+
});
750+
}.bind(this));
751+
};
752+
753+
Agent.prototype._unsubscribePresence = function(channel, seq, callback) {
754+
if (seq < this.presenceSubscriptionSeq) return;
755+
this.presenceSubscriptionSeq = seq;
756+
var stream = this.subscribedPresences[channel];
757+
if (stream) stream.destroy();
758+
callback(null, {ch: channel, seq: seq});
759+
};
760+
761+
Agent.prototype._getPresenceChannel = function(channel) {
762+
return '$presence.' + channel;
763+
};
764+
765+
Agent.prototype._requestPresence = function(channel, callback) {
766+
var presenceChannel = this._getPresenceChannel(channel);
767+
this.backend.pubsub.publish([presenceChannel], {ch: channel, r: true, src: this.clientId}, callback);
768+
};
769+
770+
Agent.prototype._handlePresenceData = function(presence) {
771+
if (presence.src === this.clientId) return;
772+
773+
if (presence.r) return this.send({a: 'pr', ch: presence.ch});
774+
775+
var backend = this.backend;
776+
var context = {
777+
collection: presence.c,
778+
presence: presence
779+
};
780+
backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) {
781+
if (error) {
782+
return this.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)});
783+
}
784+
this.send(presence);
785+
}.bind(this));
786+
};
648787

649788
function createClientOp(request, clientId) {
650789
// src can be provided if it is not the same as the current agent,

lib/backend.js

+18
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ Backend.prototype.MIDDLEWARE_ACTIONS = {
6666
// by design, changing existing reply properties can cause weird bugs, since
6767
// the rest of ShareDB would be unaware of those changes.
6868
reply: 'reply',
69+
// About to send presence information to a client
70+
sendPresence: 'sendPresence',
6971
// An operation is about to be submitted to the database
7072
submit: 'submit'
7173
};
@@ -815,6 +817,22 @@ Backend.prototype._buildSnapshotFromOps = function(id, startingSnapshot, ops, ca
815817
callback(error, snapshot);
816818
};
817819

820+
Backend.prototype.transformPresenceToLatestVersion = function(agent, presence, callback) {
821+
if (!presence.c || !presence.d) return callback(null, presence);
822+
this.getOps(agent, presence.c, presence.d, presence.v, null, function(error, ops) {
823+
if (error) return callback(error);
824+
for (var i = 0; i < ops.length; i++) {
825+
var op = ops[i];
826+
var isOwnOp = op.src === presence.src;
827+
var transformError = ot.transformPresence(presence, op, isOwnOp);
828+
if (transformError) {
829+
return callback(transformError);
830+
}
831+
}
832+
callback(null, presence);
833+
});
834+
};
835+
818836
function pluckIds(snapshots) {
819837
var ids = [];
820838
for (var i = 0; i < snapshots.length; i++) {

0 commit comments

Comments
 (0)