diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..9d678dd1 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,38 @@ +sudo: false +language: node_js + +matrix: + include: + - node_js: 4 + env: CXX=g++-4.8 + - node_js: 6 + env: + - SAUCE=true + - CXX=g++-4.8 + - node_js: stable + env: CXX=g++-4.8 + +# Make sure we have new NPM. +before_install: + - npm install -g npm + +script: + - npm run lint + - npm test + - npm run coverage + - make test + +before_script: + - export DISPLAY=:99.0 + - sh -e /etc/init.d/xvfb start + +after_success: + - npm run coverage-publish + +addons: + firefox: 'latest' + apt: + sources: + - ubuntu-toolchain-r-test + packages: + - g++-4.8 diff --git a/README.md b/README.md index 32fc1802..97701b2f 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,53 @@ -# js-libp2p-dht +# js-libp2p-kad-dht -[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) [![Build Status](https://travis-ci.org/libp2p/js-libp2p-dht.svg?style=flat-square)](https://travis-ci.org/libp2p/js-libp2p-dht) ![](https://img.shields.io/badge/coverage-%3F-yellow.svg?style=flat-square) [![Dependency Status](https://david-dm.org/libp2p/js-libp2p-dht.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-dht) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard) +[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) +[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) +[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) +[![Build Status](https://travis-ci.org/ipfs/js-libp2p-kad-dht.svg?style=flat-square)](https://travis-ci.org/ipfs/js-libp2p-kad-dht) +[![Coverage Status](https://coveralls.io/repos/github/ipfs/js-libp2p-kad-dht/badge.svg?branch=master)](https://coveralls.io/github/ipfs/js-libp2p-kad-dht?branch=master) +[![Dependency Status](https://david-dm.org/ipfs/js-libp2p-kad-dht.svg?style=flat-square)](https://david-dm.org/ipfs/js-libp2p-kad-dht) +[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard) +[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) +![](https://img.shields.io/badge/npm-%3E%3D3.0.0-orange.svg?style=flat-square) +![](https://img.shields.io/badge/Node.js-%3E%3D4.0.0-orange.svg?style=flat-square) -> JavaScript implementation of the DHT for libp2p +> JavaScript implementation of the Kademlia DHT for libp2p, based on [go-libp2p-kad-dht](https://github.com/libp2p/go-libp2p-kad-dht). -## Description +## Table of Contents -## Usage +- [Install](#install) + - [npm](#npm) + - [Use in Node.js](#use-in-nodejs) +- [API](#api) +- [Contribute](#contribute) +- [License](#license) + +## Install + +### npm + +```sh +> npm i libp2p-dht +``` + +### Use in Node.js + +```js +const KadDHT = require('libp2p-kad-dht') +``` + +## API + +See https://libp2p.github.io/js-libp2p-kad-dht + +## Contribute + +Feel free to join in. All welcome. Open an [issue](https://github.com/ipfs/js-libp2p-ipfs/issues)! + +This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md). + +[![](https://cdn.rawgit.com/jbenet/contribute-ipfs-gif/master/img/contribute.gif)](https://github.com/ipfs/community/blob/master/contributing.md) ## License -MIT +MIT - Protocol Labs 2017 diff --git a/circle.yml b/circle.yml new file mode 100644 index 00000000..5a84b052 --- /dev/null +++ b/circle.yml @@ -0,0 +1,17 @@ +machine: + node: + version: stable + +dependencies: + pre: + - google-chrome --version + - wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | sudo apt-key add - + - sudo sh -c 'echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list' + - sudo apt-get update + - sudo apt-get --only-upgrade install google-chrome-stable + - google-chrome --version + +test: + override: + - npm run test + - npm run test:interop diff --git a/package.json b/package.json new file mode 100644 index 00000000..e57365f6 --- /dev/null +++ b/package.json @@ -0,0 +1,79 @@ +{ + "name": "libp2p-kad-dht", + "version": "0.0.1", + "description": "JavaScript implementation of the Kad-DHT for libp2p", + "main": "src/index.js", + "scripts": { + "lint": "aegir-lint", + "test": "aegir-test --env node", + "test:node": "aegir-test node", + "test:browser": "aegir-test browser", + "build": "aegir-build", + "docs": "aegir-docs", + "release": "aegir-release --docs --env node", + "release-minor": "aegir-release --type minor --docs --env node", + "release-major": "aegir-release --type major --docs --env node", + "coverage": "aegir-coverage", + "coverage-publish": "aegir-coverage publish" + }, + "browser": { + "libp2p-ipfs-nodejs": "libp2p-ipfs-browser" + }, + "pre-commit": [ + "lint", + "test" + ], + "repository": { + "type": "git", + "url": "https://github.com/libp2p/js-libp2p-kad-dht.git" + }, + "keywords": [ + "IPFS" + ], + "author": "Friedel Ziegelmayer ", + "license": "MIT", + "bugs": { + "url": "https://github.com/libp2p/js-libp2p-kad-dht/issues" + }, + "engines": { + "node": ">=4.0.0", + "npm": ">=3.0.0" + }, + "homepage": "https://github.com/libp2p/js-libp2p-kad-dht", + "dependencies": { + "async": "^2.3.0", + "base32.js": "^0.1.0", + "cids": "~0.5.0", + "debug": "^2.6.3", + "hashlru": "^2.1.0", + "heap": "^0.2.6", + "interface-datastore": "^0.2.0", + "k-bucket": "^3.2.1", + "libp2p-crypto": "~0.8.7", + "libp2p-record": "~0.3.1", + "multihashing-async": "~0.4.5", + "peer-id": "~0.8.7", + "peer-info": "~0.9.2", + "priorityqueue": "^0.2.0", + "protocol-buffers": "^3.2.1", + "pull-length-prefixed": "^1.2.0", + "pull-stream": "^3.5.0", + "varint": "^5.0.0", + "xor-distance": "^1.0.0" + }, + "devDependencies": { + "aegir": "^11.0.1", + "chai": "^3.5.0", + "datastore-level": "^0.3.0", + "dirty-chai": "^1.2.2", + "interface-connection": "^0.3.2", + "left-pad": "^1.1.3", + "libp2p-ipfs-browser": "~0.23.0", + "libp2p-ipfs-nodejs": "~0.23.0", + "lodash": "^4.17.4", + "lodash.random": "^3.2.0", + "lodash.range": "^3.2.0", + "peer-book": "~0.4.0", + "pre-commit": "^1.2.2" + } +} diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 00000000..b8d93084 --- /dev/null +++ b/src/constants.js @@ -0,0 +1,34 @@ +'use strict' + +// MaxRecordAge specifies the maximum time that any node will hold onto a record +// from the time its received. This does not apply to any other forms of validity that +// the record may contain. +// For example, a record may contain an ipns entry with an EOL saying its valid +// until the year 2020 (a great time in the future). For that record to stick around +// it must be rebroadcasted more frequently than once every 'MaxRecordAge' + +const second = exports.second = 1000 +const minute = exports.minute = 60 * second +const hour = exports.hour = 60 * minute + +exports.MAX_RECORD_AGE = 36 * hour + +exports.PROTOCOL_DHT = '/ipfs/kad/1.0.0' + +exports.PROVIDERS_KEY_PREFIX = '/providers/' + +exports.PROVIDERS_LRU_CACHE_SIZE = 256 + +exports.PROVIDERS_VALIDITY = 24 * hour + +exports.PROVIDERS_CLEANUP_INTERVAL = hour + +exports.READ_MESSAGE_TIMEOUT = minute + +// K is the maximum number of requests to perform before returning failue +exports.K = 20 + +// Alpha is the concurrency for asynchronous requests +exports.ALPHA = 3 + +exports.maxMessageSize = 2 << 22 // 4MB diff --git a/src/errors.js b/src/errors.js new file mode 100644 index 00000000..dd631f87 --- /dev/null +++ b/src/errors.js @@ -0,0 +1,13 @@ +'use strict' + +class InvalidRecordError extends Error {} + +class NotFoundError extends Error {} + +class LookupFailureError extends Error {} + +module.exports = { + InvalidRecordError, + NotFoundError, + LookupFailureError +} diff --git a/src/index.js b/src/index.js new file mode 100644 index 00000000..80cd0054 --- /dev/null +++ b/src/index.js @@ -0,0 +1,567 @@ +'use strict' + +const libp2pRecord = require('libp2p-record') +const MemoryStore = require('interface-datastore').MemoryDatastore +const waterfall = require('async/waterfall') +const each = require('async/each') +const timeout = require('async/timeout') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const crypto = require('libp2p-crypto') + +const RoutingTable = require('./routing') +const utils = require('./utils') +const c = require('./constants') +const Query = require('./query') +const Network = require('./network') +const errors = require('./errors') +const privateApi = require('./private') +const Providers = require('./providers') +const Message = require('./message') + +/** + * A DHT implementation modeled after Kademlia with Coral and S/Kademlia modifications. + * + * Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht. + */ +class KadDHT { + /** + * Create a new KadDHT. + * + * @param {Libp2p} libp2p + * @param {number} [kBucketSize=20] + * @param {Datastore} [datastore=MemoryDatastore] + */ + constructor (libp2p, kBucketSize, datastore) { + /** + * Local reference to libp2p. + * + * @type {Libp2p} + */ + this.libp2p = libp2p + + /** + * k-bucket size, defaults to 20. + * + * @type {number} + */ + this.kBucketSize = kBucketSize || 20 + + /** + * Number of closest peers to return on kBucket search + * + * @type {number} + */ + this.ncp = 6 + + /** + * The routing table. + * + * @type {RoutingTable} + */ + this.routingTable = new RoutingTable(this.self.id, this.kBucketSize) + + /** + * Reference to the datastore, uses an in-memory store if none given. + * + * @type {Datastore} + */ + this.datastore = datastore || new MemoryStore() + + /** + * Provider management + * + * @type {Providers} + */ + this.providers = new Providers(this.datastore, this.self.id) + + this.validators = { + pk: libp2pRecord.validator.validators.pk + } + + this.selectors = { + pk: libp2pRecord.selection.selectors.pk + } + + this.network = new Network(this, this.libp2p) + + this._log = utils.logger(this.self.id) + + // Inject private apis so we don't clutter up this file + const pa = privateApi(this) + Object.keys(pa).forEach((name) => { + this[name] = pa[name] + }) + } + + /** + * Start listening to incoming connections. + * + * @param {function(Error)} callback + * @returns {void} + */ + start (callback) { + this._running = true + this.network.start(callback) + } + + /** + * Stop accepting incoming connections and sending outgoing + * messages. + * + * @param {function(Error)} callback + * @returns {void} + */ + stop (callback) { + this._running = false + this.bootstrapStop() + this.network.stop(callback) + } + + /** + * Alias to the peerbook from libp2p + */ + get peerBook () { + return this.libp2p.peerBook + } + + /** + * Is this DHT running. + * + * @type {bool} + */ + get isRunning () { + return this._running + } + + /** + * Local peer (yourself) + * + * @type {PeerInfo} + */ + get self () { + return this.libp2p.peerInfo + } + + /** + * Kademlia 'node lookup' operation. + * + * @param {Buffer} key + * @param {function(Error, Array)} callback + * @returns {void} + */ + getClosestPeers (key, callback) { + this._log('getClosestPeers to %s', key.toString()) + utils.convertBuffer(key, (err, id) => { + if (err) { + return callback(err) + } + + const tablePeers = this.routingTable.closestPeers(id, c.ALPHA) + + const q = new Query(this, key, (peer, callback) => { + waterfall([ + (cb) => this._closerPeersSingle(key, peer, cb), + (closer, cb) => { + cb(null, { + closerPeers: closer + }) + } + ], callback) + }) + + q.run(tablePeers, (err, res) => { + if (err) { + return callback(err) + } + + if (!res || !res.finalSet) { + return callback(null, []) + } + + waterfall([ + (cb) => utils.sortClosestPeers(Array.from(res.finalSet), id, cb), + (sorted, cb) => cb(null, sorted.slice(0, c.K)) + ], callback) + }) + }) + } + + /** + * Store the given key/value pair in the DHT. + * + * @param {Buffer} key + * @param {Buffer} value + * @param {function(Error)} callback + * @returns {void} + */ + put (key, value, callback) { + this._log('PutValue %s', key) + let sign + try { + sign = libp2pRecord.validator.isSigned(this.validators, key) + } catch (err) { + return callback(err) + } + + waterfall([ + (cb) => utils.createPutRecord(key, value, this.self.id, sign, cb), + (rec, cb) => waterfall([ + (cb) => this._putLocal(key, rec, cb), + (cb) => this.getClosestPeers(key, cb), + (peers, cb) => each(peers, (peer, cb) => { + this._putValueToPeer(key, rec, peer, cb) + }, cb) + ], cb) + ], callback) + } + + /** + * Get the value to the given key. + * Times out after 1 minute. + * + * @param {Buffer} key + * @param {number} [maxTimeout=60000] - optional timeout + * @param {function(Error, Buffer)} callback + * @returns {void} + */ + get (key, maxTimeout, callback) { + if (typeof maxTimeout === 'function') { + callback = maxTimeout + maxTimeout = null + } + + if (maxTimeout == null) { + maxTimeout = c.minute + } + + this._get(key, maxTimeout, callback) + } + + /** + * Get the `n` values to the given key without sorting. + * + * @param {Buffer} key + * @param {number} nvals + * @param {number} [maxTimeout=60000] + * @param {function(Error, Array<{from: PeerId, val: Buffer}>)} callback + * @returns {void} + */ + getMany (key, nvals, maxTimeout, callback) { + if (typeof maxTimeout === 'function') { + callback = maxTimeout + maxTimeout = null + } + if (maxTimeout == null) { + maxTimeout = c.minute + } + + this._log('getMany %s (%s)', key, nvals) + const vals = [] + + this._getLocal(key, (err, localRec) => { + if (err && nvals === 0) { + return callback(err) + } + if (err == null) { + vals.push({ + val: localRec.value, + from: this.self.id + }) + } + + if (nvals <= 1) { + return callback(null, vals) + } + + waterfall([ + (cb) => utils.convertBuffer(key, cb), + (id, cb) => { + const rtp = this.routingTable.closestPeers(id, c.ALPHA) + this._log('peers in rt: %d', rtp.length) + if (rtp.length === 0) { + this._log.error('No peers from routing table!') + return cb(new Error('Failed to lookup key')) + } + + // we have peers, lets do the actualy query to them + const query = new Query(this, key, (peer, cb) => { + this._getValueOrPeers(peer, key, (err, rec, peers) => { + if (err) { + // If we have an invalid record we just want to continue and fetch a new one. + if (!(err instanceof errors.InvalidRecordError)) { + return cb(err) + } + } + + const res = { + closerPeers: peers + } + + if ((rec && rec.value) || + err instanceof errors.InvalidRecordError) { + vals.push({ + val: rec && rec.value, + from: peer + }) + } + + // enough is enough + if (vals.length >= nvals) { + res.success = true + } + + cb(null, res) + }) + }) + + // run our query + timeout((cb) => { + query.run(rtp, cb) + }, maxTimeout)(cb) + } + ], (err) => { + if (err && vals.length === 0) { + return callback(err) + } + + callback(null, vals) + }) + }) + } + + /** + * Get the public key for the given peer id. + * + * @param {PeerId} peer + * @param {function(Error, PubKey)} callback + * @returns {void} + */ + getPublicKey (peer, callback) { + this._log('getPublicKey %s', peer.toB58String()) + // local check + let info + if (this.peerBook.has(peer)) { + info = this.libp2p.peerBook.get(peer) + + if (info && info.id.pubKey) { + this._log('getPublicKey: found local copy') + return callback(null, info.id.pubKey) + } + } else { + info = this.peerBook.put(new PeerInfo(peer)) + } + // try the node directly + this._getPublicKeyFromNode(peer, (err, pk) => { + if (!err) { + info.id = new PeerId(peer.id, null, pk) + this.libp2p.peerBook.put(info) + + return callback(null, pk) + } + + // dht directly + const pkKey = utils.keyForPublicKey(peer) + this.get(pkKey, (err, value) => { + if (err) { + return callback(err) + } + + const pk = crypto.unmarshalPublicKey(value) + info.id = new PeerId(peer, null, pk) + this.libp2p.peerBook.put(info) + + callback(null, pk) + }) + }) + } + + /** + * Announce to the network that a node can provide the given key. + * This is what Coral and MainlineDHT do to store large values + * in a DHT. + * + * @param {CID} key + * @param {function(Error)} callback + * @returns {void} + */ + provide (key, callback) { + this._log('provide: %s', key.toBaseEncodedString()) + + waterfall([ + (cb) => this.providers.addProvider(key, this.self.id, cb), + (cb) => this.getClosestPeers(key.buffer, cb), + (peers, cb) => { + const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0) + msg.providerPeers = peers.map((p) => new PeerInfo(p)) + + each(peers, (peer, cb) => { + this._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String()) + this.network.sendMessage(peer, msg, cb) + }, cb) + } + ], (err) => callback(err)) + } + + /** + * Search the dht for up to `K` providers of the given CID. + * + * @param {CID} key + * @param {number} timeout - how long the query should maximally run, in milliseconds. + * @param {function(Error, Array)} callback + * @returns {void} + */ + findProviders (key, timeout, callback) { + this._log('findProviders %s', key.toBaseEncodedString()) + this._findNProviders(key, timeout, c.K, callback) + } + + /** + * Search for a peer with the given ID. + * + * @param {PeerId} id + * @param {number} [maxTimeout=60000] + * @param {function(Error, PeerInfo)} callback + * @returns {void} + */ + findPeer (id, maxTimeout, callback) { + if (typeof maxTimeout === 'function') { + callback = maxTimeout + maxTimeout = null + } + + if (maxTimeout == null) { + maxTimeout = c.minute + } + + this._log('findPeer %s', id.toB58String()) + + this.findPeerLocal(id, (err, pi) => { + if (err) { + return callback(err) + } + + // already got it + if (pi != null) { + this._log('found local') + return callback(null, pi) + } + + waterfall([ + (cb) => utils.convertPeerId(id, cb), + (key, cb) => { + const peers = this.routingTable.closestPeers(key, c.ALPHA) + + if (peers.length === 0) { + return cb(new errors.LookupFailureError()) + } + + // sanity check + const match = peers.find((p) => p.isEqual(id)) + if (match && this.peerBook.has(id)) { + this._log('found in peerbook') + return cb(null, this.peerBook.get(id)) + } + + // query the network + const query = new Query(this, id.id, (peer, cb) => { + waterfall([ + (cb) => this._findPeerSingle(peer, id, cb), + (msg, cb) => { + const match = msg.closerPeers.find((p) => p.id.isEqual(id)) + + // found it + if (match) { + return cb(null, { + peer: match, + success: true + }) + } + + cb(null, { + closerPeers: msg.closerPeers + }) + } + ], cb) + }) + + timeout((cb) => { + query.run(peers, cb) + }, maxTimeout)(cb) + }, + (result, cb) => { + this._log('findPeer %s: %s', id.toB58String(), result.success) + if (!result.peer) { + return cb(new errors.NotFoundError()) + } + cb(null, result.peer) + } + ], callback) + }) + } + + /** + * Look if we are connected to a peer with the given id. + * Returns the `PeerInfo` for it, if found, otherwise `undefined`. + * + * @param {PeerId} peer + * @param {function(Error, PeerInfo)} callback + * @returns {void} + */ + findPeerLocal (peer, callback) { + this._log('findPeerLocal %s', peer.toB58String()) + this.routingTable.find(peer, (err, p) => { + if (err) { + return callback(err) + } + if (!p || !this.peerBook.has(p)) { + return callback() + } + callback(null, this.peerBook.get(p)) + }) + } + + /** + * Start the bootstrap process. This means running a number of queries every interval requesting random data. + * This is done to keep the dht healthy over time. + * + * @param {number} [queries=1] - how many queries to run per period + * @param {number} [period=300000] - how often to run the the bootstrap process, in milliseconds (5min) + * @param {number} [maxTimeout=10000] - how long to wait for the the bootstrap query to run, in milliseconds (10s) + * @returns {void} + */ + bootstrapStart (queries, period, maxTimeout) { + if (queries == null) { + queries = 1 + } + if (period == null) { + period = 5 * c.minute + } + if (maxTimeout == null) { + maxTimeout = 10 * c.second + } + + // Don't run twice + if (this._bootstrapRunning) { + return + } + + this._bootstrapRunning = setInterval( + () => this._bootstrap(queries, maxTimeout), + period + ) + } + + /** + * Stop the bootstrap process. + * + * @returns {void} + */ + bootstrapStop () { + if (this._bootstrapRunning) { + clearInterval(this._bootstrapRunning) + } + } +} + +module.exports = KadDHT diff --git a/src/limited-peer-list.js b/src/limited-peer-list.js new file mode 100644 index 00000000..48b536dd --- /dev/null +++ b/src/limited-peer-list.js @@ -0,0 +1,33 @@ +'use strict' + +const PeerList = require('./peer-list') + +/** + * Like PeerList but with a length restriction. + */ +class LimitedPeerList extends PeerList { + /** + * Create a new limited peer list. + * + * @param {number} limit + */ + constructor (limit) { + super() + this.limit = limit + } + + /** + * Add a PeerInfo if it fits in the list + * + * @param {PeerInfo} info + * @returns {bool} + */ + push (info) { + if (this.length < this.limit) { + return super.push(info) + } + return false + } +} + +module.exports = LimitedPeerList diff --git a/src/message/dht.proto.js b/src/message/dht.proto.js new file mode 100644 index 00000000..7f6fb82e --- /dev/null +++ b/src/message/dht.proto.js @@ -0,0 +1,75 @@ +'use strict' + +module.exports = `// can't use, because protocol-buffers doesn't support imports +// so we have to duplicate for now :( +// import "record.proto"; + +message Record { + // adjusted for javascript + optional bytes key = 1; + optional bytes value = 2; + optional bytes author = 3; + optional bytes signature = 4; + optional string timeReceived = 5; +} + +message Message { + enum MessageType { + PUT_VALUE = 0; + GET_VALUE = 1; + ADD_PROVIDER = 2; + GET_PROVIDERS = 3; + FIND_NODE = 4; + PING = 5; + } + + enum ConnectionType { + // sender does not have a connection to peer, and no extra information (default) + NOT_CONNECTED = 0; + + // sender has a live connection to peer + CONNECTED = 1; + + // sender recently connected to peer + CAN_CONNECT = 2; + + // sender recently tried to connect to peer repeatedly but failed to connect + // ("try" here is loose, but this should signal "made strong effort, failed") + CANNOT_CONNECT = 3; + } + + message Peer { + // ID of a given peer. + optional bytes id = 1; + + // multiaddrs for a given peer + repeated bytes addrs = 2; + + // used to signal the sender's connection capabilities to the peer + optional ConnectionType connection = 3; + } + + // defines what type of message it is. + optional MessageType type = 1; + + // defines what coral cluster level this query/response belongs to. + optional int32 clusterLevelRaw = 10; + + // Used to specify the key associated with this message. + // PUT_VALUE, GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + // adjusted for javascript + optional bytes key = 2; + + // Used to return a value + // PUT_VALUE, GET_VALUE + // adjusted Record to bytes for js + optional bytes record = 3; + + // Used to return peers closer to a key in a query + // GET_VALUE, GET_PROVIDERS, FIND_NODE + repeated Peer closerPeers = 8; + + // Used to return Providers + // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + repeated Peer providerPeers = 9; +}` diff --git a/src/message/index.js b/src/message/index.js new file mode 100644 index 00000000..d5df3bb2 --- /dev/null +++ b/src/message/index.js @@ -0,0 +1,127 @@ +'use strict' + +const assert = require('assert') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') +const protobuf = require('protocol-buffers') +const Record = require('libp2p-record').Record + +const pbm = protobuf(require('./dht.proto')) + +const MESSAGE_TYPE = pbm.Message.MessageType +const CONNECTION_TYPE = pbm.Message.ConnectionType + +/** + * Represents a single DHT control message. + */ +class Message { + /** + * @param {MessageType} type + * @param {Buffer} key + * @param {number} level + */ + constructor (type, key, level) { + if (key) { + assert(Buffer.isBuffer(key)) + } + + this.type = type + this.key = key + this._clusterLevelRaw = level + this.closerPeers = [] + this.providerPeers = [] + this.record = null + } + + /** + * @type {number} + */ + get clusterLevel () { + const level = this._clusterLevelRaw - 1 + if (level < 0) { + return 0 + } + + return level + } + + set clusterLevel (level) { + this._clusterLevelRaw = level + } + + /** + * Encode into protobuf + * @returns {Buffer} + */ + serialize () { + const obj = { + key: this.key, + type: this.type, + clusterLevelRaw: this._clusterLevelRaw, + closerPeers: this.closerPeers.map(toPbPeer), + providerPeers: this.providerPeers.map(toPbPeer) + } + + if (this.record) { + if (Buffer.isBuffer(this.record)) { + obj.record = this.record + } else { + obj.record = this.record.serialize() + } + } + + return pbm.Message.encode(obj) + } + + /** + * Decode from protobuf + * + * @param {Buffer} raw + * @returns {Message} + */ + static deserialize (raw) { + const dec = pbm.Message.decode(raw) + + const msg = new Message(dec.type, dec.key, dec.clusterLevelRaw) + + msg.closerPeers = dec.closerPeers.map(fromPbPeer) + msg.providerPeers = dec.providerPeers.map(fromPbPeer) + if (dec.record) { + msg.record = Record.deserialize(dec.record) + } + + return msg + } +} + +Message.TYPES = MESSAGE_TYPE +Message.CONNECTION_TYPES = CONNECTION_TYPE + +function toPbPeer (peer) { + const res = { + id: peer.id.id, + addrs: peer.multiaddrs.toArray().map((m) => m.buffer) + } + + if (peer.isConnected()) { + res.connection = CONNECTION_TYPE.CONNECTED + } else { + res.connection = CONNECTION_TYPE.NOT_CONNECTED + } + + return res +} + +function fromPbPeer (peer) { + const info = new PeerInfo(new PeerId(peer.id)) + peer.addrs.forEach((a) => info.multiaddrs.add(a)) + + // TODO: upgrade protobuf to send the address connected on + if (peer.connection === CONNECTION_TYPE.CONNECTED) { + info.connect(peer.addrs[0]) + } + + return info +} + +module.exports = Message diff --git a/src/network.js b/src/network.js new file mode 100644 index 00000000..320bfe2e --- /dev/null +++ b/src/network.js @@ -0,0 +1,241 @@ +'use strict' + +const pull = require('pull-stream') +const timeout = require('async/timeout') +const lp = require('pull-length-prefixed') +const setImmediate = require('async/setImmediate') + +const rpc = require('./rpc') +const c = require('./constants') +const Message = require('./message') +const utils = require('./utils') + +/** + * Handle network operations for the dht + */ +class Network { + /** + * Create a new network. + * + * @param {DHT} dht + * @param {Libp2p} libp2p + */ + constructor (dht, libp2p) { + this.dht = dht + this.libp2p = libp2p + this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT + this._log = utils.logger(this.dht.self.id, 'net') + this._rpc = rpc(this.dht) + this._onPeerConnected = this._onPeerConnected.bind(this) + this._online = false + } + + /** + * Start the network. + * + * @param {function(Error)} callback + * @returns {void} + */ + start (callback) { + const cb = (err) => setImmediate(() => callback(err)) + + if (this.isOnline) { + return cb(new Error('Network is already running')) + } + + if (!this.dht.isRunning || !this.dht.libp2p.isOnline) { + return cb(new Error('Can not start network')) + } + + this._online = true + + // handle incoming connections + this.libp2p.swarm.handle(c.PROTOCOL_DHT, this._rpc) + + // handle new connections + this.libp2p.on('peer:connect', this._onPeerConnected) + + cb() + } + + /** + * Stop all network activity. + * + * @param {function(Error)} callback + * @returns {void} + */ + stop (callback) { + const cb = (err) => setImmediate(() => callback(err)) + + if (!this.isOnline) { + return cb(new Error('Network is already stopped')) + } + this._online = false + this.libp2p.removeListener('peer:connect', this._onPeerConnected) + + this.libp2p.swarm.unhandle(c.PROTOCOL_DHT) + cb() + } + + /** + * Is the network online? + * + * @type {bool} + */ + get isOnline () { + return this._online + } + + /** + * Are all network components there? + * + * @type {bool} + */ + get isConnected () { + return this.dht.libp2p.isOnline && this.dht.isRunning && this.isOnline + } + + /** + * Handle new connections in the swarm. + * + * @param {PeerInfo} peer + * @returns {void} + * @private + */ + _onPeerConnected (peer) { + if (!this.isConnected) { + return this._log.error('Network is offline') + } + + this.libp2p.dial(peer, c.PROTOCOL_DHT, (err, conn) => { + if (err) { + return this._log('%s does not support protocol: %s', peer.id.toB58String(), c.PROTOCOL_DHT) + } + + // TODO: conn.close() + pull(pull.empty(), conn) + + this.dht._add(peer, (err) => { + if (err) { + return this._log.error('Failed to add to the routing table', err) + } + + this._log('added to the routing table: %s', peer.id.toB58String()) + }) + }) + } + + /** + * Send a request and record RTT for latency measurements. + * + * @param {PeerId} to - The peer that should receive a message + * @param {Message} msg - The message to send. + * @param {function(Error, Message)} callback + * @returns {void} + */ + sendRequest (to, msg, callback) { + // TODO: record latency + if (!this.isConnected) { + return callback(new Error('Network is offline')) + } + + this._log('sending to: %s', to.toB58String()) + this.dht.libp2p.dial(to, c.PROTOCOL_DHT, (err, conn) => { + if (err) { + return callback(err) + } + + this._writeReadMessage(conn, msg.serialize(), callback) + }) + } + + /** + * Sends a message without expecting an answer. + * + * @param {PeerId} to + * @param {Message} msg + * @param {function(Error)} callback + * @returns {void} + */ + sendMessage (to, msg, callback) { + if (!this.isConnected) { + return callback(new Error('Network is offline')) + } + + this._log('sending to: %s', to.toB58String()) + + this.dht.libp2p.dial(to, c.PROTOCOL_DHT, (err, conn) => { + if (err) { + return callback(err) + } + + this._writeMessage(conn, msg.serialize(), callback) + }) + } + + /** + * Write a message and read its response. + * If no response is received after the specified timeout + * this will error out. + * + * @param {Connection} conn - the connection to use + * @param {Buffer} msg - the message to send + * @param {function(Error, Message)} callback + * @returns {void} + * @private + */ + _writeReadMessage (conn, msg, callback) { + timeout( + writeReadMessage, + this.readMessageTimeout + )(conn, msg, callback) + } + + /** + * Write a message to the given connection. + * + * @param {Connection} conn - the connection to use + * @param {Buffer} msg - the message to send + * @param {function(Error)} callback + * @returns {void} + * @private + */ + _writeMessage (conn, msg, callback) { + pull( + pull.values([msg]), + lp.encode(), + conn, + lp.decode(), + pull.collect((err) => callback(err)) + ) + } +} + +function writeReadMessage (conn, msg, callback) { + pull( + pull.values([msg]), + lp.encode(), + conn, + pull.filter((msg) => msg.length < c.maxMessageSize), + lp.decode(), + pull.collect((err, res) => { + if (err) { + return callback(err) + } + if (res.length === 0) { + return callback(new Error('No message received')) + } + + let response + try { + response = Message.deserialize(res[0]) + } catch (err) { + return callback(new Error('failed to deserialize response: ' + err.message)) + } + + callback(null, response) + }) + ) +} + +module.exports = Network diff --git a/src/peer-list.js b/src/peer-list.js new file mode 100644 index 00000000..b846efd8 --- /dev/null +++ b/src/peer-list.js @@ -0,0 +1,64 @@ +'use strict' + +/** + * A list of unique peer infos. + */ +class PeerList { + constructor () { + this.list = [] + } + + /** + * Add a new info. Returns `true` if it was a new one + * + * @param {PeerInfo} info + * @returns {bool} + */ + push (info) { + if (!this.has(info)) { + this.list.push(info) + return true + } + return false + } + + /** + * Check if this PeerInfo is already in here. + * + * @param {PeerInfo} info + * @returns {bool} + */ + has (info) { + const match = this.list.find((i) => i.id.isEqual(info.id)) + return Boolean(match) + } + + /** + * Get the list as an array. + * + * @returns {Array} + */ + toArray () { + return this.list.slice() + } + + /** + * Remove the last element + * + * @returns {PeerInfo} + */ + pop () { + return this.list.pop() + } + + /** + * The length of the list + * + * @type {number} + */ + get length () { + return this.list.length + } +} + +module.exports = PeerList diff --git a/src/peer-queue.js b/src/peer-queue.js new file mode 100644 index 00000000..f1c16fe8 --- /dev/null +++ b/src/peer-queue.js @@ -0,0 +1,101 @@ +'use strict' + +const Heap = require('heap') +const distance = require('xor-distance') +const debug = require('debug') + +const utils = require('./utils') + +const log = debug('libp2p:dht:peer-queue') + +/** + * PeerQueue is a heap that sorts its entries (PeerIds) by their + * xor distance to the inital provided key. + */ +class PeerQueue { + /** + * Create from a given peer id. + * + * @param {PeerId} id + * @param {function(Error, PeerQueue)} callback + * @returns {void} + */ + static fromPeerId (id, callback) { + utils.convertPeerId(id, (err, key) => { + if (err) { + return callback(err) + } + + callback(null, new PeerQueue(key)) + }) + } + + /** + * Create from a given buffer. + * + * @param {Buffer} key + * @param {function(Error, PeerQueue)} callback + * @returns {void} + */ + static fromKey (key, callback) { + utils.convertBuffer(key, (err, key) => { + if (err) { + return callback(err) + } + + callback(null, new PeerQueue(key)) + }) + } + + /** + * Create a new PeerQueue. + * + * @param {Buffer} from - The sha2-256 encoded peer id + */ + constructor (from) { + log('create: %s', from.toString('hex')) + this.from = from + this.heap = new Heap(utils.xorCompare) + } + + /** + * Add a new PeerId to the queue. + * + * @param {PeerId} id + * @param {function(Error)} callback + * @returns {void} + */ + enqueue (id, callback) { + log('enqueue %s', id.id.toString('hex')) + utils.convertPeerId(id, (err, key) => { + if (err) { + return callback(err) + } + + const el = { + id: id, + distance: distance(this.from, key) + } + + this.heap.push(el) + callback() + }) + } + + /** + * Returns the closest peer to the `from` peer. + * + * @returns {PeerId} + */ + dequeue () { + const el = this.heap.pop() + log('dequeue %s', el.id.toB58String()) + return el.id + } + + get length () { + return this.heap.size() + } +} + +module.exports = PeerQueue diff --git a/src/private.js b/src/private.js new file mode 100644 index 00000000..768fb7f1 --- /dev/null +++ b/src/private.js @@ -0,0 +1,638 @@ +'use strict' + +const PeerId = require('peer-id') +const libp2pRecord = require('libp2p-record') +const waterfall = require('async/waterfall') +const series = require('async/series') +const each = require('async/each') +const timeout = require('async/timeout') +const times = require('async/times') +const crypto = require('libp2p-crypto') +const PeerInfo = require('peer-info') +const multihashing = require('multihashing-async') + +const utils = require('./utils') +const errors = require('./errors') +const Message = require('./message') +const c = require('./constants') +const Query = require('./query') +const LimitedPeerList = require('./limited-peer-list') + +const Record = libp2pRecord.Record + +module.exports = (dht) => ({ + /** + * Returns the routing tables closest peers, for the key of + * the message. + * + * @param {Message} msg + * @param {function(Error, Array)} callback + * @returns {undefined} + * @private + */ + _nearestPeersToQuery (msg, callback) { + utils.convertBuffer(msg.key, (err, key) => { + if (err) { + return callback(err) + } + let ids + try { + ids = dht.routingTable.closestPeers(key, dht.ncp) + } catch (err) { + return callback(err) + } + + callback(null, ids.map((p) => { + if (dht.peerBook.has(p)) { + return dht.peerBook.get(p) + } else { + return dht.peerBook.put(new PeerInfo(p)) + } + })) + }) + }, + /** + * Get the nearest peers to the given query, but iff closer + * than self. + * + * @param {Message} msg + * @param {PeerInfo} peer + * @param {function(Error, Array)} callback + * @returns {undefined} + * @private + */ + _betterPeersToQuery (msg, peer, callback) { + dht._log('betterPeersToQuery') + dht._nearestPeersToQuery(msg, (err, closer) => { + if (err) { + return callback(err) + } + + const filtered = closer.filter((closer) => { + if (dht._isSelf(closer.id)) { + // Should bail, not sure + dht._log.error('trying to return self as closer') + return false + } + + return !closer.id.isEqual(peer.id) + }) + + callback(null, filtered) + }) + }, + /** + * Try to fetch a given record by from the local datastore. + * Returns the record iff it is still valid, meaning + * - it was either authored by this node, or + * - it was receceived less than `MAX_RECORD_AGE` ago. + * + * @param {Buffer} key + * @param {function(Error, Record)} callback + * @returns {undefined} + * + *@private + */ + _checkLocalDatastore (key, callback) { + dht._log('checkLocalDatastore: %s', key) + const dsKey = utils.bufferToKey(key) + + // 2. fetch value from ds + dht.datastore.has(dsKey, (err, exists) => { + if (err) { + return callback(err) + } + if (!exists) { + return callback() + } + + dht.datastore.get(dsKey, (err, res) => { + if (err) { + return callback(err) + } + + const rawRecord = res + + // 4. create record from the returned bytes + let record + try { + record = Record.deserialize(rawRecord) + } catch (err) { + return callback(err) + } + + if (!record) { + return callback(new Error('Invalid record')) + } + + // 5. check validity + + // 5. if: we are the author, all good + if (record.author.isEqual(dht.self.id)) { + return callback(null, record) + } + + // else: compare recvtime with maxrecordage + if (record.timeReceived == null || + utils.now() - record.timeReceived > c.MAX_RECORD_AGE) { + // 6. if: record is bad delete it and return + return dht.datastore.delete(key, callback) + } + + // else: return good record + callback(null, record) + }) + }) + }, + /** + * Add the peer to the routing table and update it in the peerbook. + * + * @param {PeerInfo} peer + * @param {function(Error)} callback + * @returns {undefined} + * + * @private + */ + _add (peer, callback) { + peer = dht.peerBook.put(peer) + dht.routingTable.add(peer.id, callback) + }, + /** + * Verify a record without searching the DHT. + * + * @param {Record} record + * @param {function(Error)} callback + * @returns {undefined} + * + * @private + */ + _verifyRecordLocally (record, callback) { + dht._log('verifyRecordLocally') + series([ + (cb) => { + if (record.signature) { + const peer = record.author + let info + if (dht.peerBook.has(peer)) { + info = dht.peerBook.get(peer) + } + + if (!info || !info.id.pubKey) { + return callback(new Error('Missing public key for: ' + peer.toB58String())) + } + + record.verifySignature(info.id.pubKey, cb) + } else { + cb() + } + }, + (cb) => libp2pRecord.validator.verifyRecord( + dht.validators, + record, + cb + ) + ], callback) + }, + /** + * Find close peers for a given peer + * + * @param {Buffer} key + * @param {PeerId} peer + * @param {function(Error)} callback + * @returns {void} + * + * @private + */ + _closerPeersSingle (key, peer, callback) { + dht._log('_closerPeersSingle %s from %s', key, peer.toB58String()) + dht._findPeerSingle(peer, new PeerId(key), (err, msg) => { + if (err) { + return callback(err) + } + + const out = msg.closerPeers + .filter((pInfo) => !dht._isSelf(pInfo.id)) + .map((pInfo) => dht.peerBook.put(pInfo)) + + callback(null, out) + }) + }, + /** + * Is the given peer id the peer id? + * + * @param {PeerId} other + * @returns {bool} + * + * @private + */ + _isSelf (other) { + return other && dht.self.id.id.equals(other.id) + }, + /** + * Ask peer `peer` if they know where the peer with id `target` is. + * + * @param {PeerId} peer + * @param {PeerId} target + * @param {function(Error)} callback + * @returns {void} + * + * @private + */ + _findPeerSingle (peer, target, callback) { + dht._log('_findPeerSingle %s', peer.toB58String()) + const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0) + dht.network.sendRequest(peer, msg, callback) + }, + /** + * Store the given key/value pair at the peer `target`. + * + * @param {Buffer} key + * @param {Buffer} rec - encoded record + * @param {PeerId} target + * @param {function(Error)} callback + * @returns {void} + * + * @private + */ + _putValueToPeer (key, rec, target, callback) { + const msg = new Message(Message.TYPES.PUT_VALUE, key, 0) + msg.record = rec + + dht.network.sendRequest(target, msg, (err, resp) => { + if (err) { + return callback(err) + } + + if (!resp.record.value.equals(Record.deserialize(rec).value)) { + return callback(new Error('value not put correctly')) + } + + callback() + }) + }, + /** + * Store the given key/value pair locally, in the datastore. + * @param {Buffer} key + * @param {Buffer} rec - encoded record + * @param {function(Error)} callback + * @returns {void} + * + * @private + */ + _putLocal (key, rec, callback) { + dht.datastore.put(utils.bufferToKey(key), rec, callback) + }, + /** + * Get the value to the given key. + * + * @param {Buffer} key + * @param {number} maxTimeout + * @param {function(Error, Buffer)} callback + * @returns {void} + * + * @private + */ + _get (key, maxTimeout, callback) { + dht._log('_get %s', key.toString()) + waterfall([ + (cb) => dht.getMany(key, 16, maxTimeout, cb), + (vals, cb) => { + const recs = vals.map((v) => v.val) + const i = libp2pRecord.selection.bestRecord(dht.selectors, key, recs) + const best = recs[i] + dht._log('GetValue %s %s', key.toString(), best) + + if (!best) { + return cb(new errors.NotFoundError()) + } + + // Send out correction record + waterfall([ + (cb) => utils.createPutRecord(key, best, dht.self.id, true, cb), + (fixupRec, cb) => each(vals, (v, cb) => { + // no need to do anything + if (v.val.equals(best)) { + return cb() + } + + // correct ourself + if (dht._isSelf(v.from)) { + return dht._putLocal(key, fixupRec, (err) => { + if (err) { + dht._log.error('Failed error correcting self', err) + } + cb() + }) + } + + // send correction + dht._putValueToPeer(v.from, key, fixupRec, (err) => { + if (err) { + dht._log.error('Failed error correcting entry', err) + } + cb() + }) + }, cb) + ], (err) => cb(err, err ? null : best)) + } + ], callback) + }, + /** + * Attempt to retrieve the value for the given key from + * the local datastore. + * + * @param {Buffer} key + * @param {function(Error, Record)} callback + * @returns {void} + * + * @private + */ + _getLocal (key, callback) { + dht._log('getLocal %s', key) + + waterfall([ + (cb) => dht.datastore.get(utils.bufferToKey(key), cb), + (raw, cb) => { + dht._log('found %s in local datastore', key) + let rec + try { + rec = Record.deserialize(raw) + } catch (err) { + return cb(err) + } + + dht._verifyRecordLocally(rec, (err) => { + if (err) { + return cb(err) + } + + cb(null, rec) + }) + } + ], callback) + }, + /** + * Query a particular peer for the value for the given key. + * It will either return the value or a list of closer peers. + * + * Note: The peerbook is updated with new addresses found for the given peer. + * + * @param {PeerId} peer + * @param {Buffer} key + * @param {function(Error, Redcord, Array)} callback + * @returns {void} + * + * @private + */ + _getValueOrPeers (peer, key, callback) { + waterfall([ + (cb) => dht._getValueSingle(peer, key, cb), + (msg, cb) => { + const peers = msg.closerPeers + const record = msg.record + + if (record) { + // We have a record + return dht._verifyRecordOnline(record, (err) => { + if (err) { + dht._log('invalid record received, discarded') + return cb(new errors.InvalidRecordError()) + } + + return cb(null, record, peers) + }) + } + + if (peers.length > 0) { + return cb(null, null, peers) + } + + cb(new errors.NotFoundError('Not found')) + } + ], callback) + }, + /** + * Get a value via rpc call for the given parameters. + * + * @param {PeerId} peer + * @param {Buffer} key + * @param {function(Error, Message)} callback + * @returns {void} + * + * @private + */ + _getValueSingle (peer, key, callback) { + const msg = new Message(Message.TYPES.GET_VALUE, key, 0) + dht.network.sendRequest(peer, msg, callback) + }, + /** + * Verify a record, fetching missing public keys from the network. + * Calls back with an error if the record is invalid. + * + * @param {Record} record + * @param {function(Error)} callback + * @returns {void} + * + * @private + */ + _verifyRecordOnline (record, callback) { + series([ + (cb) => { + if (record.signature) { + // fetch the public key + waterfall([ + (cb) => dht.getPublicKey(record.author, cb), + (pk, cb) => record.verifySignature(pk, cb) + ], cb) + } else { + cb() + } + }, + (cb) => { + libp2pRecord.validator.verifyRecord(dht.validators, record, cb) + } + ], callback) + }, + /** + * Get the public key directly from a node. + * + * @param {PeerId} peer + * @param {function(Error, PublicKey)} callback + * @returns {void} + * + * @private + */ + _getPublicKeyFromNode (peer, callback) { + const pkKey = utils.keyForPublicKey(peer) + waterfall([ + (cb) => dht._getValueSingle(peer, pkKey, cb), + (msg, cb) => { + if (!msg.record || !msg.record.value) { + return cb(new Error('Node not responding with its public key: ' + peer.toB58String())) + } + + PeerId.createFromPubKey(msg.record.value, cb) + }, + (recPeer, cb) => { + // compare hashes of the pub key + if (!recPeer.isEqual(peer)) { + return cb(new Error('public key does not match id')) + } + + cb(null, recPeer.pubKey) + } + ], callback) + }, + /** + * Search the dht for up to `n` providers of the given CID. + * + * @param {CID} key + * @param {number} maxTimeout - How long the query should maximally run in milliseconds. + * @param {number} n + * @param {function(Error, Array)} callback + * @returns {void} + * + * @private + */ + _findNProviders (key, maxTimeout, n, callback) { + let out = new LimitedPeerList(n) + + dht.providers.getProviders(key, (err, provs) => { + if (err) { + return callback(err) + } + + provs.forEach((id) => { + let info + if (dht.peerBook.has(id)) { + info = dht.peerBook.get(id) + } else { + info = dht.peerBook.put(new PeerInfo(id)) + } + out.push(info) + }) + + // All done + if (out.length >= n) { + return callback(null, out.toArray()) + } + + // need more, query the network + const query = new Query(dht, key.buffer, (peer, cb) => { + waterfall([ + (cb) => dht._findProvidersSingle(peer, key, cb), + (msg, cb) => { + const provs = msg.providerPeers + dht._log('(%s) found %s provider entries', dht.self.id.toB58String(), provs.length) + + provs.forEach((prov) => { + out.push(dht.peerBook.put(prov)) + }) + + // hooray we have all that we want + if (out.length >= n) { + return cb(null, {success: true}) + } + + // it looks like we want some more + cb(null, { + closerPeers: msg.closerPeers + }) + } + ], cb) + }) + + const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA) + + timeout((cb) => query.run(peers, cb), maxTimeout)((err) => { + if (err) { + if (err.code === 'ETIMEDOUT' && out.length > 0) { + return callback(null, out.toArray()) + } + return callback(err) + } + + callback(null, out.toArray()) + }) + }) + }, + /** + * Check for providers from a single node. + * + * @param {PeerId} peer + * @param {CID} key + * @param {function(Error, Message)} callback + * @returns {void} + * + * @private + */ + _findProvidersSingle (peer, key, callback) { + const msg = new Message(Message.TYPES.GET_PROVIDERS, key.buffer, 0) + dht.network.sendRequest(peer, msg, callback) + }, + /** + * Do the bootstrap work. + * + * @param {number} queries + * @param {number} maxTimeout + * @returns {void} + * + * @private + */ + _bootstrap (queries, maxTimeout) { + dht._log('bootstrap:start') + times(queries, (i, cb) => { + waterfall([ + (cb) => this._generateBootstrapId(cb), + (id, cb) => timeout((cb) => { + this._bootstrapQuery(id, cb) + }, maxTimeout)(cb) + ], (err) => { + if (err) { + dht._log.error('bootstrap:error', err) + } + dht._log('bootstrap:done') + }) + }) + }, + /** + * The query run during a bootstrap request. + * + * @param {PeerId} id + * @param {function(Error)} callback + * @returns {void} + * + * @private + */ + _bootstrapQuery (id, callback) { + dht._log('bootstrap:query:%s', id.toB58String()) + this.findPeer(id, (err, peer) => { + if (err instanceof errors.NotFoundError) { + // expected case, we asked for random stuff after all + return callback() + } + if (err) { + return callback(err) + } + dht._log('bootstrap:query:found', err, peer) + // wait what, there was something found? + callback(new Error(`Bootstrap peer: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`)) + }) + }, + /** + * Generate a random peer id for bootstrapping purposes. + * + * @param {function(Error, PeerId)} callback + * @returns {void} + * + * @private + */ + _generateBootstrapId (callback) { + multihashing(crypto.randomBytes(16), 'sha2-256', (err, digest) => { + if (err) { + return callback(err) + } + callback(null, new PeerId(digest)) + }) + } +}) diff --git a/src/providers.js b/src/providers.js new file mode 100644 index 00000000..25eca02f --- /dev/null +++ b/src/providers.js @@ -0,0 +1,328 @@ +'use strict' + +const cache = require('hashlru') +const varint = require('varint') +const each = require('async/each') +const pull = require('pull-stream') +const CID = require('cids') +const PeerId = require('peer-id') +const Key = require('interface-datastore').Key + +const c = require('./constants') +const utils = require('./utils') + +/** + * This class manages known providers. + * A provider is a peer that we know to have the content for a given CID. + * + * Every `cleanupInterval` providers are checked if they + * are still valid, i.e. younger than the `provideValidity`. + * If they are not, they are deleted. + * + * To ensure the list survives restarts of the daemon, + * providers are stored in the datastore, but to ensure + * access is fast there is an LRU cache in front of that. + */ +class Providers { + /** + * @param {Object} datastore + * @param {PeerId} [self] + * @param {number} [cacheSize=256] + */ + constructor (datastore, self, cacheSize) { + this.datastore = datastore + + this._log = utils.logger(self, 'providers') + + /** + * How often invalid records are cleaned. (in seconds) + * + * @type {number} + */ + this.cleanupInterval = c.PROVIDERS_CLEANUP_INTERVAL + + /** + * How long is a provider valid for. (in seconds) + * + * @type {number} + */ + this.provideValidity = c.PROVIDERS_VALIDITY + + /** + * LRU cache size + * + * @type {number} + */ + this.lruCacheSize = cacheSize || c.PROVIDERS_LRU_CACHE_SIZE + + this.providers = cache(this.lruCacheSize) + } + + /** + * Check all providers if they are still valid, and if not + * delete them. + * + * @returns {undefined} + * + * @private + */ + _cleanup () { + this._getProviderCids((err, cids) => { + if (err) { + return this._log.error('Failed to get cids', err) + } + + each(cids, (cid, cb) => { + this._getProvidersMap(cid, (err, provs) => { + if (err) { + return cb(err) + } + + provs.forEach((time, provider) => { + this._log('comparing: %s - %s > %s', Date.now(), time, this.provideValidity) + if (Date.now() - time > this.provideValidity) { + provs.delete(provider) + } + }) + + if (provs.size === 0) { + return this._deleteProvidersMap(cid, cb) + } + + cb() + }) + }, (err) => { + if (err) { + return this._log.error('Failed to cleanup', err) + } + + this._log('Cleanup successfull') + }) + }) + } + + /** + * Get a list of all cids that providers are known for. + * + * @param {function(Error, Array)} callback + * @returns {undefined} + * + * @private + */ + _getProviderCids (callback) { + pull( + this.datastore.query({prefix: c.PROVIDERS_KEY_PREFIX}), + pull.map((entry) => { + const parts = entry.key.toString().split('/') + if (parts.length !== 4) { + this._log.error('incorrectly formatted provider entry in datastore: %s', entry.key) + return + } + + let decoded + try { + decoded = utils.decodeBase32(parts[2]) + } catch (err) { + this._log.error('error decoding base32 provider key: %s', parts[2]) + return + } + + let cid + try { + cid = new CID(decoded) + } catch (err) { + this._log.error('error converting key to cid from datastore: %s', err.message) + } + + return cid + }), + pull.filter(Boolean), + pull.collect(callback) + ) + } + + /** + * Get the currently known provider maps for a given CID. + * + * @param {CID} cid + * @param {function(Error, Map)} callback + * @returns {undefined} + * + * @private + */ + _getProvidersMap (cid, callback) { + const provs = this.providers.get(makeProviderKey(cid)) + + if (!provs) { + return loadProviders(this.datastore, cid, callback) + } + + callback(null, provs) + } + + /** + * Completely remove a providers map entry for a given CID. + * + * @param {CID} cid + * @param {function(Error)} callback + * @returns {undefined} + * + * @private + */ + _deleteProvidersMap (cid, callback) { + const dsKey = makeProviderKey(cid) + this.providers.set(dsKey, null) + const batch = this.datastore.batch() + + pull( + this.datastore.query({ + keysOnly: true, + prefix: dsKey + }), + pull.through((entry) => batch.delete(entry.key)), + pull.onEnd((err) => { + if (err) { + return callback(err) + } + batch.commit(callback) + }) + ) + } + + get cleanupInterval () { + return this._cleanupInterval + } + + set cleanupInterval (val) { + this._cleanupInterval = val + + if (this._cleaner) { + clearInterval(this._cleaner) + } + + this._cleaner = setInterval( + () => this._cleanup(), + this.cleanupInterval + ) + } + + /** + * Add a new provider. + * + * @param {CID} cid + * @param {PeerId} provider + * @param {function(Error)} callback + * @returns {undefined} + */ + addProvider (cid, provider, callback) { + this._log('addProvider %s', cid.toBaseEncodedString()) + const dsKey = makeProviderKey(cid) + const provs = this.providers.get(dsKey) + + const next = (err, provs) => { + if (err) { + return callback(err) + } + + this._log('loaded %s provs', provs.size) + const now = Date.now() + provs.set(provider, now) + + this.providers.set(dsKey, provs) + writeProviderEntry(this.datastore, cid, provider, now, callback) + } + + if (!provs) { + loadProviders(this.datastore, cid, next) + } else { + next(null, provs) + } + } + + /** + * Get a list of providers for the given CID. + * + * @param {CID} cid + * @param {function(Error, Array)} callback + * @returns {undefined} + */ + getProviders (cid, callback) { + this._log('getProviders %s', cid.toBaseEncodedString()) + this._getProvidersMap(cid, (err, provs) => { + if (err) { + return callback(err) + } + + callback(null, Array.from(provs.keys())) + }) + } +} + +/** + * Encode the given key its matching datastore key. + * + * @param {CID} cid + * @returns {string} + * + * @private + */ +function makeProviderKey (cid) { + return c.PROVIDERS_KEY_PREFIX + utils.encodeBase32(cid.buffer) +} + +/** + * Write a provider into the given store. + * + * @param {Datastore} store + * @param {CID} cid + * @param {PeerId} peer + * @param {number} time + * @param {function(Error)} callback + * @returns {undefined} + * + * @private + */ +function writeProviderEntry (store, cid, peer, time, callback) { + const dsKey = [ + makeProviderKey(cid), + '/', + utils.encodeBase32(peer.id) + ].join('') + + store.put(new Key(dsKey), new Buffer(varint.encode(time)), callback) +} + +/** + * Load providers from the store. + * + * @param {Datastore} store + * @param {CID} cid + * @param {function(Error, Map)} callback + * @returns {undefined} + * + * @private + */ +function loadProviders (store, cid, callback) { + pull( + store.query({prefix: makeProviderKey(cid)}), + pull.map((entry) => { + const parts = entry.key.toString().split('/') + const lastPart = parts[parts.length - 1] + const rawPeerId = utils.decodeBase32(lastPart) + return [new PeerId(rawPeerId), readTime(entry.value)] + }), + pull.collect((err, res) => { + if (err) { + return callback(err) + } + + return callback(null, new Map(res)) + }) + ) +} + +function readTime (buf) { + return varint.decode(buf) +} + +module.exports = Providers diff --git a/src/query.js b/src/query.js new file mode 100644 index 00000000..0c6003b8 --- /dev/null +++ b/src/query.js @@ -0,0 +1,193 @@ +'use strict' + +const waterfall = require('async/waterfall') +const each = require('async/each') +const queue = require('async/queue') + +const c = require('./constants') +const PeerQueue = require('./peer-queue') +const utils = require('./utils') + +/** + * Query peers from closest to farthest away. + */ +class Query { + /** + * Create a new query. + * + * @param {DHT} dht - DHT instance + * @param {Buffer} key + * @param {function(PeerId, function(Error, Object))} query - The query function to exectue + * + */ + constructor (dht, key, query) { + this.dht = dht + this.key = key + this.query = query + this.concurrency = c.ALPHA + this._log = utils.logger(this.dht.self.id, 'query:' + key.toString()) + } + + /** + * Run this query, start with the given list of peers first. + * + * @param {Array} peers + * @param {function(Error, Object)} callback + * @returns {void} + */ + run (peers, callback) { + const run = { + peersSeen: new Set(), + errors: [], + peersToQuery: null + } + + if (peers.length === 0) { + this._log.error('Running query with no peers') + return callback() + } + + waterfall([ + (cb) => PeerQueue.fromKey(this.key, cb), + (q, cb) => { + run.peersToQuery = q + each(peers, (p, cb) => addPeerToQuery(p, this.dht, run, cb), cb) + }, + (cb) => workerQueue(this, run, cb) + ], (err) => { + this._log('query:done') + if (err) { + return callback(err) + } + + if (run.errors.length === run.peersSeen.size) { + return callback(run.errors[0]) + } + if (run.res && run.res.success) { + run.res.finalSet = run.peersSeen + return callback(null, run.res) + } + + callback(null, { + finalSet: run.peersSeen + }) + }) + } +} + +/** + * Use the queue from async to keep `concurrency` amount items running. + * + * @param {Query} query + * @param {Object} run + * @param {function(Error)} callback + * @returns {void} + */ +function workerQueue (query, run, callback) { + let killed = false + const q = queue((next, cb) => { + query._log('queue:work') + execQuery(next, query, run, (err, done) => { + // Ignore after kill + if (killed) { + return cb() + } + query._log('queue:work:done', err, done) + if (err) { + return cb(err) + } + if (done) { + q.kill() + killed = true + return callback() + } + cb() + }) + }, query.concurrency) + + const fill = () => { + query._log('queue:fill') + while (q.length() < query.concurrency && + run.peersToQuery.length > 0) { + q.push(run.peersToQuery.dequeue()) + } + } + + fill() + + // callback handling + q.error = (err) => { + query._log.error('queue', err) + callback(err) + } + + q.drain = () => { + query._log('queue:drain') + callback() + } + + q.unsaturated = () => { + query._log('queue:unsatured') + fill() + } + + q.buffer = 0 +} + +/** + * Execute a query on the `next` peer. + * + * @param {PeerId} next + * @param {Query} query + * @param {Object} run + * @param {function(Error)} callback + * @returns {void} + * @private + */ +function execQuery (next, query, run, callback) { + query.query(next, (err, res) => { + if (err) { + run.errors.push(err) + callback() + } else if (res.success) { + run.res = res + callback(null, true) + } else if (res.closerPeers && res.closerPeers.length > 0) { + each(res.closerPeers, (closer, cb) => { + // don't add ourselves + if (query.dht._isSelf(closer.id)) { + return cb() + } + closer = query.dht.peerBook.put(closer) + addPeerToQuery(closer.id, query.dht, run, cb) + }, callback) + } else { + callback() + } + }) +} + +/** + * Add a peer to the peers to be queried. + * + * @param {PeerId} next + * @param {DHT} dht + * @param {Object} run + * @param {function(Error)} callback + * @returns {void} + * @private + */ +function addPeerToQuery (next, dht, run, callback) { + if (dht._isSelf(next)) { + return callback() + } + + if (run.peersSeen.has(next)) { + return callback() + } + + run.peersSeen.add(next) + run.peersToQuery.enqueue(next, callback) +} + +module.exports = Query diff --git a/src/routing.js b/src/routing.js new file mode 100644 index 00000000..38e656e2 --- /dev/null +++ b/src/routing.js @@ -0,0 +1,153 @@ +'use strict' + +const KBucket = require('k-bucket') + +const utils = require('./utils') + +/** + * A wrapper around `k-bucket`, to provide easy store and + * retrival for peers. + */ +class RoutingTable { + /** + * @param {PeerId} self + * @param {number} kBucketSize + */ + constructor (self, kBucketSize) { + this.self = self + this._onPing = this._onPing.bind(this) + + utils.convertPeerId(self, (err, selfKey) => { + if (err) { + throw err + } + + this.kb = new KBucket({ + localNodeId: selfKey, + numberOfNodesPerKBucket: kBucketSize, + numberOfNodesToPing: 1 + }) + + this.kb.on('ping', this._onPing) + }) + } + + // -- Private Methods + + /** + * Called on the `ping` event from `k-bucket`. + * Currently this just removes the oldest contact from + * the list, without acutally pinging the individual peers. + * This is the same as go does, but should probably + * be upgraded to actually ping the individual peers. + * + * @param {Array} oldContacts + * @param {Object} newContact + * @returns {undefined} + * @private + */ + _onPing (oldContacts, newContact) { + // just use the first one (k-bucket sorts from oldest to newest) + const oldest = oldContacts[0] + + // remove the oldest one + this.kb.remove(oldest.id) + + // add the new one + this.kb.add(newContact) + } + + // -- Public Interface + + /** + * Amount of currently stored peers. + * + * @type {number} + */ + get size () { + return this.kb.count() + } + + /** + * Find a specific peer by id. + * + * @param {PeerId} peer + * @param {function(Error, PeerId)} callback + * @returns {void} + */ + find (peer, callback) { + utils.convertPeerId(peer, (err, key) => { + if (err) { + return callback(err) + } + const closest = this.closestPeer(key) + + if (closest && closest.isEqual(peer)) { + return callback(null, closest) + } + + callback() + }) + } + + /** + * Retrieve the closest peers to the given key. + * + * @param {Buffer} key + * @param {number} count + * @returns {PeerId|undefined} + */ + closestPeer (key, count) { + const res = this.closestPeers(key, 1) + if (res.length > 0) { + return res[0] + } + } + + /** + * Retrieve the `count`-closest peers to the given key. + * + * @param {Buffer} key + * @param {number} count + * @returns {Array} + */ + closestPeers (key, count) { + return this.kb.closest(key, count).map((p) => p.peer) + } + + /** + * Add or update the routing table with the given peer. + * + * @param {PeerId} peer + * @param {function(Error)} callback + * @returns {undefined} + */ + add (peer, callback) { + utils.convertPeerId(peer, (err, id) => { + if (err) { + return callback(err) + } + this.kb.add({ id: id, peer: peer }) + callback() + }) + } + + /** + * Remove a given peer from the table. + * + * @param {PeerId} peer + * @param {function(Error)} callback + * @returns {undefined} + */ + remove (peer, callback) { + utils.convertPeerId(peer, (err, id) => { + if (err) { + return callback(err) + } + this.kb.remove(id) + callback() + }) + } +} + +module.exports = RoutingTable diff --git a/src/rpc/handlers/add-provider.js b/src/rpc/handlers/add-provider.js new file mode 100644 index 00000000..b14bdd90 --- /dev/null +++ b/src/rpc/handlers/add-provider.js @@ -0,0 +1,52 @@ +'use strict' + +const CID = require('cids') + +const utils = require('../../utils') + +module.exports = (dht) => { + const log = utils.logger(dht.self.id, 'rpc:add-provider') + /** + * Process `AddProvider` DHT messages. + * + * @param {PeerInfo} peer + * @param {Message} msg + * @param {function(Error, Message)} callback + * @returns {undefined} + */ + return function addProvider (peer, msg, callback) { + log('start') + + if (!msg.key || msg.key.length === 0) { + return callback(new Error('Missing key')) + } + + let cid + try { + cid = new CID(msg.key) + } catch (err) { + return callback(new Error('Invalid CID: ' + err.message)) + } + + msg.providerPeers.forEach((pi) => { + // Ignore providers not from the originator + if (!pi.id.isEqual(peer.id)) { + log('invalid provider peer %s from %s', pi.id.toB58String(), peer.id.toB58String()) + return + } + + if (pi.multiaddrs.size < 1) { + log('no valid addresses for provider %s. Ignore', peer.id.toB58String()) + return + } + + log('received provider %s for %s (addrs %s)', peer.id.toB58String(), cid.toBaseEncodedString(), pi.multiaddrs.toArray().map((m) => m.toString())) + + if (!dht._isSelf(pi.id)) { + dht.peerBook.put(pi) + } + }) + + dht.providers.addProvider(cid, peer.id, callback) + } +} diff --git a/src/rpc/handlers/find-node.js b/src/rpc/handlers/find-node.js new file mode 100644 index 00000000..db460962 --- /dev/null +++ b/src/rpc/handlers/find-node.js @@ -0,0 +1,43 @@ +'use strict' + +const waterfall = require('async/waterfall') + +const Message = require('../../message') +const utils = require('../../utils') + +module.exports = (dht) => { + const log = utils.logger(dht.self.id, 'rpc:find-node') + + /** + * Process `FindNode` DHT messages. + * + * @param {PeerInfo} peer + * @param {Message} msg + * @param {function(Error, Message)} callback + * @returns {undefined} + */ + return function findNode (peer, msg, callback) { + log('start') + + waterfall([ + (cb) => { + if (msg.key.equals(dht.self.id.id)) { + return cb(null, [dht.self]) + } + + dht._betterPeersToQuery(msg, peer, cb) + }, + (closer, cb) => { + const response = new Message(msg.type, new Buffer(0), msg.clusterLevel) + + if (closer.length > 0) { + response.closerPeers = closer + } else { + log('handle FindNode %s: could not find anything', peer.id.toB58String()) + } + + cb(null, response) + } + ], callback) + } +} diff --git a/src/rpc/handlers/get-providers.js b/src/rpc/handlers/get-providers.js new file mode 100644 index 00000000..15419890 --- /dev/null +++ b/src/rpc/handlers/get-providers.js @@ -0,0 +1,77 @@ +'use strict' + +const CID = require('cids') +const parallel = require('async/parallel') +const PeerInfo = require('peer-info') + +const Message = require('../../message') +const utils = require('../../utils') + +module.exports = (dht) => { + const log = utils.logger(dht.self.id, 'rpc:get-providers') + + /** + * Process `GetProviders` DHT messages. + * + * @param {PeerInfo} peer + * @param {Message} msg + * @param {function(Error, Message)} callback + * @returns {undefined} + */ + return function getProviders (peer, msg, callback) { + let cid + try { + cid = new CID(msg.key) + } catch (err) { + return callback(new Error('Invalid CID: ' + err.message)) + } + + log('%s', cid.toBaseEncodedString()) + + const dsKey = utils.bufferToKey(cid.buffer) + + parallel([ + (cb) => dht.datastore.has(dsKey, (err, exists) => { + if (err) { + log.error('Failed to check datastore existence', err) + return cb(null, false) + } + + cb(null, exists) + }), + (cb) => dht.providers.getProviders(cid, cb), + (cb) => dht._betterPeersToQuery(msg, peer, cb) + ], (err, res) => { + if (err) { + return callback(err) + } + const has = res[0] + const closer = res[2] + const providers = res[1].map((p) => { + if (dht.peerBook.has(p)) { + return dht.peerBook.get(p) + } + + return dht.peerBook.put(new PeerInfo(p)) + }) + + if (has) { + providers.push(dht.self) + } + + const response = new Message(msg.type, msg.key, msg.clusterLevel) + + if (providers.length > 0) { + response.providerPeers = providers + } + + if (closer.length > 0) { + response.closerPeers = closer + } + + log('got %s providers %s closerPeers', providers.length, closer.length) + + callback(null, response) + }) + } +} diff --git a/src/rpc/handlers/get-value.js b/src/rpc/handlers/get-value.js new file mode 100644 index 00000000..ce9f7127 --- /dev/null +++ b/src/rpc/handlers/get-value.js @@ -0,0 +1,73 @@ +'use strict' + +const parallel = require('async/parallel') +const Record = require('libp2p-record').Record + +const Message = require('../../message') +const utils = require('../../utils') + +module.exports = (dht) => { + const log = utils.logger(dht.self.id, 'rpc:get-value') + + /** + * Process `GetValue` DHT messages. + * + * @param {PeerInfo} peer + * @param {Message} msg + * @param {function(Error, Message)} callback + * @returns {undefined} + */ + return function getValue (peer, msg, callback) { + const key = msg.key + + log('key: %s', key) + + if (!key || key.length === 0) { + return callback(new Error('Invalid key')) + } + + const response = new Message(Message.TYPES.GET_VALUE, key, msg.clusterLevel) + + if (utils.isPublicKeyKey(key)) { + log('is public key') + const id = utils.fromPublicKeyKey(key) + let info + + if (dht._isSelf(id)) { + info = dht.self + } else if (dht.peerBook.has(id)) { + info = dht.peerBook.get(id) + } + + if (info && info.id.pubKey) { + log('returning found public key') + response.record = new Record(key, info.id.pubKey.bytes, dht.self.id) + return callback(null, response) + } + } + + parallel([ + (cb) => dht._checkLocalDatastore(key, cb), + (cb) => dht._betterPeersToQuery(msg, peer, cb) + ], (err, res) => { + if (err) { + return callback(err) + } + + const record = res[0] + const closer = res[1] + + if (record) { + log('got record') + response.record = record + } + + if (closer.length > 0) { + log('got closer %s', closer.length) + response.closerPeers = closer + } + + callback(null, response) + }) + } +} diff --git a/src/rpc/handlers/index.js b/src/rpc/handlers/index.js new file mode 100644 index 00000000..5c0d909c --- /dev/null +++ b/src/rpc/handlers/index.js @@ -0,0 +1,27 @@ +'use strict' + +const T = require('../../message').TYPES + +module.exports = (dht) => { + const handlers = { + [T.GET_VALUE]: require('./get-value')(dht), + [T.PUT_VALUE]: require('./put-value')(dht), + [T.FIND_NODE]: require('./find-node')(dht), + [T.ADD_PROVIDER]: require('./add-provider')(dht), + [T.GET_PROVIDERS]: require('./get-providers')(dht), + [T.PING]: require('./ping')(dht) + } + + /** + * Get the message handler matching the passed in type. + * + * @param {number} type + * + * @returns {function(PeerInfo, Message, function(Error, Message))} + * + * @private + */ + return function getMessageHandler (type) { + return handlers[type] + } +} diff --git a/src/rpc/handlers/ping.js b/src/rpc/handlers/ping.js new file mode 100644 index 00000000..831b26f2 --- /dev/null +++ b/src/rpc/handlers/ping.js @@ -0,0 +1,20 @@ +'use strict' + +const utils = require('../../utils') + +module.exports = (dht) => { + const log = utils.logger(dht.self.id, 'rpc:ping') + + /** + * Process `Ping` DHT messages. + * + * @param {PeerInfo} peer + * @param {Message} msg + * @param {function(Error, Message)} callback + * @returns {undefined} + */ + return function ping (peer, msg, callback) { + log('from %s', peer.id.toB58String()) + callback(null, msg) + } +} diff --git a/src/rpc/handlers/put-value.js b/src/rpc/handlers/put-value.js new file mode 100644 index 00000000..559cc397 --- /dev/null +++ b/src/rpc/handlers/put-value.js @@ -0,0 +1,46 @@ +'use strict' + +const utils = require('../../utils') + +module.exports = (dht) => { + const log = utils.logger(dht.self.id, 'rpc:put-value') + + /** + * Process `PutValue` DHT messages. + * + * @param {PeerInfo} peer + * @param {Message} msg + * @param {function(Error, Message)} callback + * @returns {undefined} + */ + return function putValue (peer, msg, callback) { + const key = msg.key + log('key: %s', key) + + const record = msg.record + + if (!record) { + log.error('Got empty record from: %s', peer.id.toB58String()) + return callback(new Error('Empty record')) + } + + dht._verifyRecordLocally(record, (err) => { + if (err) { + log.error(err.message) + return callback(err) + } + + record.timeReceived = new Date() + + const key = utils.bufferToKey(record.key) + + dht.datastore.put(key, record.serialize(), (err) => { + if (err) { + return callback(err) + } + + callback(null, msg) + }) + }) + } +} diff --git a/src/rpc/index.js b/src/rpc/index.js new file mode 100644 index 00000000..3b845457 --- /dev/null +++ b/src/rpc/index.js @@ -0,0 +1,97 @@ +'use strict' + +const pull = require('pull-stream') +const lp = require('pull-length-prefixed') + +const Message = require('../message') +const handlers = require('./handlers') +const utils = require('../utils') +const c = require('../constants') + +module.exports = (dht) => { + const log = utils.logger(dht.self.id, 'rpc') + + const getMessageHandler = handlers(dht) + /** + * Process incoming DHT messages. + * + * @param {PeerInfo} peer + * @param {Message} msg + * @param {function(Error, Message)} callback + * @returns {void} + * + * @private + */ + function handleMessage (peer, msg, callback) { + // update the peer + dht._add(peer, (err) => { + if (err) { + log.error('Failed to update the kbucket store') + log.error(err) + } + + // get handler & exectue it + const handler = getMessageHandler(msg.type) + + if (!handler) { + log.error(`no handler found for message type: ${msg.type}`) + return callback() + } + + handler(peer, msg, callback) + }) + } + + /** + * Handle incoming streams from the swarm, on the dht protocol. + * + * @param {string} protocol + * @param {Connection} conn + * @returns {undefined} + */ + return function protocolHandler (protocol, conn) { + conn.getPeerInfo((err, peer) => { + if (err) { + log.error('Failed to get peer info') + log.error(err) + return + } + + log('from: %s', peer.id.toB58String()) + + pull( + conn, + lp.decode(), + pull.filter((msg) => msg.length < c.maxMessageSize), + pull.map((rawMsg) => { + let msg + try { + msg = Message.deserialize(rawMsg) + } catch (err) { + log.error('failed to read incoming message', err) + return + } + + return msg + }), + pull.filter(Boolean), + pull.asyncMap((msg, cb) => handleMessage(peer, msg, cb)), + // Not all handlers will return a response + pull.filter(Boolean), + pull.map((response) => { + let msg + try { + msg = response.serialize() + } catch (err) { + log.error('failed to send message', err) + return + } + return msg + }), + pull.filter(Boolean), + lp.encode(), + conn + ) + }) + } +} diff --git a/src/utils.js b/src/utils.js new file mode 100644 index 00000000..ab092ee5 --- /dev/null +++ b/src/utils.js @@ -0,0 +1,178 @@ +'use strict' + +const debug = require('debug') +const multihashing = require('multihashing-async') +const Key = require('interface-datastore').Key +const base32 = require('base32.js') +const distance = require('xor-distance') +const map = require('async/map') +const Record = require('libp2p-record').Record +const setImmediate = require('async/setImmediate') +const PeerId = require('peer-id') + +/** + * Creates a DHT ID by hashing a given buffer. + * + * @param {Buffer} buf + * @param {function(Error, Buffer)} callback + * @returns {void} + */ +exports.convertBuffer = (buf, callback) => { + multihashing.digest(buf, 'sha2-256', callback) +} + +/** + * Creates a DHT ID by hashing a Peer ID + * + * @param {PeerId} peer + * @param {function(Error, Buffer)} callback + * @returns {void} + */ +exports.convertPeerId = (peer, callback) => { + multihashing.digest(peer.id, 'sha2-256', callback) +} + +/** + * Convert a buffer to their SHA2-256 hash. + * + * @param {Buffer} buf + * @returns {Key} + */ +exports.bufferToKey = (buf) => { + return new Key('/' + exports.encodeBase32(buf), false) +} + +/** + * Generate the key for a public key. + * + * @param {PeerId} peer + * @returns {Buffer} + */ +exports.keyForPublicKey = (peer) => { + return Buffer.concat([ + new Buffer('/pk/'), + peer.id + ]) +} + +exports.isPublicKeyKey = (key) => { + return key.slice(0, 4).toString() === '/pk/' +} + +exports.fromPublicKeyKey = (key) => { + return new PeerId(key.slice(4)) +} + +/** + * Get the current time as timestamp. + * + * @returns {number} + */ +exports.now = () => { + return Date.now() +} + +/** + * Encode a given buffer into a base32 string. + * @param {Buffer} buf + * @returns {string} + */ +exports.encodeBase32 = (buf) => { + const enc = new base32.Encoder() + return enc.write(buf).finalize() +} + +/** + * Decode a given base32 string into a buffer. + * @param {string} raw + * @returns {Buffer} + */ +exports.decodeBase32 = (raw) => { + const dec = new base32.Decoder() + return new Buffer(dec.write(raw).finalize()) +} + +/** + * Sort peers by distance to the given `id`. + * + * @param {Array} peers + * @param {Buffer} target + * @param {function(Error, )} callback + * @returns {void} + */ +exports.sortClosestPeers = (peers, target, callback) => { + map(peers, (peer, cb) => { + exports.convertPeerId(peer, (err, id) => { + if (err) { + return cb(err) + } + + cb(null, { + peer: peer, + distance: distance(id, target) + }) + }) + }, (err, distances) => { + if (err) { + return callback(err) + } + + callback(null, distances.sort(exports.xorCompare).map((d) => d.peer)) + }) +} + +/** + * Compare function to sort an array of elements which have a distance property which is the xor distance to a given element. + * + * @param {Object} a + * @param {Object} b + * @returns {number} + */ +exports.xorCompare = (a, b) => { + return distance.compare(a.distance, b.distance) +} + +/** + * Create a new put record, encodes and signs it if enabled. + * + * @param {Buffer} key + * @param {Buffer} value + * @param {PeerId} peer + * @param {bool} sign - Should the record be signed + * @param {function(Error, Buffer)} callback + * @returns {void} + */ +exports.createPutRecord = (key, value, peer, sign, callback) => { + const rec = new Record(key, value, peer) + + if (sign) { + return rec.serializeSigned(peer.privKey, callback) + } + + setImmediate(() => { + callback(null, rec.serialize()) + }) +} + +/** + * Creates a logger for the given subsystem + * + * @param {PeerId} [id] + * @param {string} [subsystem] + * @returns {debug} + * + * @private + */ +exports.logger = (id, subsystem) => { + const name = ['libp2p', 'dht'] + if (subsystem) { + name.push(subsystem) + } + if (id) { + name.push(`${id.toB58String().slice(0, 8)}`) + } + const logger = debug(name.join(':')) + logger.error = debug(name.concat(['error']).join(':')) + + return logger +} diff --git a/test/browser.js b/test/browser.js new file mode 100644 index 00000000..ccacec30 --- /dev/null +++ b/test/browser.js @@ -0,0 +1 @@ +'use strict' diff --git a/test/fixtures/msg-1 b/test/fixtures/msg-1 new file mode 100755 index 00000000..85aadeee Binary files /dev/null and b/test/fixtures/msg-1 differ diff --git a/test/fixtures/msg-2 b/test/fixtures/msg-2 new file mode 100755 index 00000000..3c75f5ed Binary files /dev/null and b/test/fixtures/msg-2 differ diff --git a/test/fixtures/msg-3 b/test/fixtures/msg-3 new file mode 100755 index 00000000..0821b201 Binary files /dev/null and b/test/fixtures/msg-3 differ diff --git a/test/fixtures/msg-4 b/test/fixtures/msg-4 new file mode 100755 index 00000000..de90e344 Binary files /dev/null and b/test/fixtures/msg-4 differ diff --git a/test/fixtures/msg-5 b/test/fixtures/msg-5 new file mode 100755 index 00000000..de90e344 Binary files /dev/null and b/test/fixtures/msg-5 differ diff --git a/test/fixtures/msg-6 b/test/fixtures/msg-6 new file mode 100755 index 00000000..de90e344 Binary files /dev/null and b/test/fixtures/msg-6 differ diff --git a/test/fixtures/msg-7 b/test/fixtures/msg-7 new file mode 100755 index 00000000..4044711d Binary files /dev/null and b/test/fixtures/msg-7 differ diff --git a/test/fixtures/msg-8 b/test/fixtures/msg-8 new file mode 100755 index 00000000..4044711d Binary files /dev/null and b/test/fixtures/msg-8 differ diff --git a/test/index.spec.js b/test/index.spec.js new file mode 100644 index 00000000..3bd338ff --- /dev/null +++ b/test/index.spec.js @@ -0,0 +1,479 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const series = require('async/series') +const times = require('async/times') +const parallel = require('async/parallel') +const timeout = require('async/timeout') +const retry = require('async/retry') +const each = require('async/each') +const waterfall = require('async/waterfall') +const Record = require('libp2p-record').Record +const Libp2p = require('libp2p-ipfs-nodejs') +const random = require('lodash.random') +const _ = require('lodash') + +const KadDHT = require('../src') +const utils = require('../src/utils') +const c = require('../src/constants') + +const util = require('./util') +const makePeers = util.makePeers +const setupDHT = util.setupDHT +const makeValues = util.makeValues + +describe('DHT', () => { + let infos + let values + + before((done) => { + parallel([ + (cb) => makePeers(3, cb), + (cb) => makeValues(20, cb) + ], (err, res) => { + expect(err).to.not.exist() + infos = res[0] + values = res[1] + done() + }) + }) + + afterEach((done) => { + // Give the nodes some time to finish request + setTimeout(() => util.teardown(done), 100) + }) + + it('create', () => { + const libp2p = new Libp2p(infos[0]) + const dht = new KadDHT(libp2p, 5) + + expect(dht).to.have.property('self').eql(infos[0]) + expect(dht).to.have.property('libp2p').eql(libp2p) + expect(dht).to.have.property('kBucketSize', 5) + expect(dht).to.have.property('routingTable') + }) + + it('put - get', (done) => { + times(2, (i, cb) => setupDHT(cb), (err, dhts) => { + expect(err).to.not.exist() + const dhtA = dhts[0] + const dhtB = dhts[1] + + waterfall([ + (cb) => connect(dhtA, dhtB, cb), + (cb) => { + dhtA.put(new Buffer('/v/hello'), new Buffer('world'), cb) + }, + (cb) => { + dhtB.get(new Buffer('/v/hello'), 1000, cb) + }, + (res, cb) => { + expect(res).to.be.eql(new Buffer('world')) + cb() + } + ], done) + }) + }) + + it('provides', (done) => { + setupDHTs(4, (err, dhts, addrs, ids) => { + expect(err).to.not.exist() + waterfall([ + (cb) => connect(dhts[0], dhts[1], cb), + (cb) => connect(dhts[1], dhts[2], cb), + (cb) => connect(dhts[2], dhts[3], cb), + (cb) => each(values, (v, cb) => { + dhts[3].provide(v.cid, cb) + }, cb), + (cb) => { + let n = 0 + each(values, (v, cb) => { + n = (n + 1) % 3 + dhts[n].findProviders(v.cid, 5000, (err, provs) => { + expect(err).to.not.exist() + expect(provs).to.have.length(1) + expect(provs[0].id.id).to.be.eql(ids[3].id) + expect( + provs[0].multiaddrs.toArray()[0].toString() + ).to.be.eql( + addrs[3].toString() + ) + cb() + }) + }, cb) + } + ], done) + }) + }) + + it('bootstrap', (done) => { + const nDHTs = 20 + + setupDHTs(nDHTs, (err, dhts) => { + expect(err).to.not.exist() + + waterfall([ + // ring connect + (cb) => times(nDHTs, (i, cb) => { + connect(dhts[i], dhts[(i + 1) % nDHTs], cb) + }, (err) => cb(err)), + (cb) => { + bootstrap(dhts) + waitForWellFormedTables(dhts, 7, 0, 20 * 1000, cb) + } + ], done) + }) + }) + + it('layered get', (done) => { + setupDHTs(4, (err, dhts) => { + expect(err).to.not.exist() + + waterfall([ + (cb) => connect(dhts[0], dhts[1], cb), + (cb) => connect(dhts[1], dhts[2], cb), + (cb) => connect(dhts[2], dhts[3], cb), + (cb) => dhts[3].put( + new Buffer('/v/hello'), + new Buffer('world'), + cb + ), + (cb) => dhts[0].get(new Buffer('/v/hello'), 1000, cb), + (res, cb) => { + expect(res).to.be.eql(new Buffer('world')) + cb() + } + ], done) + }) + }) + + it('findPeer', (done) => { + setupDHTs(4, (err, dhts, addrs, ids) => { + expect(err).to.not.exist() + + waterfall([ + (cb) => connect(dhts[0], dhts[1], cb), + (cb) => connect(dhts[1], dhts[2], cb), + (cb) => connect(dhts[2], dhts[3], cb), + (cb) => dhts[0].findPeer(ids[3], 1000, cb), + (res, cb) => { + expect(res.id.isEqual(ids[3])).to.eql(true) + cb() + } + ], done) + }) + }) + + it('connect by id to with address in the peerbook ', (done) => { + parallel([ + (cb) => setupDHT(cb), + (cb) => setupDHT(cb) + ], (err, dhts) => { + expect(err).to.not.exist() + const dhtA = dhts[0] + const dhtB = dhts[1] + + const peerA = dhtA.self + const peerB = dhtB.self + dhtA.peerBook.put(peerB) + dhtB.peerBook.put(peerA) + + parallel([ + (cb) => dhtA.libp2p.dial(peerB.id, cb), + (cb) => dhtB.libp2p.dial(peerA.id, cb) + ], done) + }) + }) + + // Might need to disable on ci + it('find peer query', (done) => { + setupDHTs(101, (err, dhts, addrs, ids) => { + expect(err).to.not.exist() + + const guy = dhts[0] + const others = dhts.slice(1) + const val = new Buffer('foobar') + + series([ + (cb) => times(20, (i, cb) => { + times(16, (j, cb) => { + const t = 20 + random(79) + connect(others[i], others[t], cb) + }, cb) + }, cb), + (cb) => times(20, (i, cb) => { + connect(guy, others[i], cb) + }, cb), + (cb) => utils.convertBuffer(val, (err, rtval) => { + expect(err).to.not.exist() + const rtablePeers = guy.routingTable.closestPeers(rtval, c.ALPHA) + expect(rtablePeers).to.have.length(3) + + const netPeers = guy.peerBook.getAllArray().filter((p) => p.isConnected()) + expect(netPeers).to.have.length(20) + + const rtableSet = {} + rtablePeers.forEach((p) => { + rtableSet[p.toB58String()] = true + }) + + series([ + (cb) => guy.getClosestPeers(val, cb), + (cb) => utils.sortClosestPeers(ids.slice(1), rtval, cb) + ], (err, res) => { + expect(err).to.not.exist() + const out = res[0] + const actualClosest = res[1] + + expect( + out.filter((p) => !rtableSet[p.toB58String()]) + ).to.not.be.empty() + + expect(out).to.have.length(20) + const exp = actualClosest.slice(0, 20) + + utils.sortClosestPeers(out, rtval, (err, got) => { + expect(err).to.not.exist() + expect(countDiffPeers(exp, got)).to.eql(0) + + cb() + }) + }) + }) + ], done) + }) + }) + + it('getClosestPeers', (done) => { + setupDHTs(30, (err, dhts) => { + expect(err).to.not.exist() + + // ring connect + series([ + (cb) => times(dhts.length, (i, cb) => { + connect(dhts[i], dhts[(i + 1) % dhts.length], cb) + }, cb), + (cb) => dhts[1].getClosestPeers(new Buffer('foo'), cb) + ], (err, res) => { + expect(err).to.not.exist() + expect(res[1]).to.have.length(c.K) + done() + }) + }) + }) + + describe('getPublicKey', () => { + it('already known', (done) => { + setupDHTs(2, (err, dhts, addrs, ids) => { + expect(err).to.not.exist() + dhts[0].peerBook.put(dhts[1].self) + dhts[0].getPublicKey(ids[1], (err, key) => { + expect(err).to.not.exist() + expect(key).to.be.eql(dhts[1].self.id.pubKey) + done() + }) + }) + }) + + it('connected node', (done) => { + setupDHTs(2, (err, dhts, addrs, ids) => { + expect(err).to.not.exist() + + waterfall([ + (cb) => connect(dhts[0], dhts[1], cb), + (cb) => { + // remove the pub key to be sure it is fetched + const p = dhts[0].peerBook.get(ids[1]) + p.id._pubKey = null + dhts[0].peerBook.put(p, true) + dhts[0].getPublicKey(ids[1], cb) + }, + (key, cb) => { + expect( + key.equals(dhts[1].self.id.pubKey) + ).to.eql( + true + ) + cb() + } + ], done) + }) + }) + }) + + it('_nearestPeersToQuery', (done) => { + const libp2p = new Libp2p(infos[0]) + const dht = new KadDHT(libp2p) + + dht.peerBook.put(infos[1]) + series([ + (cb) => dht._add(infos[1], cb), + (cb) => dht._nearestPeersToQuery({key: 'hello'}, cb) + ], (err, res) => { + expect(err).to.not.exist() + expect(res[1]).to.be.eql([infos[1]]) + done() + }) + }) + + it('_betterPeersToQuery', (done) => { + const libp2p = new Libp2p(infos[0]) + const dht = new KadDHT(libp2p) + + dht.peerBook.put(infos[1]) + dht.peerBook.put(infos[2]) + + series([ + (cb) => dht._add(infos[1], cb), + (cb) => dht._add(infos[2], cb), + (cb) => dht._betterPeersToQuery({key: 'hello'}, infos[1], cb) + ], (err, res) => { + expect(err).to.not.exist() + expect(res[2]).to.be.eql([infos[2]]) + done() + }) + }) + + describe('_verifyRecordLocally', () => { + it('invalid record (missing public key)', (done) => { + const libp2p = new Libp2p(infos[0]) + const dht = new KadDHT(libp2p) + + // Not putting the peer info into the peerbook + // dht.peerBook.put(infos[1]) + + const record = new Record(new Buffer('hello'), new Buffer('world'), infos[1].id) + + waterfall([ + (cb) => record.serializeSigned(infos[1].id.privKey, cb), + (enc, cb) => dht._verifyRecordLocally(Record.deserialize(enc), (err) => { + expect(err).to.match(/Missing public key/) + cb() + }) + ], done) + }) + + it('valid record - signed', (done) => { + const libp2p = new Libp2p(infos[0]) + const dht = new KadDHT(libp2p) + + dht.peerBook.put(infos[1]) + + const record = new Record(new Buffer('hello'), new Buffer('world'), infos[1].id) + + waterfall([ + (cb) => record.serializeSigned(infos[1].id.privKey, cb), + (enc, cb) => dht._verifyRecordLocally(Record.deserialize(enc), cb) + ], done) + }) + + it('valid record - not signed', (done) => { + const libp2p = new Libp2p(infos[0]) + const dht = new KadDHT(libp2p) + + dht.peerBook.put(infos[1]) + + const record = new Record(new Buffer('hello'), new Buffer('world'), infos[1].id) + + waterfall([ + (cb) => cb(null, record.serialize()), + (enc, cb) => dht._verifyRecordLocally(Record.deserialize(enc), cb) + ], done) + }) + }) +}) + +function setupDHTs (n, callback) { + times(n, (i, cb) => setupDHT(cb), (err, dhts) => { + if (err) { + return callback(err) + } + callback(null, dhts, dhts.map((d) => d.self.multiaddrs.toArray()[0]), dhts.map((d) => d.self.id)) + }) +} + +// connect two dhts +function connectNoSync (a, b, callback) { + const target = _.cloneDeep(b.self) + target.id._pubKey = target.id.pubKey + target.id._privKey = null + a.libp2p.dial(target, callback) +} + +function find (a, b, cb) { + retry({ times: 50, interval: 100 }, (cb) => { + a.routingTable.find(b.self.id, (err, match) => { + if (err) { + return cb(err) + } + if (!match) { + return cb(new Error('not found')) + } + + try { + expect( + a.peerBook.get(b.self).multiaddrs.toArray()[0].toString() + ).to.eql( + b.self.multiaddrs.toArray()[0].toString() + ) + } catch (err) { + return cb(err) + } + + cb() + }) + }, cb) +} + +// connect two dhts and wait for them to have each other +// in their routing table +function connect (a, b, callback) { + series([ + (cb) => connectNoSync(a, b, cb), + (cb) => find(a, b, cb), + (cb) => find(b, a, cb) + ], (err) => callback(err)) +} + +function bootstrap (dhts) { + dhts.forEach((dht) => { + dht._bootstrap(3, 10000) + }) +} + +function waitForWellFormedTables (dhts, minPeers, avgPeers, maxTimeout, callback) { + timeout((cb) => { + retry({ + times: 50, + interval: 200 + }, (cb) => { + let totalPeers = 0 + + const ready = dhts.map((dht) => { + const rtlen = dht.routingTable.size + totalPeers += rtlen + if (minPeers > 0 && rtlen < minPeers) { + return false + } + const actualAvgPeers = totalPeers / dhts.length + if (avgPeers > 0 && actualAvgPeers < avgPeers) { + return false + } + return true + }) + + const done = ready.every(Boolean) + cb(done ? null : new Error('not done yet')) + }, cb) + }, maxTimeout)(callback) +} + +function countDiffPeers (a, b) { + const s = new Set() + a.forEach((p) => s.add(p.toB58String())) + + return b.filter((p) => !s.has(p.toB58String())).length +} diff --git a/test/limited-peer-list.spec.js b/test/limited-peer-list.spec.js new file mode 100644 index 00000000..2f642f19 --- /dev/null +++ b/test/limited-peer-list.spec.js @@ -0,0 +1,41 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect + +const LimitedPeerList = require('../src/limited-peer-list') + +const makePeers = require('./util').makePeers + +describe('LimitedPeerList', () => { + let peers + + before((done) => { + makePeers(5, (err, p) => { + if (err) { + return done(err) + } + peers = p + done() + }) + }) + + it('basics', () => { + const l = new LimitedPeerList(4) + + expect(l.push(peers[0])).to.eql(true) + expect(l.push(peers[0])).to.eql(false) + expect(l.push(peers[1])).to.eql(true) + expect(l.push(peers[2])).to.eql(true) + expect(l.push(peers[3])).to.eql(true) + expect(l.push(peers[4])).to.eql(false) + + expect(l).to.have.length(4) + expect(l.pop()).to.eql(peers[3]) + expect(l).to.have.length(3) + expect(l.push(peers[4])).to.eql(true) + expect(l.toArray()).to.eql([peers[0], peers[1], peers[2], peers[4]]) + }) +}) diff --git a/test/message.spec.js b/test/message.spec.js new file mode 100644 index 00000000..aa67c03d --- /dev/null +++ b/test/message.spec.js @@ -0,0 +1,130 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') +const map = require('async/map') +const range = require('lodash.range') +const random = require('lodash.random') +const Record = require('libp2p-record').Record +const fs = require('fs') +const path = require('path') + +const Message = require('../src/message') + +describe('Message', () => { + it('create', () => { + const k = new Buffer('hello') + const msg = new Message(Message.TYPES.PING, k, 5) + + expect(msg).to.have.property('type', 5) + expect(msg).to.have.property('key').eql(new Buffer('hello')) + // TODO: confirm this works as expected + expect(msg).to.have.property('_clusterLevelRaw', 5) + expect(msg).to.have.property('clusterLevel', 4) + }) + + it('serialize & deserialize', (done) => { + map(range(5), (n, cb) => PeerId.create({bits: 1024}, cb), (err, peers) => { + expect(err).to.not.exist() + + const closer = peers.slice(0, 5).map((p) => { + const info = new PeerInfo(p) + const addr = `/ip4/198.176.1.${random(198)}/tcp/1234` + info.multiaddrs.add(addr) + info.multiaddrs.add(`/ip4/100.176.1.${random(198)}`) + info.connect(addr) + + return info + }) + + const provider = peers.slice(0, 5).map((p) => { + const info = new PeerInfo(p) + info.multiaddrs.add(`/ip4/98.176.1.${random(198)}/tcp/1234`) + info.multiaddrs.add(`/ip4/10.176.1.${random(198)}`) + + return info + }) + + const msg = new Message(Message.TYPES.GET_VALUE, new Buffer('hello'), 5) + const record = new Record(new Buffer('hello'), new Buffer('world'), peers[0]) + + msg.closerPeers = closer + msg.providerPeers = provider + msg.record = record + + const enc = msg.serialize() + const dec = Message.deserialize(enc) + + expect(dec.type).to.be.eql(msg.type) + expect(dec.key).to.be.eql(msg.key) + expect(dec.clusterLevel).to.be.eql(msg.clusterLevel) + expect(dec.record.serialize()).to.be.eql(record.serialize()) + expect(dec.record.key).to.be.eql(new Buffer('hello')) + + expect(dec.closerPeers).to.have.length(5) + dec.closerPeers.forEach((peer, i) => { + expect( + peer.id.isEqual(msg.closerPeers[i].id) + ).to.be.eql(true) + expect( + peer.multiaddrs.toArray() + ).to.be.eql( + msg.closerPeers[i].multiaddrs.toArray() + ) + + expect( + peer.isConnected() + ).to.be.eql( + peer.multiaddrs.toArray()[0] + ) + }) + + expect(dec.providerPeers).to.have.length(5) + dec.providerPeers.forEach((peer, i) => { + expect( + peer.id.isEqual(msg.providerPeers[i].id) + ).to.be.eql(true) + expect( + peer.multiaddrs.toArray() + ).to.be.eql( + msg.providerPeers[i].multiaddrs.toArray() + ) + }) + + done() + }) + }) + + it('clusterlevel', () => { + const msg = new Message(Message.TYPES.PING, new Buffer('hello'), 0) + + msg.clusterLevel = 10 + expect(msg.clusterLevel).to.eql(9) + }) + + it('go-interop', () => { + range(1, 9).forEach((i) => { + const raw = fs.readFileSync( + path.join(__dirname, 'fixtures', `msg-${i}`) + ) + + const msg = Message.deserialize(raw) + + expect(msg.clusterLevel).to.gte(0) + if (msg.record) { + expect(Buffer.isBuffer(msg.record.key)).to.eql(true) + expect(PeerId.isPeerId(msg.record.author)).to.eql(true) + } + + if (msg.providerPeers.length > 0) { + msg.providerPeers.forEach((p) => { + expect(PeerInfo.isPeerInfo(p)).to.eql(true) + }) + } + }) + }) +}) diff --git a/test/network.spec.js b/test/network.spec.js new file mode 100644 index 00000000..36be1268 --- /dev/null +++ b/test/network.spec.js @@ -0,0 +1,127 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const Libp2p = require('libp2p-ipfs-nodejs') +const Connection = require('interface-connection').Connection +const pull = require('pull-stream') +const lp = require('pull-length-prefixed') +const series = require('async/series') + +const DHT = require('../src') +const Message = require('../src/message') + +const makePeers = require('./util').makePeers + +describe('Network', () => { + let libp2p + let network + let dht + let infos + + before((done) => { + makePeers(3, (err, peers) => { + if (err) { + return done(err) + } + + infos = peers + libp2p = new Libp2p(infos[0]) + dht = new DHT(libp2p) + network = dht.network + series([ + (cb) => libp2p.start(cb), + (cb) => dht.start(cb) + ], done) + }) + }) + + after((done) => series([ + (cb) => dht.stop(cb), + (cb) => libp2p.stop(cb) + ], done)) + + describe('sendRequest', () => { + it('send and response', (done) => { + let i = 0 + const finish = () => { + if (i++ === 1) { + done() + } + } + + const msg = new Message(Message.TYPES.PING, new Buffer('hello'), 0) + + // mock it + libp2p.dial = (peer, protocol, callback) => { + expect(protocol).to.eql('/ipfs/kad/1.0.0') + const msg = new Message(Message.TYPES.FIND_NODE, new Buffer('world'), 0) + + const rawConn = { + source: pull( + pull.values([msg.serialize()]), + lp.encode() + ), + sink: pull( + lp.decode(), + pull.collect((err, res) => { + expect(err).to.not.exist() + expect(Message.deserialize(res[0]).type).to.eql(Message.TYPES.PING) + finish() + }) + ) + } + const conn = new Connection(rawConn) + callback(null, conn) + } + + network.sendRequest(infos[0].id, msg, (err, response) => { + expect(err).to.not.exist() + expect(response.type).to.eql(Message.TYPES.FIND_NODE) + + finish() + }) + }) + + it('timeout on no message', (done) => { + let i = 0 + const finish = () => { + if (i++ === 1) { + done() + } + } + + const msg = new Message(Message.TYPES.PING, new Buffer('hello'), 0) + + // mock it + libp2p.dial = (peer, protocol, callback) => { + expect(protocol).to.eql('/ipfs/kad/1.0.0') + const rawConn = { + // hanging + source: (end, cb) => {}, + sink: pull( + lp.decode(), + pull.collect((err, res) => { + expect(err).to.not.exist() + expect(Message.deserialize(res[0]).type).to.eql(Message.TYPES.PING) + finish() + }) + ) + } + const conn = new Connection(rawConn) + callback(null, conn) + } + + network.readMessageTimeout = 100 + + network.sendRequest(infos[0].id, msg, (err, response) => { + expect(err).to.exist() + expect(err.message).to.match(/timed out/) + + finish() + }) + }) + }) +}) diff --git a/test/peer-list.spec.js b/test/peer-list.spec.js new file mode 100644 index 00000000..ddd4cad2 --- /dev/null +++ b/test/peer-list.spec.js @@ -0,0 +1,36 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect + +const PeerList = require('../src/peer-list') + +const makePeers = require('./util').makePeers + +describe('PeerList', () => { + let peers + + before((done) => { + makePeers(3, (err, p) => { + if (err) { + return done(err) + } + peers = p + done() + }) + }) + + it('basics', () => { + const l = new PeerList() + + expect(l.push(peers[0])).to.eql(true) + expect(l.push(peers[0])).to.eql(false) + expect(l).to.have.length(1) + expect(l.push(peers[1])).to.eql(true) + expect(l.pop()).to.eql(peers[1]) + expect(l).to.have.length(1) + expect(l.toArray()).to.eql([peers[0]]) + }) +}) diff --git a/test/peer-queue.spec.js b/test/peer-queue.spec.js new file mode 100644 index 00000000..3f844377 --- /dev/null +++ b/test/peer-queue.spec.js @@ -0,0 +1,51 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const PeerId = require('peer-id') +const series = require('async/series') + +const PeerQueue = require('../src/peer-queue') + +describe('PeerQueue', () => { + it('basics', (done) => { + const p1 = new PeerId(new Buffer('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31')) + const p2 = new PeerId(new Buffer('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32')) + const p3 = new PeerId(new Buffer('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33')) + const p4 = new PeerId(new Buffer('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34')) + const p5 = new PeerId(new Buffer('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31')) + + const peer = new PeerId(new Buffer('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31')) + + PeerQueue.fromPeerId(peer, (err, pq) => { + expect(err).to.not.exist() + + series([ + (cb) => pq.enqueue(p3, cb), + (cb) => pq.enqueue(p1, cb), + (cb) => pq.enqueue(p2, cb), + (cb) => pq.enqueue(p4, cb), + (cb) => pq.enqueue(p5, cb), + (cb) => pq.enqueue(p1, cb) + ], (err) => { + expect(err).to.not.exist() + + expect([ + pq.dequeue(), + pq.dequeue(), + pq.dequeue(), + pq.dequeue(), + pq.dequeue(), + pq.dequeue() + ].map((m) => m.toB58String())).to.be.eql([ + p1, p1, p1, p4, p3, p2 + ].map((m) => m.toB58String())) + + expect(pq.length).to.be.eql(0) + done() + }) + }) + }) +}) diff --git a/test/providers.spec.js b/test/providers.spec.js new file mode 100644 index 00000000..bd8e37af --- /dev/null +++ b/test/providers.spec.js @@ -0,0 +1,164 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const Store = require('interface-datastore').MemoryDatastore +const parallel = require('async/parallel') +const waterfall = require('async/waterfall') +const CID = require('cids') +const multihashing = require('multihashing-async') +const map = require('async/map') +const timesSeries = require('async/timesSeries') +const each = require('async/each') +const eachSeries = require('async/eachSeries') +const range = require('lodash.range') +const LevelStore = require('datastore-level') +const path = require('path') +const os = require('os') + +const Providers = require('../src/providers') + +const util = require('./util') + +describe('Providers', () => { + let infos + + before((done) => { + util.makePeers(3, (err, peers) => { + if (err) { + return done(err) + } + + infos = peers + done() + }) + }) + + it('simple add and get of providers', (done) => { + const providers = new Providers(new Store(), infos[2].id) + + const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') + + parallel([ + (cb) => providers.addProvider(cid, infos[0].id, cb), + (cb) => providers.addProvider(cid, infos[1].id, cb) + ], (err) => { + expect(err).to.not.exist() + providers.getProviders(cid, (err, provs) => { + expect(err).to.not.exist() + expect(provs).to.be.eql([infos[0].id, infos[1].id]) + + done() + }) + }) + }) + + it('more providers than space in the lru cache', (done) => { + const providers = new Providers(new Store(), infos[2].id, 10) + + waterfall([ + (cb) => map( + range(100), + (i, cb) => { + multihashing(new Buffer(`hello ${i}`), 'sha2-256', cb) + }, + cb + ), + (hashes, cb) => { + const cids = hashes.map((h) => new CID(h)) + + map(cids, (cid, cb) => { + providers.addProvider(cid, infos[0].id, cb) + }, (err) => cb(err, cids)) + }, + (cids, cb) => { + map(cids, (cid, cb) => { + providers.getProviders(cid, cb) + }, (err, provs) => { + expect(err).to.not.exist() + expect(provs).to.have.length(100) + provs.forEach((p) => { + expect(p[0].id).to.be.eql(infos[0].id.id) + }) + cb() + }) + } + ], done) + }) + + it('expires', (done) => { + const providers = new Providers(new Store(), infos[2].id) + providers.cleanupInterval = 100 + providers.provideValidity = 200 + + const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') + parallel([ + (cb) => providers.addProvider(cid, infos[0].id, cb), + (cb) => providers.addProvider(cid, infos[1].id, cb) + ], (err) => { + expect(err).to.not.exist() + + providers.getProviders(cid, (err, provs) => { + expect(err).to.not.exist() + expect(provs).to.have.length(2) + expect(provs[0].id).to.be.eql(infos[0].id.id) + expect(provs[1].id).to.be.eql(infos[1].id.id) + }) + + setTimeout(() => { + providers.getProviders(cid, (err, provs) => { + expect(err).to.not.exist() + expect(provs).to.have.length(0) + done() + }) + }, 300) + }) + }) + + // slooow so only run when you need to + it.skip('many', (done) => { + const p = path.join( + os.tmpdir(), (Math.random() * 100).toString() + ) + const store = new LevelStore(p) + const providers = new Providers(store, infos[2].id, 10) + + console.log('starting') + waterfall([ + (cb) => parallel([ + (cb) => util.makeValues(100, cb), + (cb) => util.makePeers(600, cb) + ], cb), + (res, cb) => { + console.log('got values and peers') + const values = res[0] + const peers = res[1] + let total = Date.now() + eachSeries(values, (v, cb) => { + eachSeries(peers, (p, cb) => { + providers.addProvider(v.cid, p.id, cb) + }, cb) + }, (err) => { + console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - total) + expect(err).to.not.exist() + console.log('starting profile with %s peers and %s cids', peers.length, values.length) + timesSeries(3, (i, cb) => { + const start = Date.now() + each(values, (v, cb) => { + providers.getProviders(v.cid, cb) + }, (err) => { + expect(err).to.not.exist() + console.log('query %sms', (Date.now() - start)) + cb() + }) + }, cb) + }) + } + ], (err) => { + expect(err).to.not.exist() + store.close(done) + }) + }) +}) diff --git a/test/query.spec.js b/test/query.spec.js new file mode 100644 index 00000000..5d394309 --- /dev/null +++ b/test/query.spec.js @@ -0,0 +1,108 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const Libp2p = require('libp2p-ipfs-nodejs') + +const DHT = require('../src') +const Query = require('../src/query') + +const makePeers = require('./util').makePeers + +describe('Query', () => { + let infos + let libp2p + let dht + + before((done) => { + makePeers(3, (err, peers) => { + if (err) { + return done(err) + } + + infos = peers + libp2p = new Libp2p(infos[0]) + dht = new DHT(libp2p) + + done() + }) + }) + + it('simple run', (done) => { + const peer = infos[0] + + // mock this so we can dial non existing peers + libp2p.dial = (peer, callback) => { + callback() + } + + let i = 0 + const query = (p, cb) => { + if (i++ === 1) { + expect(p.id).to.eql(infos[2].id.id) + + return cb(null, { + value: new Buffer('cool'), + success: true + }) + } + expect(p.id).to.eql(infos[1].id.id) + cb(null, { + closerPeers: [infos[2]] + }) + } + + const q = new Query(dht, peer.id.id, query) + q.run([infos[1].id], (err, res) => { + expect(err).to.not.exist() + expect(res.value).to.eql(new Buffer('cool')) + expect(res.success).to.eql(true) + expect(res.finalSet.size).to.eql(2) + done() + }) + }) + + it('returns an error if all queries error', (done) => { + const peer = infos[0] + + // mock this so we can dial non existing peers + libp2p.dial = (peer, callback) => { + callback() + } + + const query = (p, cb) => { + cb(new Error('fail')) + } + + const q = new Query(dht, peer.id.id, query) + q.run([infos[1].id], (err, res) => { + expect(err).to.exist() + expect(err.message).to.eql('fail') + done() + }) + }) + + it('only closerPeers', (done) => { + const peer = infos[0] + + // mock this so we can dial non existing peers + libp2p.dial = (peer, callback) => { + callback() + } + + const query = (p, cb) => { + cb(null, { + closerPeers: [infos[2]] + }) + } + + const q = new Query(dht, peer.id.id, query) + q.run([infos[1].id], (err, res) => { + expect(err).to.not.exist() + expect(res.finalSet.size).to.eql(2) + done() + }) + }) +}) diff --git a/test/routing.spec.js b/test/routing.spec.js new file mode 100644 index 00000000..b1a35622 --- /dev/null +++ b/test/routing.spec.js @@ -0,0 +1,130 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const PeerId = require('peer-id') +const map = require('async/map') +const each = require('async/each') +const waterfall = require('async/waterfall') +const range = require('lodash.range') +const random = require('lodash.random') + +const RoutingTable = require('../src/routing') +const utils = require('../src/utils') + +describe('RoutingTable', () => { + let table + + beforeEach((done) => { + PeerId.create((err, id) => { + if (err) { + done(err) + } + + table = new RoutingTable(id, 20) + + done() + }) + }) + + it('add', (done) => { + createPeers(20, (err, peers) => { + expect(err).to.not.exist() + waterfall([ + (cb) => each(range(1000), (n, cb) => { + table.add(peers[random(peers.length - 1)], cb) + }, cb), + (cb) => each(range(20), (n, cb) => { + const id = peers[random(peers.length - 1)] + utils.convertPeerId(id, (err, key) => { + expect(err).to.not.exist() + expect( + table.closestPeers(key, 5).length + ).to.be.above(0) + cb() + }) + }, cb) + ], done) + }) + }) + + it('remove', (done) => { + createPeers(10, (err, peers) => { + let k + expect(err).to.not.exist() + waterfall([ + (cb) => each(peers, (peer, cb) => { + table.add(peer, cb) + }, cb), + (cb) => { + const id = peers[2] + utils.convertPeerId(id, (err, key) => { + expect(err).to.not.exist() + k = key + expect( + table.closestPeers(key, 10) + ).to.have.length(10) + cb() + }) + }, + (cb) => table.remove(peers[5], cb), + (cb) => { + expect( + table.closestPeers(k, 10) + ).to.have.length(9) + + expect(table.size).to.be.eql(9) + cb() + } + ], done) + }) + }) + + it('closestPeer', (done) => { + createPeers(10, (err, peers) => { + expect(err).to.not.exist() + waterfall([ + (cb) => each(peers, (peer, cb) => { + table.add(peer, cb) + }, cb), + (cb) => { + const id = peers[2] + utils.convertPeerId(id, (err, key) => { + expect(err).to.not.exist() + expect( + table.closestPeer(key) + ).to.be.eql(id) + cb() + }) + } + ], done) + }) + }) + + it('closestPeers', (done) => { + createPeers(18, (err, peers) => { + expect(err).to.not.exist() + waterfall([ + (cb) => each(peers, (peer, cb) => { + table.add(peer, cb) + }, cb), + (cb) => { + const id = peers[2] + utils.convertPeerId(id, (err, key) => { + expect(err).to.not.exist() + expect( + table.closestPeers(key, 15) + ).to.have.length(15) + cb() + }) + } + ], done) + }) + }) +}) + +function createPeers (n, callback) { + map(range(n), (i, cb) => PeerId.create({bits: 1024}, cb), callback) +} diff --git a/test/rpc/handlers/add-provider.spec.js b/test/rpc/handlers/add-provider.spec.js new file mode 100644 index 00000000..082623e8 --- /dev/null +++ b/test/rpc/handlers/add-provider.spec.js @@ -0,0 +1,118 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const parallel = require('async/parallel') +const waterfall = require('async/waterfall') +const _ = require('lodash') + +const Message = require('../../../src/message') + +const handler = require('../../../src/rpc/handlers/add-provider') + +const util = require('../../util') + +describe('rpc - handlers - AddProvider', () => { + let peers + let values + let dht + + before((done) => { + parallel([ + (cb) => util.makePeers(3, cb), + (cb) => util.makeValues(2, cb) + ], (err, res) => { + if (err) { + return done(err) + } + peers = res[0] + values = res[1] + done() + }) + }) + + afterEach((done) => util.teardown(done)) + + beforeEach((done) => { + util.setupDHT((err, res) => { + expect(err).to.not.exist() + dht = res + done() + }) + }) + + describe('invalid messages', () => { + const tests = [{ + message: new Message(Message.TYPES.ADD_PROVIDER, new Buffer(0), 0), + error: /Missing key/ + }, { + message: new Message(Message.TYPES.ADD_PROVIDER, new Buffer(0), 0), + error: /Missing key/ + }, { + message: new Message(Message.TYPES.ADD_PROVIDER, new Buffer('hello world'), 0), + error: /Invalid CID/ + }] + + tests.forEach((t) => it(t.error.toString(), (done) => { + handler(dht)(peers[0], t.message, (err, res) => { + expect(err).to.exist() + expect(err.message).to.match(t.error) + done() + }) + })) + }) + + it('ignore providers not from the originator', (done) => { + const cid = values[0].cid + + const msg = new Message(Message.TYPES.ADD_PROVIDER, cid.buffer, 0) + const sender = peers[0] + sender.multiaddrs.add('/ip4/127.0.0.1/tcp/1234') + const other = peers[1] + other.multiaddrs.add('/ip4/127.0.0.1/tcp/2345') + msg.providerPeers = [ + sender, + other + ] + + waterfall([ + (cb) => handler(dht)(sender, msg, cb), + (cb) => dht.providers.getProviders(cid, cb), + (provs, cb) => { + expect(provs).to.have.length(1) + expect(provs[0].id).to.eql(sender.id.id) + const bookEntry = dht.peerBook.get(sender.id) + expect( + bookEntry.multiaddrs.toArray() + ).to.eql( + sender.multiaddrs.toArray() + ) + cb() + } + ], done) + }) + + it('ignore providers with no multiaddrs', (done) => { + const cid = values[0].cid + const msg = new Message(Message.TYPES.ADD_PROVIDER, cid.buffer, 0) + const sender = _.cloneDeep(peers[0]) + sender.multiaddrs.clear() + msg.providerPeers = [sender] + + waterfall([ + (cb) => handler(dht)(sender, msg, cb), + (cb) => dht.providers.getProviders(cid, cb), + (provs, cb) => { + expect(provs).to.have.length(1) + expect(provs[0].id).to.eql(sender.id.id) + expect( + dht.peerBook.has(sender.id) + ).to.be.eql(false) + cb() + } + ], done) + }) +}) diff --git a/test/rpc/handlers/find-node.spec.js b/test/rpc/handlers/find-node.spec.js new file mode 100644 index 00000000..aecaa395 --- /dev/null +++ b/test/rpc/handlers/find-node.spec.js @@ -0,0 +1,82 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const waterfall = require('async/waterfall') + +const Message = require('../../../src/message') +const handler = require('../../../src/rpc/handlers/find-node') +const util = require('../../util') + +const T = Message.TYPES.FIND_NODE + +describe('rpc - handlers - FindNode', () => { + let peers + let dht + + before((done) => { + util.makePeers(3, (err, res) => { + expect(err).to.not.exist() + peers = res + done() + }) + }) + + afterEach((done) => util.teardown(done)) + + beforeEach((done) => { + util.setupDHT((err, res) => { + expect(err).to.not.exist() + dht = res + done() + }) + }) + + it('returns self, if asked for self', (done) => { + const msg = new Message(T, dht.self.id.id, 0) + + handler(dht)(peers[1], msg, (err, response) => { + expect(err).to.not.exist() + expect(response.closerPeers).to.have.length(1) + const peer = response.closerPeers[0] + + expect(peer.id.id).to.be.eql(dht.self.id.id) + done() + }) + }) + + it('returns closer peers', (done) => { + const msg = new Message(T, new Buffer('hello'), 0) + const other = peers[1] + + waterfall([ + (cb) => dht._add(other, cb), + (cb) => handler(dht)(peers[2], msg, cb) + ], (err, response) => { + expect(err).to.not.exist() + expect(response.closerPeers).to.have.length(1) + const peer = response.closerPeers[0] + + expect(peer.id.id).to.be.eql(peers[1].id.id) + expect( + peer.multiaddrs.toArray() + ).to.be.eql( + peers[1].multiaddrs.toArray() + ) + + done() + }) + }) + + it('handles no peers found', (done) => { + const msg = new Message(T, new Buffer('hello'), 0) + + handler(dht)(peers[2], msg, (err, response) => { + expect(err).to.not.exist() + expect(response.closerPeers).to.have.length(0) + done() + }) + }) +}) diff --git a/test/rpc/handlers/get-providers.spec.js b/test/rpc/handlers/get-providers.spec.js new file mode 100644 index 00000000..768e9ef6 --- /dev/null +++ b/test/rpc/handlers/get-providers.spec.js @@ -0,0 +1,103 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const parallel = require('async/parallel') +const waterfall = require('async/waterfall') + +const Message = require('../../../src/message') +const utils = require('../../../src/utils') +const handler = require('../../../src/rpc/handlers/get-providers') +const util = require('../../util') + +const T = Message.TYPES.GET_PROVIDERS + +describe('rpc - handlers - GetProviders', () => { + let peers + let values + let dht + + before((done) => { + parallel([ + (cb) => util.makePeers(3, cb), + (cb) => util.makeValues(2, cb) + ], (err, res) => { + expect(err).to.not.exist() + peers = res[0] + values = res[1] + done() + }) + }) + + afterEach((done) => util.teardown(done)) + + beforeEach((done) => { + util.setupDHT((err, res) => { + expect(err).to.not.exist() + dht = res + done() + }) + }) + + it('errors with an invalid key ', (done) => { + const msg = new Message(T, new Buffer('hello'), 0) + + handler(dht)(peers[0], msg, (err, response) => { + expect(err).to.match(/Invalid CID/) + expect(response).to.not.exist() + done() + }) + }) + + it('responds with self if the value is in the datastore', (done) => { + const v = values[0] + + const msg = new Message(T, v.cid.buffer, 0) + const dsKey = utils.bufferToKey(v.cid.buffer) + + waterfall([ + (cb) => dht.datastore.put(dsKey, v.value, cb), + (cb) => handler(dht)(peers[0], msg, cb) + ], (err, response) => { + expect(err).to.not.exist() + + expect(response.key).to.be.eql(v.cid.buffer) + expect(response.providerPeers).to.have.length(1) + expect( + response.providerPeers[0].id.toB58String() + ).to.be.eql(dht.self.id.toB58String()) + + done() + }) + }) + + it('responds with listed providers and closer peers', (done) => { + const v = values[0] + + const msg = new Message(T, v.cid.buffer, 0) + const prov = peers[1].id + const closer = peers[2] + + waterfall([ + (cb) => dht._add(closer, cb), + (cb) => dht.providers.addProvider(v.cid, prov, cb), + (cb) => handler(dht)(peers[0], msg, cb) + ], (err, response) => { + expect(err).to.not.exist() + + expect(response.key).to.be.eql(v.cid.buffer) + expect(response.providerPeers).to.have.length(1) + expect( + response.providerPeers[0].id.toB58String() + ).to.be.eql(prov.toB58String()) + + expect(response.closerPeers).to.have.length(1) + expect( + response.closerPeers[0].id.toB58String() + ).to.be.eql(closer.id.toB58String()) + done() + }) + }) +}) diff --git a/test/rpc/handlers/get-value.spec.js b/test/rpc/handlers/get-value.spec.js new file mode 100644 index 00000000..49718e4a --- /dev/null +++ b/test/rpc/handlers/get-value.spec.js @@ -0,0 +1,136 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const waterfall = require('async/waterfall') + +const Message = require('../../../src/message') +const handler = require('../../../src/rpc/handlers/get-value') +const utils = require('../../../src/utils') +const util = require('../../util') + +const T = Message.TYPES.GET_VALUE + +describe('rpc - handlers - GetValue', () => { + let peers + let dht + + before((done) => { + util.makePeers(2, (err, res) => { + expect(err).to.not.exist() + peers = res + done() + }) + }) + + afterEach((done) => util.teardown(done)) + + beforeEach((done) => { + util.setupDHT((err, res) => { + expect(err).to.not.exist() + dht = res + done() + }) + }) + + it('errors when missing key', (done) => { + const msg = new Message(T, new Buffer(0), 0) + + handler(dht)(peers[0], msg, (err, response) => { + expect(err).to.match(/Invalid key/) + expect(response).to.not.exist() + done() + }) + }) + + it('responds with a local value', (done) => { + const key = new Buffer('hello') + const value = new Buffer('world') + const msg = new Message(T, key, 0) + + waterfall([ + (cb) => dht.put(key, value, cb), + (cb) => handler(dht)(peers[0], msg, cb) + ], (err, response) => { + expect(err).to.not.exist() + expect(response.record).to.exist() + expect(response.record.key).to.eql(key) + expect(response.record.value).to.eql(value) + done() + }) + }) + + it('responds with closerPeers returned from the dht', (done) => { + const key = new Buffer('hello') + const msg = new Message(T, key, 0) + const other = peers[1] + + waterfall([ + (cb) => dht._add(other, cb), + (cb) => handler(dht)(peers[0], msg, cb) + ], (err, response) => { + expect(err).to.not.exist() + expect(response.closerPeers).to.have.length(1) + expect( + response.closerPeers[0].id.toB58String() + ).to.be.eql(other.id.toB58String()) + done() + }) + }) + + describe('public key', () => { + it('self', (done) => { + const key = utils.keyForPublicKey(dht.self.id) + + const msg = new Message(T, key, 0) + + waterfall([ + (cb) => handler(dht)(peers[0], msg, cb) + ], (err, response) => { + expect(err).to.not.exist() + expect(response.record).to.exist() + expect(response.record.value).to.eql( + dht.self.id.pubKey.bytes + ) + done() + }) + }) + + it('other in peerstore', (done) => { + const other = peers[1] + const key = utils.keyForPublicKey(other.id) + + const msg = new Message(T, key, 0) + + waterfall([ + (cb) => dht._add(other, cb), + (cb) => handler(dht)(peers[0], msg, cb) + ], (err, response) => { + expect(err).to.not.exist() + expect(response.record).to.exist() + expect(response.record.value).to.eql( + other.id.pubKey.bytes + ) + done() + }) + }) + + it('other unkown', (done) => { + const other = peers[1] + const key = utils.keyForPublicKey(other.id) + + const msg = new Message(T, key, 0) + + waterfall([ + (cb) => handler(dht)(peers[0], msg, cb) + ], (err, response) => { + expect(err).to.not.exist() + expect(response.record).to.not.exist() + + done() + }) + }) + }) +}) diff --git a/test/rpc/handlers/ping.spec.js b/test/rpc/handlers/ping.spec.js new file mode 100644 index 00000000..bf49f311 --- /dev/null +++ b/test/rpc/handlers/ping.spec.js @@ -0,0 +1,46 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect + +const Message = require('../../../src/message') +const handler = require('../../../src/rpc/handlers/ping') + +const util = require('../../util') + +const T = Message.TYPES.PING + +describe('rpc - handlers - Ping', () => { + let peers + let dht + + before((done) => { + util.makePeers(2, (err, res) => { + expect(err).to.not.exist() + peers = res + done() + }) + }) + + afterEach((done) => util.teardown(done)) + + beforeEach((done) => { + util.setupDHT((err, res) => { + expect(err).to.not.exist() + dht = res + done() + }) + }) + + it('replies with the same message', (done) => { + const msg = new Message(T, new Buffer('hello'), 5) + + handler(dht)(peers[0], msg, (err, response) => { + expect(err).to.not.exist() + expect(response).to.be.eql(msg) + done() + }) + }) +}) diff --git a/test/rpc/handlers/put-value.spec.js b/test/rpc/handlers/put-value.spec.js new file mode 100644 index 00000000..13856e7e --- /dev/null +++ b/test/rpc/handlers/put-value.spec.js @@ -0,0 +1,72 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const Record = require('libp2p-record').Record + +const Message = require('../../../src/message') +const handler = require('../../../src/rpc/handlers/put-value') +const utils = require('../../../src/utils') + +const util = require('../../util') + +const T = Message.TYPES.PUT_VALUE + +describe('rpc - handlers - PutValue', () => { + let peers + let dht + + before((done) => { + util.makePeers(2, (err, res) => { + expect(err).to.not.exist() + peers = res + done() + }) + }) + + afterEach((done) => util.teardown(done)) + + beforeEach((done) => { + util.setupDHT((err, res) => { + expect(err).to.not.exist() + dht = res + done() + }) + }) + + it('errors on missing record', (done) => { + const msg = new Message(T, new Buffer('hello'), 5) + handler(dht)(peers[0], msg, (err, response) => { + expect(err).to.match(/Empty record/) + done() + }) + }) + + it('stores the record in the datastore', (done) => { + const msg = new Message(T, new Buffer('hello'), 5) + const record = new Record(new Buffer('hello'), new Buffer('world'), peers[0].id) + msg.record = record + + handler(dht)(peers[1], msg, (err, response) => { + expect(err).to.not.exist() + expect(response).to.be.eql(msg) + + const key = utils.bufferToKey(new Buffer('hello')) + dht.datastore.get(key, (err, res) => { + expect(err).to.not.exist() + const rec = Record.deserialize(res) + + expect(rec).to.have.property('key').eql(new Buffer('hello')) + + // make sure some time has passed + setTimeout(() => { + expect(rec.timeReceived < new Date()).to.be.eql(true) + done() + }, 10) + }) + }) + }) +}) diff --git a/test/rpc/index.spec.js b/test/rpc/index.spec.js new file mode 100644 index 00000000..0661c3b9 --- /dev/null +++ b/test/rpc/index.spec.js @@ -0,0 +1,69 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const pull = require('pull-stream') +const lp = require('pull-length-prefixed') +const Connection = require('interface-connection').Connection +const Libp2p = require('libp2p-ipfs-nodejs') + +const Message = require('../../src/message') +const Dht = require('../../src') +const rpc = require('../../src/rpc') + +const makePeers = require('../util').makePeers + +describe('rpc', () => { + let infos + + before((done) => { + makePeers(2, (err, peers) => { + if (err) { + return done(err) + } + + infos = peers + done() + }) + }) + + describe('protocolHandler', () => { + it('calls back with the response', (done) => { + const libp2p = new Libp2p(infos[0]) + const dht = new Dht(libp2p) + dht.peerBook.put(infos[1]) + + const msg = new Message(Message.TYPES.GET_VALUE, new Buffer('hello'), 5) + + const conn = makeConnection(msg, infos[1], (err, res) => { + expect(err).to.not.exist() + expect(res).to.have.length(1) + const msg = Message.deserialize(res[0]) + expect(msg).to.have.property('key').eql(new Buffer('hello')) + expect(msg).to.have.property('closerPeers').eql([]) + + done() + }) + + rpc(dht)('protocol', conn) + }) + }) +}) + +function makeConnection (msg, info, callback) { + const rawConn = { + source: pull( + pull.values([msg.serialize()]), + lp.encode() + ), + sink: pull( + lp.decode(), + pull.collect(callback) + ) + } + const conn = new Connection(rawConn) + conn.setPeerInfo(info) + return conn +} diff --git a/test/util.js b/test/util.js new file mode 100644 index 00000000..bf55af52 --- /dev/null +++ b/test/util.js @@ -0,0 +1,90 @@ +'use strict' + +const times = require('async/times') +const each = require('async/each') +const series = require('async/series') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const leftPad = require('left-pad') +const setImmediate = require('async/setImmediate') +const MemoryDatastore = require('interface-datastore').MemoryDatastore +const Libp2p = require('libp2p-ipfs-nodejs') +const multihashing = require('multihashing-async') +const crypto = require('libp2p-crypto') +const CID = require('cids') +const waterfall = require('async/waterfall') + +const KadDHT = require('../src') + +exports.makePeers = (n, callback) => { + times(n, (i, cb) => { + PeerId.create({bits: 1024}, cb) + }, (err, ids) => { + if (err) { + return callback(err) + } + callback(null, ids.map((i) => new PeerInfo(i))) + }) +} + +let nodes = [] +let i = 0 + +exports.setupDHT = (callback) => { + exports.makePeers(1, (err, peers) => { + if (err) { + return callback(err) + } + + const p = peers[0] + p.multiaddrs.add(`ip4/127.0.0.1/tcp/9${leftPad(i++, 3, 0)}`) + + const libp2p = new Libp2p(p, undefined, { mdns: false }) + const dht = new KadDHT(libp2p, 20, new MemoryDatastore()) + + dht.validators.v = { + func (key, publicKey, callback) { + setImmediate(callback) + }, + sign: false + } + + dht.selectors.v = (k, records) => 0 + + series([ + (cb) => libp2p.start(cb), + (cb) => dht.start(cb) + ], (err) => { + if (err) { + return callback(err) + } + nodes.push(dht) + callback(null, dht) + }) + }) +} + +exports.teardown = (callback) => { + each(nodes, (n, cb) => { + series([ + (cb) => n.stop(cb), + (cb) => n.libp2p.stop(cb) + ], cb) + }, (err) => { + // ignoring error, just shut it down + nodes = [] + i = 0 + callback(err) + }) +} + +exports.makeValues = (n, callback) => { + times(n, (i, cb) => { + const bytes = crypto.randomBytes(32) + + waterfall([ + (cb) => multihashing(bytes, 'sha2-256', cb), + (h, cb) => cb(null, {cid: new CID(h), value: bytes}) + ], cb) + }, callback) +} diff --git a/test/utils.spec.js b/test/utils.spec.js new file mode 100644 index 00000000..272a3081 --- /dev/null +++ b/test/utils.spec.js @@ -0,0 +1,138 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const base32 = require('base32.js') +const PeerId = require('peer-id') +const distance = require('xor-distance') +const waterfall = require('async/waterfall') + +const utils = require('../src/utils') +const makePeers = require('./util').makePeers + +describe('utils', () => { + describe('bufferToKey', () => { + it('returns the base32 encoded key of the buffer', () => { + const buf = new Buffer('hello world') + + const key = utils.bufferToKey(buf) + + const enc = new base32.Encoder() + expect( + key.toString() + ).to.be.eql( + '/' + enc.write(buf).finalize() + ) + }) + }) + + describe('convertBuffer', () => { + it('returns the sha2-256 hash of the buffer', (done) => { + const buf = new Buffer('hello world') + + utils.convertBuffer(buf, (err, digest) => { + expect(err).to.not.exist() + + expect( + digest + ).to.be.eql( + new Buffer('b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9', 'hex') + ) + done() + }) + }) + }) + + describe('sortClosestPeers', () => { + it('sorts a list of PeerInfos', (done) => { + const rawIds = [ + '11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31', + '11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32', + '11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33', + '11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34' + ] + + const ids = rawIds.map((raw) => { + return new PeerId(new Buffer(raw)) + }) + + const input = [ + ids[2], + ids[1], + ids[3], + ids[0] + ] + + waterfall([ + (cb) => utils.convertPeerId(ids[0], cb), + (id, cb) => utils.sortClosestPeers(input, id, cb), + (out, cb) => { + expect( + out.map((m) => m.toB58String()) + ).to.be.eql([ + ids[0], + ids[3], + ids[2], + ids[1] + ].map((m) => m.toB58String())) + done() + } + ], done) + }) + }) + + describe('xorCompare', () => { + it('sorts two distances', () => { + const target = new Buffer('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a90') + const a = { + distance: distance(new Buffer('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a95'), target) + } + const b = { + distance: distance(new Buffer('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a96'), target) + } + + expect(utils.xorCompare(a, b)).to.eql(-1) + expect(utils.xorCompare(b, a)).to.eql(1) + expect(utils.xorCompare(a, a)).to.eql(0) + }) + }) + + describe('keyForPublicKey', () => { + it('works', (done) => { + makePeers(1, (err, peers) => { + expect(err).to.not.exist() + + expect( + utils.keyForPublicKey(peers[0].id) + ).to.be.eql(Buffer.concat([ + new Buffer('/pk/'), + peers[0].id.id + ])) + done() + }) + }) + }) + + describe('fromPublicKeyKey', () => { + it('round trips', (done) => { + makePeers(50, (err, peers) => { + expect(err).to.not.exist() + + peers.forEach((p, i) => { + const id = p.id + expect( + utils.isPublicKeyKey(utils.keyForPublicKey(id)) + ).to.eql(true) + expect( + utils.fromPublicKeyKey(utils.keyForPublicKey(id)).id + ).to.eql( + id.id + ) + }) + done() + }) + }) + }) +})