Skip to content

Commit 8ad250a

Browse files
committed
redirector: handle closed connections, refactor into RpcConnSet util
1 parent 40d3435 commit 8ad250a

File tree

2 files changed

+78
-61
lines changed

2 files changed

+78
-61
lines changed

src/rpc/rpc_conn_set.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/* Copyright (C) 2016 NooBaa */
2+
'use strict';
3+
4+
const dbg = require('../util/debug_module')(__filename);
5+
6+
const CLOSE_LISTENER_SYMBOL = Symbol('CLOSE_LISTENER_SYMBOL');
7+
8+
class RpcConnSet {
9+
10+
constructor(name) {
11+
this.name = name;
12+
this.set = new Set();
13+
}
14+
15+
add(conn) {
16+
if (conn.is_closed()) {
17+
dbg.warn(this.name, 'ignore closed connection', conn.url.href);
18+
return;
19+
}
20+
if (this.set.has(conn)) {
21+
dbg.log0(this.name, 'already registered', conn.url.href);
22+
return;
23+
}
24+
dbg.log0(this.name, 'adding connection', conn.url.href);
25+
this.set.add(conn);
26+
const close_listener = () => this.remove(conn);
27+
conn[CLOSE_LISTENER_SYMBOL] = close_listener;
28+
conn.once('close', close_listener);
29+
}
30+
31+
remove(conn) {
32+
dbg.warn(this.name, 'removing connection', conn.url.href);
33+
const close_listener = conn[CLOSE_LISTENER_SYMBOL];
34+
delete conn[CLOSE_LISTENER_SYMBOL];
35+
conn.removeListener('close', close_listener);
36+
this.set.delete(conn);
37+
}
38+
39+
cleanup() {
40+
for (const conn of this.set) {
41+
if (conn.is_closed()) this.remove(conn);
42+
}
43+
}
44+
45+
list() {
46+
this.cleanup();
47+
return Array.from(this.set);
48+
}
49+
50+
}
51+
52+
module.exports = RpcConnSet;

src/server/system_services/redirector.js

Lines changed: 26 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,87 +1,52 @@
11
/* Copyright (C) 2016 NooBaa */
2-
/**
3-
*
4-
* REDIRECTOR
5-
*
6-
*/
72
'use strict';
83

9-
const _ = require('lodash');
104
const P = require('../../util/promise');
115
const dbg = require('../../util/debug_module')(__filename);
126
const server_rpc = require('../server_rpc');
7+
const RpcConnSet = require('../../rpc/rpc_conn_set');
138

14-
// dbg.set_level(5);
15-
16-
const cluster_connections = new Set();
17-
const alerts_connections = new Set();
9+
const cluster_conn_set = new RpcConnSet('redirector cluster_conn_set');
10+
const alerts_conn_set = new RpcConnSet('redirector alerts_conn_set');
1811

1912
function register_to_cluster(req) {
20-
var conn = req.connection;
21-
if (!cluster_connections.has(conn)) {
22-
dbg.log0('register_to_cluster', conn.url.href);
23-
cluster_connections.add(conn);
24-
conn.on('close', function() {
25-
cluster_connections.delete(conn);
26-
});
27-
}
13+
cluster_conn_set.add(req.connection);
2814
}
2915

3016
function publish_to_cluster(req) {
31-
var api_name = req.rpc_params.method_api.slice(0, -4); // remove _api suffix
32-
var method = req.rpc_params.method_name;
33-
var addresses = ['fcall://fcall']; // also call on myself
34-
cluster_connections.forEach(function(conn) {
35-
addresses.push(conn.url.href);
36-
});
37-
addresses = _.uniq(addresses);
38-
dbg.log0('publish_to_cluster:', addresses);
39-
return P.map(addresses, function(address) {
40-
return server_rpc.client[api_name][method](req.rpc_params.request_params, {
41-
address: address,
17+
const api_name = req.rpc_params.method_api.slice(0, -4); // remove _api suffix
18+
const method = req.rpc_params.method_name;
19+
const connections = cluster_conn_set.list();
20+
dbg.log0('publish_to_cluster:', connections.length);
21+
return P.map(connections,
22+
conn => server_rpc.client[api_name][method](req.rpc_params.request_params, {
23+
connection: conn,
4224
auth_token: req.auth_token,
43-
});
44-
})
45-
.then(function(res) {
46-
return {
47-
redirect_reply: {
48-
aggregated: res,
49-
}
50-
};
51-
});
25+
})
26+
)
27+
.then(res => ({
28+
redirect_reply: {
29+
aggregated: res,
30+
}
31+
}));
5232
}
5333

5434
function register_for_alerts(req) {
55-
var conn = req.connection;
56-
if (!alerts_connections.has(conn)) {
57-
dbg.log0('register_for_alerts', conn.url.href);
58-
alerts_connections.add(conn);
59-
conn.on('close', function() {
60-
alerts_connections.delete(conn);
61-
});
62-
}
35+
alerts_conn_set.add(req.connection);
6336
}
6437

6538
function unregister_from_alerts(req) {
66-
var conn = req.connection;
67-
if (!alerts_connections.has(conn)) {
68-
return;
69-
}
70-
alerts_connections.delete(conn);
39+
alerts_conn_set.remove(req.connection);
7140
}
7241

7342
function publish_alerts(req) {
74-
var connections = [];
75-
alerts_connections.forEach(function(conn) {
76-
connections.push(conn);
77-
});
78-
connections = _.uniq(connections);
79-
dbg.log3('publish_alerts:', req.rpc_params.request_params, connections);
80-
return P.map(connections, function(conn) {
81-
return server_rpc.client.frontend_notifications.alert(req.rpc_params.request_params, {
43+
const connections = alerts_conn_set.list();
44+
dbg.log3('publish_alerts:', req.rpc_params.request_params, connections.length);
45+
return P.map(connections, conn =>
46+
server_rpc.client.frontend_notifications.alert(req.rpc_params.request_params, {
8247
connection: conn,
83-
});
84-
})
48+
})
49+
)
8550
.then(() => {
8651
dbg.log3('published');
8752
});

0 commit comments

Comments
 (0)