forked from octoblu/meshblu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt-server.js
149 lines (120 loc) · 3.72 KB
/
mqtt-server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
'use strict';
var mosca = require('mosca');
var config = require('./config');
var updateSocketId = require('./lib/updateSocketId');
var server;
config.mqtt = config.mqtt || {};
var dataStore = {
type: 'mongo',
url: config.mqtt.databaseUrl,
pubsubCollection: 'mqtt',
mongo: {}
};
var dataLogger = {
level: 'debug'
};
var settings = {
port: config.mqtt.port || 1883,
backend: dataStore,
logger: dataLogger
};
function endsWith(str, suffix) {
return str.indexOf(suffix, str.length - suffix.length) !== -1;
}
process.on("uncaughtException", function(error) {
return console.log(error.stack);
});
// Accepts the connection if the username and password are valid
function authenticate(client, username, password, callback) {
console.log('authenticate username:', username.toString(),'password', password.toString(),'client.id:', client.id, client.clientId, client.client_id);
if(username && username.toString() === 'skynet'){
if(password && password.toString() === config.mqtt.skynetPass){
client.skynetDevice = {
uuid: 'skynet',
};
callback(null, true);
}else{
callback('unauthorized');
}
}else{
var data = {
uuid: username.toString(),
token: password.toString(),
socketid: username.toString(),
protocol: 'mqtt',
online: 'true'
};
console.log('attempting authenticate', data);
updateSocketId(data, function(auth){
if (auth.status == 201){
client.skynetDevice = auth.device;
console.log('authenticated: ' + auth.device.uuid);
callback(null, true);
} else {
callback('unauthorized');
}
});
}
}
// In this case the client authorized as alice can publish to /users/alice taking
// the username from the topic and verifing it is the same of the authorized user
function authorizePublish(client, topic, payload, callback) {
function reject(){
callback('unauthorized');
console.log('\nunauthorized Publish', topic, client.id);
}
//TODO refactor this mess
if(client.skynetDevice){
if(client.skynetDevice.uuid === 'skynet'){
callback(null, true);
}else if(topic === 'skynet'){
try{
var payloadObj = JSON.parse(payload.toString());
if(payloadObj.fromUuid === client.skynetDevice.uuid){
callback(null, true);
console.log('\nauthorized Publish', topic, client.id);
}else{
reject();
}
}catch(err){
reject();
}
}else{
reject();
}
}else{
reject();
}
}
// In this case the client authorized as alice can subscribe to /users/alice taking
// the username from the topic and verifing it is the same of the authorized user
function authorizeSubscribe(client, topic, callback) {
if(endsWith(topic, '_bc') ||
(client.skynetDevice &&
((client.skynetDevice.uuid === 'skynet') || (client.skynetDevice.uuid === topic)))){
callback(null, true);
console.log('authorized subscribe', topic, client.skynetDevice);
}else{
callback('unauthorized');
}
}
// fired when the mqtt server is ready
function setup() {
console.log('Skynet MQTT server started on port', config.port);
server.authenticate = authenticate;
server.authorizePublish = authorizePublish;
server.authorizeSubscribe = authorizeSubscribe;
}
// // fired when a message is published
server = new mosca.Server(settings);
server.on('ready', setup);
server.on('published', function(packet, client) {
//console.log('Published', packet, client);
});
// fired when a client connects or disconnects
server.on('clientConnected', function(client) {
console.log('Client Connected:', client.id);
});
server.on('clientDisconnected', function(client) {
console.log('Client Disconnected:', client.id);
});