Skip to content

Commit

Permalink
storage: initial redis support
Browse files Browse the repository at this point in the history
  • Loading branch information
fredriklindberg committed Apr 30, 2021
1 parent 630ed86 commit 56928e7
Show file tree
Hide file tree
Showing 11 changed files with 342 additions and 79 deletions.
6 changes: 6 additions & 0 deletions deps/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
version: "3.8"
services:
redis:
image: "redis:alpine"
ports:
- "6379:6379"
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"koa-joi-router": "^7.0.0",
"koa-router": "^9.4.0",
"log4js": "^6.3.0",
"redis": "^3.1.2",
"ws": "^7.3.1",
"yargs": "^15.4.1"
}
Expand Down
10 changes: 4 additions & 6 deletions src/account/account-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ class AccountManager {
async create() {
let accountId;
let maxTries = 100;
let created;
do {
accountId = Account.generateId();
const created = await this._db.set(accountId, {}, {NX: true});
if (!created) {
accountId = undefined;
}
} while (accountId === undefined && maxTries-- > 0);
created = await this._db.set(accountId, {}, {NX: true});
} while (!created && accountId === undefined && maxTries-- > 0);

if (!accountId) {
if (!created) {
return undefined;
}

Expand Down
17 changes: 17 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ const args = yargs
default: false,
description: 'Allow public account registration - NB: this allows public tunnel creation!'
})
.option('storage', {
type: 'string',
default: 'memory',
choices: ['memory', 'redis'],
})
.option('storage-redis-url', {
type: 'string',
description: 'Redis connection URL',
coerce: (url) => {
try {
return new URL(url);
} catch (err) {
console.log(err.message);
process.exit(-1);
}
},
})
.option('log-level', {
type: 'string',
default: 'info',
Expand Down
2 changes: 1 addition & 1 deletion src/controller/api-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class ApiController {
}

const account = await this.accountManager.create();
if (account === undefined) {
if (!account) {
ctx.status = 503;
return;
}
Expand Down
67 changes: 38 additions & 29 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,29 @@ import AdminController from './controller/admin-controller.js';
import Listener from './listener/index.js';
import Ingress from './ingress/index.js';
import Endpoint from './endpoint/index.js';
import Storage from './storage/index.js';

export default () => {
Logger.info("exposr");

// Setup listeners
const listener = new Listener({
http: {
port: Config.get('port')
}
});
let listener;
try {
// Setup listeners
listener = new Listener({
http: {
port: Config.get('port')
}
});

// Setup tunnel connection endpoints (for clients to establish tunnels)
const endpoint = new Endpoint({
ws: {
enabled: true,
baseUrl: Config.get('base-url')
}
});
// Setup tunnel connection endpoints (for clients to establish tunnels)
const endpoint = new Endpoint({
ws: {
enabled: true,
baseUrl: Config.get('base-url')
}
});

// Setup tunnel data ingress (incoming tunnel data)
try {
const ingress = new Ingress({
http: {
enabled: Config.get('ingress').includes('http'),
Expand All @@ -40,23 +42,30 @@ export default () => {
const adminController = Config.get('admin-enable') ? new AdminController(Config.get('admin-port')) : undefined;
const apiController = new ApiController();

const readyCallback = () => {
adminController.setReady();
Logger.info("Service fully ready");
};

listener.listen((err) => {
if (err === undefined) {
if (adminController) {
adminController.setReady();
Logger.info({
message: "Admin interface enabled",
port: Config.get('admin-port')
});
} else {
Logger.info({message: "Admin interface disabled"});
}
Logger.info({
message: "Ready",
base_url: Config.get('base-url'),
port: Config.get('port')
});
if (err) {
Logger.error(err);
process.exit(-1);
}
if (adminController) {
Logger.info({
message: "Admin interface enabled",
port: Config.get('admin-port')
});
new Storage("default", { callback: readyCallback });
} else {
Logger.info({message: "Admin interface disabled"});
}
Logger.info({
message: "API endpoint available",
base_url: Config.get('base-url'),
port: Config.get('port')
});
});

const sigHandler = (signal) => {
Expand Down
70 changes: 69 additions & 1 deletion src/storage/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,77 @@
import InMemoryStorage from './inmemory-storage.js';
import RedisStorage from './redis-storage.js';
import Config from '../config.js'
import assert from 'assert/strict';

class Storage {
constructor(namespace, opts = {}) {
return new InMemoryStorage(namespace, opts);
const storageType = Config.get('storage');

const ready = () => {
opts.callback && process.nextTick(opts.callback);
};

if (storageType == 'memory') {
this.storage = new InMemoryStorage(ready);
} else if (storageType == 'redis') {
this.storage = new RedisStorage(ready);
} else {
throw new Error(`Unknown storage ${storageType}`);
}
this.ns = namespace;
this.key = opts.key;
}

_key(key) {
return `${this.ns}:${key}`;
}

// Returns
// Object on success
// undefined on not found
// false on storage error
async get(key = undefined) {
if (!key) {
key = this.key;
}
assert(key !== undefined);
return this.storage.get(this._key(key));
};

// Returns
// Object on success
// false on storage error
async set(arg1, arg2, arg3) {
// set(key, obj, opts)
if (arg1 != undefined && arg2 != undefined && arg3 != undefined) {
return this.storage.set(this._key(arg1), arg2, arg3);
} else if (arg1 != undefined && arg2 != undefined && arg3 == undefined) {
// set(key, obj)
if (typeof arg1 === 'string') {
return this.storage.set(this._key(arg1), arg2, {});
// set(obj, opts)
} else {
return this.storage.set(this._key(this.key), arg1, arg2);
}
// set(obj)
} else if (arg1 != undefined && arg2 == undefined && arg3 == undefined) {
return this.storage.set(this._key(this.key), arg1, {});
} else {
assert.fail("invalid call to set");
}
}

// Returns
// True if deleted
// undefined if not found
// False on storage error
async delete(key = undefined) {
if (!key) {
key = this.key;
}
assert(key !== undefined);
this.storage.delete(this._key(key));
};
}

export default Storage;
69 changes: 28 additions & 41 deletions src/storage/inmemory-storage.js
Original file line number Diff line number Diff line change
@@ -1,64 +1,51 @@
import assert from 'assert/strict';
import { Logger } from '../logger.js';

const _DB = {};

class InMemoryStorage {
constructor(namespace, opts) {
this.logger = Logger("in-memory-storage");
this.logger.addContext("ns", namespace);
this.ns = namespace;
this.key = opts.key;
if (_DB[namespace] === undefined) {
_DB[namespace] = {};
constructor(callback) {
if (InMemoryStorage.instance instanceof InMemoryStorage) {
process.nextTick(callback);
return InMemoryStorage.instance;
}
this.db = _DB[namespace];
InMemoryStorage.instance = this;
this.logger = Logger("in-memory-storage");
this.db = {};
process.nextTick(callback);
}

async get(key = undefined) {
if (!key) {
key = this.key;
}
async get(key) {
assert(key !== undefined);
this.logger.isTraceEnabled() &&
this.logger.trace({
operation: 'get',
key
});
return this.db[key];
};

async set(arg1, arg2, arg3) {
// set(key, obj, opts)
if (arg1 != undefined && arg2 != undefined && arg3 != undefined) {
return this._set(arg1, arg2, arg3);
} else if (arg1 != undefined && arg2 != undefined && arg3 == undefined) {
// set(key, obj)
if (typeof arg1 === 'string') {
return this._set(arg1, arg2, {});
// set(obj, opts)
} else {
return this._set(this.key, arg1, arg2);
}
// set(obj)
} else if (arg1 != undefined && arg2 == undefined && arg3 == undefined) {
return this._set(this.key, arg1, {});
} else {
assert.fail("invalid call to set");
}
}

async _set(key, data, opts = {}) {
async set(key, data, opts = {}) {
assert(key !== undefined);
this.logger.isTraceEnabled() && this.logger.trace(`set=${key}, opts=${JSON.stringify(opts)}, data=${JSON.stringify(data)}`);
this.logger.isTraceEnabled() &&
this.logger.trace({
operation: 'set',
opts,
key,
data: JSON.stringify(data),
});
if (opts.NX === true && this.db[key] !== undefined) {
return false;
}
this.db[key] = data;
return this.db[key];
};

async delete(key = undefined) {
if (!key) {
key = this.key;
}
async delete(key) {
assert(key !== undefined);
this.logger.isTraceEnabled() && this.logger.trace(`delete=${key}`);
this.logger.isTraceEnabled() &&
this.logger.trace({
operation: 'delete',
key
});
delete this.db[key];
};

Expand Down
Loading

0 comments on commit 56928e7

Please sign in to comment.