-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlib.js
213 lines (200 loc) · 7.39 KB
/
lib.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
function debugLog(objToLog) {
if(process.env.DEBUG_LOGS === "true") {
console.log((new Date().getTime() | 0));
console.log(objToLog);
}
}
/**
* channels: {
* 'channel1': {
* lastValue: "last_published_value_here",
* clients: {
* / * Against each client we are saving a list of sockets, because the client maybe connected from multiple browser-tabs/devices * /
* "client1": [ client1ResObject, client1ResObject2 ],
* "client2": [ client2ResObject, client2ResObject2 ],
* "client3": [ client3ResObject ]
* }
* },
* 'channel2': {
* ...
* }
*
* }
*/
let channels = {};
/**
* Publishes the data to a single client.
* @param {string} channelName name of the channel
* @param {object} res express res object for the client
* @param {string} eventType name of the event
* @param {string} payload string to publish to the client.
*/
function publishDataToSingleSocket(channelName, res, eventType, payload) {
const dataToWrite = JSON.stringify({ channelName, type: eventType, payload: payload+"" });
res.write(`event: ${eventType}\n`)
res.write(`data: ${dataToWrite}\n\n`);
}
// This object manages online presence.
let connectedClients = {
_clients: {},
_broadcastChange: function() {
for(const [clientId, sockets] of Object.entries(this._clients)) {
for(res of sockets) {
publishDataToSingleSocket(undefined, res, "online-presence", JSON.stringify(Object.keys(this._clients)));
}
}
},
addClientAndNotifyOthers: function(clientId, res) {
if(!this._clients[clientId]) {
this._clients[clientId] = [res];
} else {
this._clients[clientId].push(res);
}
this._broadcastChange();
},
removeClientAndNotifyOthers: function(clientId, res) {
// If there's no such clientId, or if there are no sockets against
// the id simply return.
if(!this._clients[clientId] || !this._clients[clientId].length) return;
// Remove the res object from the clients list.
this._clients[clientId] = this._clients[clientId].filter(v => v !== res);
// Check now if the list is of 0 length, then delete the whole
// clientId and broadcast the change.
if(!this._clients[clientId].length) {
delete this._clients[clientId];
this._broadcastChange();
}
}
};
/**
* Initializes a channel in the 'channels' object.
* @param {*} channelName
* @param {*} initialValue
* @param {*} clients
*/
function createChannel(channelName, initialValue, clients={}) {
channels[channelName] = {
lastEvent: initialValue,
clients
}
}
/**
* Saves and Subscribes client to a channel, also adding an on disconnect hook
* that removes the client from the channel.
* @param {*} channelName Channel identifier
* @param {*} clientId Client identifier
* @param {*} res Client's express res object.
*/
function saveClientToChannel(channelName, clientId, req, res) {
if(channels[channelName]) {
// 1a.1) If the client list is already created, add this client
// to the client list. Else, create the
// client list add, this client as the first client.
const clientsForThisChannel = channels[channelName].clients;
if(clientsForThisChannel) {
// save the res socket for the clientId to the
// client list.
if(clientsForThisChannel[clientId]){
clientsForThisChannel[clientId].push(res);
} else {
clientsForThisChannel[clientId] = [res];
}
} else {
// No clients. Add this client as the
// first client.
channels[channelName].clients = {
[clientId]: [res]
};
}
} else {
// 1b) If the channel does not exist, creat the channel, set "{}" as the lastValue,
// add the client in the client list.
createChannel(channelName, "{}", {
[clientId]: [res]
});
}
// 2) Remove the client from the clients list if
// the client disconnects.
req.on("close", function () {
const clients = channels[channelName].clients;
// remove the associated 'res' from the clients list against the clientId.
clients[clientId] = clients[clientId].filter(v => v !== res);
// check if there are no more sockets left for this clientId then delete it
// from the channel.
if(!clients[clientId].length) {
delete clients[clientId];
debugLog(`Client ${clientId} removed `);
debugLog(channels);
}
// Also remove the client from the connectedClients singleton.
connectedClients.removeClientAndNotifyOthers(clientId, res);
});
debugLog(`Client ${clientId} added `);
debugLog(channels);
}
function doInitialSSESetup(req, res) {
// Initial setup for SSE.
// Setting timeout to 0 disables idle timeout,
// so the socket will never close because of inactivity.
req.socket.setTimeout(0);
// Disables the Nagle algorithm, data will not be buffered
// and will be sent each time socket.write() is called.
req.socket.setNoDelay(true);
// Enable TCP keep-alive probes.
req.socket.setKeepAlive(true);
res.writeHead(200, {
'Access-Control-Allow-Origin': "*",
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
});
res.write('\n');
}
/**
* Registers a client for the first time:
* i) Saves it to the channel
* ii) send initial messages.
* @param {[string]} channelNames
* @param {*} clientId
* @param {*} res
*/
module.exports.registerClient = function (channelNames, clientId, req, res) {
// Set timeout and headers.
doInitialSSESetup(req, res);
// Send the client id to the client for debugging purposes.
// TODO: Need to add logging for client trace.
publishDataToSingleSocket(undefined, res, "registered", JSON.stringify({ clientId }));
// Add the client to all the requested channels
for(const channelName of channelNames) {
saveClientToChannel(channelName, clientId, req, res);
const lastChannelEvent = channels[channelName].lastEvent;
publishDataToSingleSocket(channelName, res, "lastEvent", lastChannelEvent);
}
// Finally, add the client to the connectedClients singleton and notify all
// connected users.
connectedClients.addClientAndNotifyOthers(clientId, res);
}
module.exports.getLastEvent = function (channelName) {
const lastChannelEvent = channels[channelName] && channels[channelName].lastEvent;
return lastChannelEvent;
}
module.exports.reservedEvents = ["registered", "lastEvent", "online-presence"];
module.exports.publishDataToChannel = function (channelName, eventType, payload) {
if(module.exports.reservedEvents.includes(eventType)) {
throw Error(`Can't fire event "${eventType}", it's reserved.`);
}
const channel = channels[channelName];
if(!channel) {
createChannel(channelName, payload);
return;
} else {
channel.lastEvent = JSON.stringify({ type: eventType, payload });
const clients = channel.clients;
for (clientId in clients) {
for(res of clients[clientId]) {
publishDataToSingleSocket(channelName, res, eventType, payload);
}
};
}
}