Skip to content

WIP Presence Continuation #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 57 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
fe1b7c0
Merge branch 'master' into fix-whenNothingPending
ericyhwang Apr 10, 2019
588167c
Merge pull request #222 from Teamwork/fix-whenNothingPending
ericyhwang Apr 10, 2019
70ee4ea
Merge pull request #283 from curran/patch-12
ericyhwang Apr 10, 2019
abcea41
1.0.0-beta.22
ericyhwang Apr 12, 2019
33450ae
Resolve merge conflicts
curran Apr 17, 2019
f43b752
Restore tests to working order
curran Apr 17, 2019
9409429
Remove extraneous .editorconfig
curran Apr 17, 2019
c4cf1b8
Revert extraneous changes in .travis.yml and package.json
curran Apr 17, 2019
237d2ad
Use lolex to make 'expires cached ops' test more stable.
curran Apr 17, 2019
c8d35c5
Move doc.presence to doc.presence.current
curran Apr 17, 2019
3efb82c
Move doc.receivedPresence to doc.presence.received
curran Apr 17, 2019
f0451e3
Move doc.requestReplyPresence to doc.presence.requestReply
curran Apr 17, 2019
5217635
Move doc.cachedOps to doc.presence.cachedOps
curran Apr 17, 2019
ac26dae
Move doc.inflightPresenceSeq to doc.presence.inflightSeq
curran Apr 17, 2019
48acccc
Move doc.inflightPresence to doc.presence.inflight
curran Apr 17, 2019
cab69fb
Move doc.pendingPresence to doc.presence.pending
curran Apr 17, 2019
6a0ecc4
Refactor presence fields into object declaration.
curran Apr 17, 2019
d41c961
Simplify object creation; 'change Object.create(null)' to '{}'.
curran Apr 17, 2019
fc351fa
Introduce enablePresence option. Closes #128
curran Apr 17, 2019
6cd16f3
Misc cleanup, finishing touches.
curran Apr 17, 2019
ad6a528
Split out presence methods into separate module
curran Apr 17, 2019
eaafc98
Move more presence-related logic into presence methods module.
curran Apr 17, 2019
7259f7e
Move presence methods such that they are passed into Backend
curran Apr 18, 2019
09f6415
Migrate hardRollbackPresence to presence instance
curran Apr 18, 2019
23a06c3
Migrate _initializePresence
curran Apr 18, 2019
824346f
Migrate _handlePresence
curran Apr 18, 2019
d6e3e3d
Migrate _processReceivedPresence
curran Apr 18, 2019
0382c03
Migrate processAllReceivedPresence
curran Apr 18, 2019
46d8a1b
Migrate _transformPresence
curran Apr 18, 2019
fc16be7
Migrate pausePresence
curran Apr 18, 2019
9cb5564
Migrate cacheOp
curran Apr 18, 2019
6461a79
Migrate flushPresence
curran Apr 18, 2019
2358022
Migrate transformAllPresence
curran Apr 18, 2019
41a4743
Migrate emitPresence
curran Apr 18, 2019
ba7d880
Migrate submitPresence
curran Apr 18, 2019
3ffd1ab
Migrate _setPresence
curran Apr 18, 2019
6114bad
Clean up intermediate migration steps
curran Apr 18, 2019
19446cd
Convert StatelessPresence to a class
curran Apr 18, 2019
0089f80
Convert StatelessPresence into idiomatic JS class.
curran Apr 18, 2019
2054057
Add test case that doc invokes presence.destroy inside doc.destroy
curran Apr 18, 2019
1a64a06
Introduce DummyPresence, use it by default
curran Apr 18, 2019
47193da
Remove if(this.presence) guards.
curran Apr 18, 2019
b23661c
Optimize cacheOp
curran Apr 18, 2019
521f77b
Split out getPendingPresence logic from hardRollbackPresence.
curran Apr 18, 2019
bdb6424
Clean up DummyPresence
curran Apr 18, 2019
0f3084a
Introduce Presence base class inherited by DummyPresence and Stateles…
curran Apr 18, 2019
41cd2ea
Add Presence base class module
curran Apr 19, 2019
986a695
Start disentangling presence logic from Agent
curran Apr 19, 2019
ac55884
Migrate Agent._createPresence
curran Apr 19, 2019
9187b34
Migrate subscribeToStream
curran Apr 19, 2019
7507731
Migrate _subscribeToQuery
curran Apr 19, 2019
1a1f52a
Migrate handlePresenceMessage
curran Apr 19, 2019
49ff5c2
Use only flushPresence(), not flush(), _handleSubscribe
curran Apr 19, 2019
87aa90b
Disentangle doc internals from flushPresence
curran Apr 19, 2019
2198e8e
Begin disentangling presence logic from connection.js
curran Apr 19, 2019
cf0168d
Decouple sendPresence
curran Apr 19, 2019
4c46a05
Move Presence class to presence.DocPresence
curran Apr 19, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ coverage
# Dependency directories
node_modules
package-lock.json
yarn.lock
jspm_packages
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ tracker](https://github.com/share/sharedb/issues).

- Realtime synchronization of any JSON document
- Concurrent multi-user collaboration
- Realtime synchronization of any ephemeral "presence" data
- Synchronous editing API with asynchronous eventual consistency
- Realtime query subscriptions
- Simple integration with any database - [MongoDB](https://github.com/share/sharedb-mongo), [PostgresQL](https://github.com/share/sharedb-postgres) (experimental)
Expand Down Expand Up @@ -73,6 +74,12 @@ initial data. Then you can submit editing operations on the document (using
OT). Finally you can delete the document with a delete operation. By
default, ShareDB stores all operations forever - nothing is truly deleted.

## User presence synchronization

ShareDB supports synchronization of user presence data. This feature is opt-in, not enabled by default. To enable this feature, pass the `enablePresence: true` option to the ShareDB constructor (e.g. `var share = new ShareDB({ enablePresence: true })`).

Presence data represents a user and is automatically synchronized between all clients subscribed to the same document. Its format is defined by the document's [OT Type](https://github.com/ottypes/docs), for example it may contain a user ID and a cursor position in a text document. All clients can modify their own presence data and receive a read-only version of other client's data. Presence data is automatically cleared when a client unsubscribes from the document or disconnects. It is also automatically transformed against applied operations, so that it still makes sense in the context of a modified document, for example a cursor position may be automatically advanced when a user types at the beginning of a text document.

## Server API

### Initialization
Expand All @@ -91,6 +98,8 @@ __Options__
* `options.pubsub` _(instance of `ShareDB.PubSub`)_
Notify other ShareDB processes when data changes
through this pub/sub adapter. Defaults to `ShareDB.MemoryPubSub()`.
* `options.enablePresence` _(optional boolean)_
Enable user presence synchronization.

#### Database Adapters
* `ShareDB.MemoryDB`, backed by a non-persistent database with no queries
Expand Down Expand Up @@ -308,6 +317,9 @@ Unique document ID
`doc.data` _(Object)_
Document contents. Available after document is fetched or subscribed to.

`doc.presence.current` _(Object)_
Each property under `doc.presence.current` contains presence data shared by a client subscribed to this document. The property name is an empty string for this client's data and connection IDs for other clients' data.

`doc.fetch(function(err) {...})`
Populate the fields on `doc` with a snapshot of the document from the server.

Expand Down Expand Up @@ -337,6 +349,9 @@ An operation was applied to the data. `source` will be `false` for ops received
`doc.on('del', function(data, source) {...})`
The document was deleted. Document contents before deletion are passed in as an argument. `source` will be `false` for ops received from the server and defaults to `true` for ops generated locally.

`doc.on('presence', function(srcList, submitted) {...})`
Presence data has changed. `srcList` is an Array of `doc.presence` property names for which values have changed. `submitted` is `true`, if the event is the result of new presence data being submitted by the local or remote user, otherwise it is `false` - eg if the presence data was transformed against an operation or was cleared on unsubscribe, disconnect or roll-back.

`doc.on('error', function(err) {...})`
There was an error fetching the document or applying an operation.

Expand Down Expand Up @@ -370,6 +385,11 @@ Invokes the given callback function after

Note that `whenNothingPending` does NOT wait for pending `model.query()` calls.

`doc.submitPresence(presenceData[, function(err) {...}])`
Set local presence data and publish it for other clients.
`presenceData` structure depends on the document type.
Presence is synchronized only when subscribed to the document.

### Class: `ShareDB.Query`

`query.ready` _(Boolean)_
Expand Down Expand Up @@ -467,6 +487,9 @@ Additional fields may be added to the error object for debugging context dependi
* 4022 - Database adapter does not support queries
* 4023 - Cannot project snapshots of this type
* 4024 - Invalid version
* 4025 - Not subscribed to document
* 4026 - Presence data superseded
* 4027 - OT Type does not support presence

### 5000 - Internal error

Expand Down
74 changes: 74 additions & 0 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var hat = require('hat');
var util = require('./util');
var types = require('./types');
var logger = require('./logger');
var ShareDBError = require('./error');

/**
* Agent deserializes the wire protocol messages received from the stream and
Expand All @@ -26,6 +27,69 @@ function Agent(backend, stream) {
// Map from queryId -> emitter
this.subscribedQueries = {};

this.presence = {
agent: this,
isPresenceData: function (data) {
return data.a === 'p';
},
processPresenceData: function (data) {
if (data.a === 'p') {
// Send other clients' presence data
if (data.src !== this.agent.clientId) this.agent.send(data);
return true;
}
},
// The max presence sequence number received from the client.
maxPresenceSeq: 0,
createPresence: function(collection, id, data, version, requestReply, seq) {
return {
a: 'p',
src: this.agent.clientId,
seq: seq != null ? seq : this.maxPresenceSeq,
c: collection,
d: id,
p: data,
v: version,
r: requestReply
};
},
subscribeToStream: function (collection, id, stream) {
var agent = this.agent;
stream.on('end', function() {
agent.backend.sendPresence(agent.presence.createPresence(collection, id));
});
},
checkRequest: function (request) {
if (request.a === 'p') {
if (typeof request.c !== 'string') return 'Invalid collection';
if (typeof request.d !== 'string') return 'Invalid id';
if (typeof request.v !== 'number' || request.v < 0) return 'Invalid version';
if (typeof request.seq !== 'number' || request.seq <= 0) return 'Invalid seq';
if (typeof request.r !== 'undefined' && typeof request.r !== 'boolean') {
return 'Invalid "request reply" value';
}
}
},
handlePresenceMessage: function(request, callback) {
var presence = this.createPresence(request.c, request.d, request.p, request.v, request.r, request.seq);
if (presence.seq <= this.maxPresenceSeq) {
return process.nextTick(function() {
callback(new ShareDBError(4026, 'Presence data superseded'));
});
}
this.maxPresenceSeq = presence.seq;
if (!this.agent.subscribedDocs[presence.c] || !this.agent.subscribedDocs[presence.c][presence.d]) {
return process.nextTick(function() {
callback(new ShareDBError(4025, 'Cannot send presence. Not subscribed to document: ' + presence.c + ' ' + presence.d));
});
}
this.agent.backend.sendPresence(presence, function(err) {
if (err) return callback(err);
callback(null, { seq: presence.seq });
});
}
};

// We need to track this manually to make sure we don't reply to messages
// after the stream was closed.
this.closed = false;
Expand Down Expand Up @@ -106,6 +170,10 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
logger.error('Doc subscription stream error', collection, id, data.error);
return;
}
if (agent.presence.isPresenceData(data)) {
agent.presence.processPresenceData(data);
return;
}
if (agent._isOwnOp(collection, data)) return;
agent._sendOp(collection, id, data);
});
Expand All @@ -117,6 +185,7 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
if (util.hasKeys(streams)) return;
delete agent.subscribedDocs[collection];
});
this.presence.subscribeToStream(collection, id, stream);
};

Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) {
Expand Down Expand Up @@ -288,6 +357,8 @@ Agent.prototype._checkRequest = function(request) {
// Bulk request
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
if (typeof request.b !== 'object') return 'Invalid bulk subscribe data';
} else {
return this.presence.checkRequest(request);
}
};

Expand Down Expand Up @@ -325,6 +396,9 @@ Agent.prototype._handleMessage = function(request, callback) {
case 'nt':
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
default:
if (this.presence.isPresenceData(request)) {
return this.presence.handlePresenceMessage(request, callback);
}
callback({code: 4000, message: 'Invalid or unknown message'});
}
} catch (err) {
Expand Down
11 changes: 11 additions & 0 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var Snapshot = require('./snapshot');
var StreamSocket = require('./stream-socket');
var SubmitRequest = require('./submit-request');
var types = require('./types');
var dummyPresence = require('./presence/dummy');

var warnDeprecatedDoc = true;
var warnDeprecatedAfterSubmit = true;
Expand Down Expand Up @@ -48,6 +49,8 @@ function Backend(options) {
if (!options.disableSpaceDelimitedActions) {
this._shimAfterSubmit();
}

this.presence = options.presence || dummyPresence;
}
module.exports = Backend;
emitter.mixin(Backend);
Expand Down Expand Up @@ -155,6 +158,9 @@ Backend.prototype.connect = function(connection, req) {
// not used internal to ShareDB, but it is handy for server-side only user
// code that may cache state on the agent and read it in middleware
connection.agent = agent;

connection.DocPresence = this.presence.DocPresence;

return connection;
};

Expand Down Expand Up @@ -720,6 +726,11 @@ Backend.prototype._buildSnapshotFromOps = function (id, startingSnapshot, ops, c
callback(error, snapshot);
};

Backend.prototype.sendPresence = function(presence, callback) {
var channels = [ this.getDocChannel(presence.c, presence.d) ];
this.pubsub.publish(channels, presence, callback);
};

function pluckIds(snapshots) {
var ids = [];
for (var i = 0; i < snapshots.length; i++) {
Expand Down
38 changes: 38 additions & 0 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,34 @@ function Connection(socket) {
this.state = connectionState(socket);

this.bindToSocket(socket);

this.presence = {
connection: this,
// TODO unify with code in agent.js
isPresenceMessage: function (message) {
return message.a === 'p';
},
handlePresenceMessage: function (err, message) {
var doc = this.connection.getExisting(message.c, message.d);
if (doc) doc._handlePresence(err, message);
},
sendPresence: function(doc, data, requestReply) {
// Ensure the doc is registered so that it receives the reply message
this.connection._addDoc(doc);
var message = {
a: 'p',
c: doc.collection,
d: doc.id,
p: data,
v: doc.version || 0,
seq: this.connection.seq++
};
if (requestReply) {
message.r = true;
}
this.connection.send(message);
}
};
}
emitter.mixin(Connection);

Expand Down Expand Up @@ -255,6 +283,9 @@ Connection.prototype.handleMessage = function(message) {
return;

default:
if (this.presence.isPresenceMessage(message)) {
return this.presence.handlePresenceMessage(err, message);
}
logger.warn('Ignoring unrecognized message', message);
}
};
Expand Down Expand Up @@ -424,6 +455,13 @@ Connection.prototype.sendOp = function(doc, op) {
this.send(message);
};

/**
* Sends presence data down the socket
*/
Connection.prototype.sendPresence = function(doc, data, requestReply) {
this.presence.sendPresence(doc, data, requestReply);
};


/**
* Sends a message down the socket
Expand Down
Loading