From 8cc03cadd54dac7c10170fe06cd4daf81f21befb Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 4 Dec 2020 14:53:05 +0100 Subject: [PATCH 1/3] chore: update pubsub example by disabled emit self (#823) --- examples/pubsub/1.js | 1 + examples/pubsub/README.md | 19 ++++++++++++++----- examples/pubsub/message-filtering/1.js | 1 + examples/pubsub/message-filtering/README.md | 3 --- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/examples/pubsub/1.js b/examples/pubsub/1.js index 8c6fdfdb91..419838960b 100644 --- a/examples/pubsub/1.js +++ b/examples/pubsub/1.js @@ -43,6 +43,7 @@ const createNode = async () => { }) await node1.pubsub.subscribe(topic) + // Will not receive own published messages by default node2.pubsub.on(topic, (msg) => { console.log(`node2 received: ${uint8ArrayToString(msg.data)}`) }) diff --git a/examples/pubsub/README.md b/examples/pubsub/README.md index 17a896f3a6..2016b2c813 100644 --- a/examples/pubsub/README.md +++ b/examples/pubsub/README.md @@ -44,7 +44,6 @@ const node2 = nodes[1] // Add node's 2 data to the PeerStore node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs) - await node1.dial(node2.peerId) node1.pubsub.on(topic, (msg) => { @@ -52,6 +51,7 @@ node1.pubsub.on(topic, (msg) => { }) await node1.pubsub.subscribe(topic) +// Will not receive own published messages by default node2.pubsub.on(topic, (msg) => { console.log(`node2 received: ${uint8ArrayToString(msg.data)}`) }) @@ -68,25 +68,34 @@ The output of the program should look like: ``` > node 1.js connected to QmWpvkKm6qHLhoxpWrTswY6UMNWDyn8hN265Qp9ZYvgS82 -node2 received: Bird bird bird, bird is the word! node1 received: Bird bird bird, bird is the word! -node2 received: Bird bird bird, bird is the word! node1 received: Bird bird bird, bird is the word! ``` -You can change the pubsub `emitSelf` option if you don't want the publishing node to receive its own messages. +You can change the pubsub `emitSelf` option if you want the publishing node to receive its own messages. ```JavaScript const defaults = { config: { pubsub: { enabled: true, - emitSelf: false + emitSelf: true } } } ``` +The output of the program should look like: + +``` +> node 1.js +connected to QmWpvkKm6qHLhoxpWrTswY6UMNWDyn8hN265Qp9ZYvgS82 +node1 received: Bird bird bird, bird is the word! +node2 received: Bird bird bird, bird is the word! +node1 received: Bird bird bird, bird is the word! +node2 received: Bird bird bird, bird is the word! +``` + ## 2. Future work libp2p/IPFS PubSub is enabling a whole set of Distributed Real Time applications using CRDT (Conflict-Free Replicated Data Types). It is still going through heavy research (and hacking) and we invite you to join the conversation at [research-CRDT](https://github.com/ipfs/research-CRDT). Here is a list of some of the exciting examples: diff --git a/examples/pubsub/message-filtering/1.js b/examples/pubsub/message-filtering/1.js index 85d7bcf8c4..4d8a2c1803 100644 --- a/examples/pubsub/message-filtering/1.js +++ b/examples/pubsub/message-filtering/1.js @@ -44,6 +44,7 @@ const createNode = async () => { //subscribe node1.pubsub.on(topic, (msg) => { + // Will not receive own published messages by default console.log(`node1 received: ${uint8ArrayToString(msg.data)}`) }) await node1.pubsub.subscribe(topic) diff --git a/examples/pubsub/message-filtering/README.md b/examples/pubsub/message-filtering/README.md index a9c0dad26d..df99043051 100644 --- a/examples/pubsub/message-filtering/README.md +++ b/examples/pubsub/message-filtering/README.md @@ -97,15 +97,12 @@ Result ``` > node 1.js ############## fruit banana ############## -node1 received: banana node2 received: banana node3 received: banana ############## fruit apple ############## -node1 received: apple node2 received: apple node3 received: apple ############## fruit car ############## -node1 received: car ############## fruit orange ############## node1 received: orange node2 received: orange From a044b5d0806a0064945bfafe059146b323722abc Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 9 Dec 2020 16:27:42 +0100 Subject: [PATCH 2/3] chore: auto relay configuration example with noise (#828) --- doc/CONFIGURATION.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md index e31eb870af..ff143ce74c 100644 --- a/doc/CONFIGURATION.md +++ b/doc/CONFIGURATION.md @@ -445,13 +445,13 @@ const node = await Libp2p.create({ const Libp2p = require('libp2p') const TCP = require('libp2p-tcp') const MPLEX = require('libp2p-mplex') -const SECIO = require('libp2p-secio') +const { NOISE } = require('libp2p-noise') const node = await Libp2p.create({ modules: { transport: [TCP], streamMuxer: [MPLEX], - connEncryption: [SECIO] + connEncryption: [NOISE] }, config: { relay: { // Circuit Relay options (this config is part of libp2p core configurations) From a279926a831fa6d37b883bd6ef028f913fceaf1f Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 10 Dec 2020 14:48:14 +0100 Subject: [PATCH 3/3] chore: add typedefs (#802) --- .github/workflows/main.yml | 67 ++++++ .travis.yml | 58 ----- package.json | 15 +- src/address-manager/index.js | 28 ++- src/circuit/auto-relay.js | 29 ++- src/circuit/circuit/hop.js | 55 ++++- src/circuit/circuit/stop.js | 28 ++- src/circuit/circuit/stream-handler.js | 25 +- src/circuit/circuit/utils.js | 7 +- src/circuit/index.js | 22 +- src/circuit/listener.js | 52 ++-- src/circuit/protocol/index.js | 2 + src/circuit/transport.js | 62 +++-- src/circuit/utils.js | 2 + src/connection-manager/index.js | 81 ++++--- src/connection-manager/latency-monitor.js | 19 +- .../visibility-change-emitter.js | 12 +- src/content-routing.js | 223 ++++++++++-------- src/dialer/dial-request.js | 39 +-- src/dialer/index.js | 79 ++++--- src/get-peer.js | 6 +- src/identify/consts.js | 1 + src/identify/index.js | 101 ++++---- src/index.js | 147 ++++++++---- src/insecure/plaintext.js | 18 +- src/keychain/cms.js | 7 +- src/keychain/index.js | 4 +- src/keychain/util.js | 1 + src/metrics/index.js | 37 ++- src/metrics/old-peers.js | 3 +- src/metrics/stats.js | 25 +- src/peer-routing.js | 12 +- src/peer-store/address-book.js | 118 +++++---- src/peer-store/book.js | 22 +- src/peer-store/index.js | 21 +- src/peer-store/key-book.js | 20 +- src/peer-store/metadata-book.js | 24 +- src/peer-store/persistent/index.js | 23 +- src/peer-store/proto-book.js | 24 +- src/ping/index.js | 21 +- src/pnet/crypto.js | 9 +- src/pnet/index.js | 22 +- src/pnet/key-generator.js | 2 + src/pubsub-adapter.js | 78 +++--- src/record/envelope/envelope.proto.js | 7 +- src/record/envelope/index.js | 24 +- src/record/peer-record/index.js | 32 ++- src/record/peer-record/peer-record.proto.js | 7 +- src/record/utils.js | 6 +- src/registrar.js | 22 +- src/transport-manager.js | 36 ++- src/types.ts | 84 +++++++ src/upgrader.js | 85 +++---- test/dialing/dial-request.spec.js | 2 +- test/identify/index.spec.js | 3 +- test/record/envelope.spec.js | 6 +- test/upgrading/upgrader.spec.js | 22 -- tsconfig.json | 9 + 58 files changed, 1238 insertions(+), 758 deletions(-) create mode 100644 .github/workflows/main.yml delete mode 100644 .travis.yml create mode 100644 src/types.ts create mode 100644 tsconfig.json diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000000..ade1dcbcc0 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,67 @@ +name: ci +on: + push: + branches: + - master + pull_request: + branches: + - '**' + +jobs: + check: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - run: yarn + - run: yarn lint + - uses: gozala/typescript-error-reporter-action@v1.0.8 + - run: yarn build + - run: yarn aegir dep-check + - uses: ipfs/aegir/actions/bundle-size@master + name: size + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + test-node: + needs: check + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [windows-latest, ubuntu-latest, macos-latest] + node: [12, 14] + fail-fast: true + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v1 + with: + node-version: ${{ matrix.node }} + - run: yarn + - run: npx nyc --reporter=lcov aegir test -t node -- --bail + - uses: codecov/codecov-action@v1 + test-chrome: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - run: yarn + - run: npx aegir test -t browser -t webworker --bail + test-firefox: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - run: yarn + - run: npx aegir test -t browser -t webworker --bail -- --browsers FirefoxHeadless + test-interop: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - run: yarn + - run: cd node_modules/interop-libp2p && yarn && LIBP2P_JS=${GITHUB_WORKSPACE}/src/index.js npx aegir test -t node --bail + test-auto-relay-example: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - run: yarn + - run: cd examples && yarn && npm run test -- auto-relay diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 47f1fcb07a..0000000000 --- a/.travis.yml +++ /dev/null @@ -1,58 +0,0 @@ -language: node_js -cache: npm -stages: - - check - - test - - cov - -node_js: - - 'lts/*' - - '14' - -os: - - linux - - osx - -script: npx nyc -s npm run test:node -- --bail -after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov - -jobs: - include: - - stage: check - script: - - npx aegir build --bundlesize - # Remove pull libs once ping is async - - npx aegir dep-check -- -i pull-handshake -i pull-stream - - npm run lint - - - stage: test - name: chrome - addons: - chrome: stable - script: - - npx aegir test -t browser -t webworker - - - stage: test - name: firefox - addons: - firefox: latest - script: - - npx aegir test -t browser -t webworker -- --browsers FirefoxHeadless - - - stage: test - name: interop - script: - - cd node_modules/interop-libp2p - - npm install - - LIBP2P_JS=${TRAVIS_BUILD_DIR}/src/index.js npx aegir test -t node --bail - - - stage: test - if: type = pull_request - name: example - auto-relay - script: - - cd examples - - npm install - - npm run test -- auto-relay - -notifications: - email: false \ No newline at end of file diff --git a/package.json b/package.json index 34dcc4d260..2516b12ff6 100644 --- a/package.json +++ b/package.json @@ -4,6 +4,15 @@ "description": "JavaScript implementation of libp2p, a modular peer to peer network stack", "leadMaintainer": "Jacob Heun ", "main": "src/index.js", + "types": "dist/src/index.d.ts", + "typesVersions": { + "*": { + "src/*": [ + "dist/src/*", + "dist/src/*/index" + ] + } + }, "files": [ "dist", "src" @@ -53,7 +62,7 @@ "events": "^3.1.0", "hashlru": "^2.3.0", "interface-datastore": "^2.0.0", - "ipfs-utils": "^2.2.0", + "ipfs-utils": "^5.0.1", "it-all": "^1.0.1", "it-buffer": "^0.1.2", "it-handshake": "^1.0.1", @@ -61,7 +70,7 @@ "it-pipe": "^1.1.0", "it-protocol-buffers": "^0.2.0", "libp2p-crypto": "^0.18.0", - "libp2p-interfaces": "^0.7.2", + "libp2p-interfaces": "^0.8.0", "libp2p-utils": "^0.2.2", "mafmt": "^8.0.0", "merge-options": "^2.0.0", @@ -88,7 +97,7 @@ "devDependencies": { "@nodeutils/defaults-deep": "^1.1.0", "abortable-iterator": "^3.0.0", - "aegir": "^27.0.0", + "aegir": "^29.2.0", "chai-bytes": "^0.1.2", "chai-string": "^1.5.0", "delay": "^4.3.0", diff --git a/src/address-manager/index.js b/src/address-manager/index.js index 314f0a1ae6..5c9874af33 100644 --- a/src/address-manager/index.js +++ b/src/address-manager/index.js @@ -1,23 +1,25 @@ 'use strict' -const debug = require('debug') -const log = debug('libp2p:addresses') -log.error = debug('libp2p:addresses:error') - const multiaddr = require('multiaddr') /** - * Responsible for managing the peer addresses. - * Peers can specify their listen and announce addresses. - * The listen addresses will be used by the libp2p transports to listen for new connections, - * while the announce addresses will be used for the peer addresses' to other peers in the network. + * @typedef {import('multiaddr')} Multiaddr + */ + +/** + * @typedef {Object} AddressManagerOptions + * @property {string[]} [listen = []] - list of multiaddrs string representation to listen. + * @property {string[]} [announce = []] - list of multiaddrs string representation to announce. */ class AddressManager { /** + * Responsible for managing the peer addresses. + * Peers can specify their listen and announce addresses. + * The listen addresses will be used by the libp2p transports to listen for new connections, + * while the announce addresses will be used for the peer addresses' to other peers in the network. + * * @class - * @param {object} [options] - * @param {Array} [options.listen = []] - list of multiaddrs string representation to listen. - * @param {Array} [options.announce = []] - list of multiaddrs string representation to announce. + * @param {AddressManagerOptions} [options] */ constructor ({ listen = [], announce = [] } = {}) { this.listen = new Set(listen) @@ -27,7 +29,7 @@ class AddressManager { /** * Get peer listen multiaddrs. * - * @returns {Array} + * @returns {Multiaddr[]} */ getListenAddrs () { return Array.from(this.listen).map((a) => multiaddr(a)) @@ -36,7 +38,7 @@ class AddressManager { /** * Get peer announcing multiaddrs. * - * @returns {Array} + * @returns {Multiaddr[]} */ getAnnounceAddrs () { return Array.from(this.announce).map((a) => multiaddr(a)) diff --git a/src/circuit/auto-relay.js b/src/circuit/auto-relay.js index 6b9a40392d..122ac979fb 100644 --- a/src/circuit/auto-relay.js +++ b/src/circuit/auto-relay.js @@ -1,8 +1,9 @@ 'use strict' const debug = require('debug') -const log = debug('libp2p:auto-relay') -log.error = debug('libp2p:auto-relay:error') +const log = Object.assign(debug('libp2p:auto-relay'), { + error: debug('libp2p:auto-relay:err') +}) const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayToString = require('uint8arrays/to-string') @@ -19,14 +20,25 @@ const { RELAY_RENDEZVOUS_NS } = require('./constants') +/** + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('../peer-store/address-book').Address} Address + */ + +/** + * @typedef {Object} AutoRelayProperties + * @property {import('../')} libp2p + * + * @typedef {Object} AutoRelayOptions + * @property {number} [maxListeners = 1] - maximum number of relays to listen. + */ + class AutoRelay { /** * Creates an instance of AutoRelay. * * @class - * @param {object} props - * @param {Libp2p} props.libp2p - * @param {number} [props.maxListeners = 1] - maximum number of relays to listen. + * @param {AutoRelayProperties & AutoRelayOptions} props */ constructor ({ libp2p, maxListeners = 1 }) { this._libp2p = libp2p @@ -58,7 +70,7 @@ class AutoRelay { * * @param {Object} props * @param {PeerId} props.peerId - * @param {Array} props.protocols + * @param {string[]} props.protocols * @returns {Promise} */ async _onProtocolChange ({ peerId, protocols }) { @@ -78,6 +90,9 @@ class AutoRelay { // If protocol, check if can hop, store info in the metadataBook and listen on it try { const connection = this._connectionManager.get(peerId) + if (!connection) { + return + } // Do not hop on a relayed connection if (connection.remoteAddr.protoCodes().includes(CIRCUIT_PROTO_CODE)) { @@ -171,7 +186,7 @@ class AutoRelay { * 2. Dial and try to listen on the peers we know that support hop but are not connected. * 3. Search the network. * - * @param {Array} [peersToIgnore] + * @param {string[]} [peersToIgnore] * @returns {Promise} */ async _listenOnAvailableHopRelays (peersToIgnore = []) { diff --git a/src/circuit/circuit/hop.js b/src/circuit/circuit/hop.js index c653a7c9ae..8e23e6a894 100644 --- a/src/circuit/circuit/hop.js +++ b/src/circuit/circuit/hop.js @@ -1,22 +1,42 @@ 'use strict' const debug = require('debug') -const log = debug('libp2p:circuit:hop') -log.error = debug('libp2p:circuit:hop:error') +const log = Object.assign(debug('libp2p:circuit:hop'), { + error: debug('libp2p:circuit:hop:err') +}) +const errCode = require('err-code') const PeerId = require('peer-id') const { validateAddrs } = require('./utils') const StreamHandler = require('./stream-handler') const { CircuitRelay: CircuitPB } = require('../protocol') -const pipe = require('it-pipe') -const errCode = require('err-code') +const { pipe } = require('it-pipe') const { codes: Errors } = require('../../errors') const { stop } = require('./stop') const multicodec = require('./../multicodec') -module.exports.handleHop = async function handleHop ({ +/** + * @typedef {import('../../types').CircuitRequest} CircuitRequest + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('./stream-handler')} StreamHandlerT + * @typedef {import('../transport')} Transport + */ + +/** + * @typedef {Object} HopRequest + * @property {Connection} connection + * @property {CircuitRequest} request + * @property {StreamHandlerT} streamHandler + * @property {Transport} circuit + */ + +/** + * @param {HopRequest} options + * @returns {Promise} + */ +async function handleHop ({ connection, request, streamHandler, @@ -51,6 +71,9 @@ module.exports.handleHop = async function handleHop ({ } // TODO: Handle being an active relay + if (!destinationConnection) { + return + } // Handle the incoming HOP request by performing a STOP request const stopRequest = { @@ -63,8 +86,7 @@ module.exports.handleHop = async function handleHop ({ try { destinationStream = await stop({ connection: destinationConnection, - request: stopRequest, - circuit + request: stopRequest }) } catch (err) { return log.error(err) @@ -91,10 +113,10 @@ module.exports.handleHop = async function handleHop ({ * * @param {object} options * @param {Connection} options.connection - Connection to the relay - * @param {*} options.request + * @param {CircuitRequest} options.request * @returns {Promise} */ -module.exports.hop = async function hop ({ +async function hop ({ connection, request }) { @@ -123,7 +145,7 @@ module.exports.hop = async function hop ({ * @param {Connection} options.connection - Connection to the relay * @returns {Promise} */ -module.exports.canHop = async function canHop ({ +async function canHop ({ connection }) { // Create a new stream to the relay @@ -149,11 +171,11 @@ module.exports.canHop = async function canHop ({ * * @param {Object} options * @param {Connection} options.connection - * @param {StreamHandler} options.streamHandler - * @param {Circuit} options.circuit + * @param {StreamHandlerT} options.streamHandler + * @param {Transport} options.circuit * @private */ -module.exports.handleCanHop = function handleCanHop ({ +function handleCanHop ({ connection, streamHandler, circuit @@ -165,3 +187,10 @@ module.exports.handleCanHop = function handleCanHop ({ code: canHop ? CircuitPB.Status.SUCCESS : CircuitPB.Status.HOP_CANT_SPEAK_RELAY }) } + +module.exports = { + handleHop, + hop, + canHop, + handleCanHop +} diff --git a/src/circuit/circuit/stop.js b/src/circuit/circuit/stop.js index 77eaa1fcc2..111b811dc2 100644 --- a/src/circuit/circuit/stop.js +++ b/src/circuit/circuit/stop.js @@ -1,23 +1,31 @@ 'use strict' +const debug = require('debug') +const log = Object.assign(debug('libp2p:circuit:stop'), { + error: debug('libp2p:circuit:stop:err') +}) + const { CircuitRelay: CircuitPB } = require('../protocol') const multicodec = require('../multicodec') const StreamHandler = require('./stream-handler') const { validateAddrs } = require('./utils') -const debug = require('debug') -const log = debug('libp2p:circuit:stop') -log.error = debug('libp2p:circuit:stop:error') +/** + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + * @typedef {import('../../types').CircuitRequest} CircuitRequest + * @typedef {import('./stream-handler')} StreamHandlerT + */ /** * Handles incoming STOP requests * * @private - * @param {*} options + * @param {Object} options * @param {Connection} options.connection - * @param {*} options.request - The CircuitRelay protobuf request (unencoded) - * @param {StreamHandler} options.streamHandler - * @returns {Promise<*>} Resolves a duplex iterable + * @param {CircuitRequest} options.request - The CircuitRelay protobuf request (unencoded) + * @param {StreamHandlerT} options.streamHandler + * @returns {Promise|void} Resolves a duplex iterable */ module.exports.handleStop = function handleStop ({ connection, @@ -44,10 +52,10 @@ module.exports.handleStop = function handleStop ({ * Creates a STOP request * * @private - * @param {*} options + * @param {Object} options * @param {Connection} options.connection - * @param {*} options.request - The CircuitRelay protobuf request (unencoded) - * @returns {Promise<*>} Resolves a duplex iterable + * @param {CircuitRequest} options.request - The CircuitRelay protobuf request (unencoded) + * @returns {Promise} Resolves a duplex iterable */ module.exports.stop = async function stop ({ connection, diff --git a/src/circuit/circuit/stream-handler.js b/src/circuit/circuit/stream-handler.js index 8b8ecf89bc..5be2c6edf5 100644 --- a/src/circuit/circuit/stream-handler.js +++ b/src/circuit/circuit/stream-handler.js @@ -1,20 +1,29 @@ 'use strict' +const debug = require('debug') +const log = Object.assign(debug('libp2p:circuit:stream-handler'), { + error: debug('libp2p:circuit:stream-handler:err') +}) + const lp = require('it-length-prefixed') const handshake = require('it-handshake') const { CircuitRelay: CircuitPB } = require('../protocol') -const debug = require('debug') -const log = debug('libp2p:circuit:stream-handler') -log.error = debug('libp2p:circuit:stream-handler:error') +/** + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + */ +/** + * @template T + */ class StreamHandler { /** * Create a stream handler for connection * + * @class * @param {object} options - * @param {*} options.stream - A duplex iterable - * @param {number} options.maxLength - max bytes length of message + * @param {MuxedStream} options.stream - A duplex iterable + * @param {number} [options.maxLength = 4096] - max bytes length of message */ constructor ({ stream, maxLength = 4096 }) { this.stream = stream @@ -27,7 +36,7 @@ class StreamHandler { * Read and decode message * * @async - * @returns {void} + * @returns {Promise} */ async read () { const msg = await this.decoder.next() @@ -45,10 +54,12 @@ class StreamHandler { /** * Encode and write array of buffers * - * @param {*} msg - An unencoded CircuitRelay protobuf message + * @param {CircuitPB} msg - An unencoded CircuitRelay protobuf message + * @returns {void} */ write (msg) { log('write message type %s', msg.type) + // @ts-ignore lp.encode expects type type 'Buffer | BufferList', not 'Uint8Array' this.shake.write(lp.encode.single(CircuitPB.encode(msg))) } diff --git a/src/circuit/circuit/utils.js b/src/circuit/circuit/utils.js index be7ab35a73..65c5afe47d 100644 --- a/src/circuit/circuit/utils.js +++ b/src/circuit/circuit/utils.js @@ -3,11 +3,16 @@ const multiaddr = require('multiaddr') const { CircuitRelay } = require('../protocol') +/** + * @typedef {import('./stream-handler')} StreamHandler + * @typedef {import('../../types').CircuitStatus} CircuitStatus + */ + /** * Write a response * * @param {StreamHandler} streamHandler - * @param {CircuitRelay.Status} status + * @param {CircuitStatus} status */ function writeResponse (streamHandler, status) { streamHandler.write({ diff --git a/src/circuit/index.js b/src/circuit/index.js index 9e556f7f18..447d829ac5 100644 --- a/src/circuit/index.js +++ b/src/circuit/index.js @@ -1,8 +1,9 @@ 'use strict' const debug = require('debug') -const log = debug('libp2p:relay') -log.error = debug('libp2p:relay:error') +const log = Object.assign(debug('libp2p:relay'), { + error: debug('libp2p:relay:err') +}) const { setDelayedInterval, @@ -17,6 +18,23 @@ const { RELAY_RENDEZVOUS_NS } = require('./constants') +/** + * @typedef {import('../')} Libp2p + * + * @typedef {Object} RelayAdvertiseOptions + * @property {number} [bootDelay = ADVERTISE_BOOT_DELAY] + * @property {boolean} [enabled = true] + * @property {number} [ttl = ADVERTISE_TTL] + * + * @typedef {Object} HopOptions + * @property {boolean} [enabled = false] + * @property {boolean} [active = false] + * + * @typedef {Object} AutoRelayOptions + * @property {number} [maxListeners = 2] - maximum number of relays to listen. + * @property {boolean} [enabled = false] + */ + class Relay { /** * Creates an instance of Relay. diff --git a/src/circuit/listener.js b/src/circuit/listener.js index 02e371fb8b..d19cca5e46 100644 --- a/src/circuit/listener.js +++ b/src/circuit/listener.js @@ -1,37 +1,27 @@ 'use strict' -const EventEmitter = require('events') +const { EventEmitter } = require('events') const multiaddr = require('multiaddr') -const debug = require('debug') -const log = debug('libp2p:circuit:listener') -log.err = debug('libp2p:circuit:error:listener') +/** + * @typedef {import('multiaddr')} Multiaddr + * @typedef {import('libp2p-interfaces/src/transport/types').Listener} Listener + */ /** - * @param {Libp2p} libp2p + * @param {import('../')} libp2p * @returns {Listener} a transport listener */ module.exports = (libp2p) => { - const listener = new EventEmitter() const listeningAddrs = new Map() - // Remove listeningAddrs when a peer disconnects - libp2p.connectionManager.on('peer:disconnect', (connection) => { - const deleted = listeningAddrs.delete(connection.remotePeer.toB58String()) - - if (deleted) { - // Announce listen addresses change - listener.emit('close') - } - }) - /** * Add swarm handler and listen for incoming connections * * @param {Multiaddr} addr - * @returns {void} + * @returns {Promise} */ - listener.listen = async (addr) => { + async function listen (addr) { const addrString = String(addr).split('/p2p-circuit').find(a => a !== '') const relayConn = await libp2p.dial(multiaddr(addrString)) @@ -41,13 +31,6 @@ module.exports = (libp2p) => { listener.emit('listening') } - /** - * TODO: Remove the peers from our topology - * - * @returns {void} - */ - listener.close = () => {} - /** * Get fixed up multiaddrs * @@ -64,7 +47,7 @@ module.exports = (libp2p) => { * * @returns {Multiaddr[]} */ - listener.getAddrs = () => { + function getAddrs () { const addrs = [] for (const addr of listeningAddrs.values()) { addrs.push(addr) @@ -72,5 +55,22 @@ module.exports = (libp2p) => { return addrs } + /** @type Listener */ + const listener = Object.assign(new EventEmitter(), { + close: () => Promise.resolve(), + listen, + getAddrs + }) + + // Remove listeningAddrs when a peer disconnects + libp2p.connectionManager.on('peer:disconnect', (connection) => { + const deleted = listeningAddrs.delete(connection.remotePeer.toB58String()) + + if (deleted) { + // Announce listen addresses change + listener.emit('close') + } + }) + return listener } diff --git a/src/circuit/protocol/index.js b/src/circuit/protocol/index.js index f217cb4262..a9d3e31a6f 100644 --- a/src/circuit/protocol/index.js +++ b/src/circuit/protocol/index.js @@ -1,5 +1,7 @@ 'use strict' const protobuf = require('protons') + +/** @type {{CircuitRelay: import('../../types').CircuitMessageProto}} */ module.exports = protobuf(` message CircuitRelay { diff --git a/src/circuit/transport.js b/src/circuit/transport.js index cc79870564..fc2ddad4f0 100644 --- a/src/circuit/transport.js +++ b/src/circuit/transport.js @@ -1,13 +1,13 @@ 'use strict' const debug = require('debug') -const log = debug('libp2p:circuit') -log.error = debug('libp2p:circuit:error') +const log = Object.assign(debug('libp2p:circuit'), { + error: debug('libp2p:circuit:err') +}) const mafmt = require('mafmt') const multiaddr = require('multiaddr') const PeerId = require('peer-id') -const withIs = require('class-is') const { CircuitRelay: CircuitPB } = require('./protocol') const toConnection = require('libp2p-utils/src/stream-to-ma-conn') @@ -18,14 +18,23 @@ const { handleCanHop, handleHop, hop } = require('./circuit/hop') const { handleStop } = require('./circuit/stop') const StreamHandler = require('./circuit/stream-handler') +const transportSymbol = Symbol.for('@libp2p/js-libp2p-circuit/circuit') + +/** + * @typedef {import('multiaddr')} Multiaddr + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + * @typedef {import('../types').CircuitRequest} CircuitRequest + */ + class Circuit { /** * Creates an instance of the Circuit Transport. * * @class * @param {object} options - * @param {Libp2p} options.libp2p - * @param {Upgrader} options.upgrader + * @param {import('../')} options.libp2p + * @param {import('../upgrader')} options.upgrader */ constructor ({ libp2p, upgrader }) { this._dialer = libp2p.dialer @@ -39,7 +48,13 @@ class Circuit { this._registrar.handle(multicodec, this._onProtocol.bind(this)) } + /** + * @param {Object} props + * @param {Connection} props.connection + * @param {MuxedStream} props.stream + */ async _onProtocol ({ connection, stream }) { + /** @type {import('./circuit/stream-handler')} */ const streamHandler = new StreamHandler({ stream }) const request = await streamHandler.read() @@ -71,8 +86,7 @@ class Circuit { virtualConnection = await handleStop({ connection, request, - streamHandler, - circuit + streamHandler }) break } @@ -89,7 +103,7 @@ class Circuit { remoteAddr, localAddr }) - const type = CircuitPB.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound' + const type = request.type === CircuitPB.Type.HOP ? 'relay' : 'inbound' log('new %s connection %s', type, maConn.remoteAddr) const conn = await this._upgrader.upgradeInbound(maConn) @@ -101,10 +115,10 @@ class Circuit { /** * Dial a peer over a relay * - * @param {multiaddr} ma - the multiaddr of the peer to dial + * @param {Multiaddr} ma - the multiaddr of the peer to dial * @param {Object} options - dial options * @param {AbortSignal} [options.signal] - An optional abort signal - * @returns {Connection} - the connection + * @returns {Promise} - the connection */ async dial (ma, options) { // Check the multiaddr to see if it contains a relay and a destination peer @@ -124,7 +138,6 @@ class Circuit { try { const virtualConnection = await hop({ connection: relayConnection, - circuit: this, request: { type: CircuitPB.Type.HOP, srcPeer: { @@ -159,7 +172,7 @@ class Circuit { * * @param {any} options * @param {Function} handler - * @returns {listener} + * @returns {import('libp2p-interfaces/src/transport/types').Listener} */ createListener (options, handler) { if (typeof options === 'function') { @@ -170,14 +183,14 @@ class Circuit { // Called on successful HOP and STOP requests this.handler = handler - return createListener(this._libp2p, options) + return createListener(this._libp2p) } /** * Filter check for all Multiaddrs that this transport can dial on * - * @param {Array} multiaddrs - * @returns {Array} + * @param {Multiaddr[]} multiaddrs + * @returns {Multiaddr[]} */ filter (multiaddrs) { multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] @@ -186,9 +199,20 @@ class Circuit { return mafmt.Circuit.matches(ma) }) } + + get [Symbol.toStringTag] () { + return 'Circuit' + } + + /** + * Checks if the given value is a Transport instance. + * + * @param {any} other + * @returns {other is Transport} + */ + static isTransport (other) { + return Boolean(other && other[transportSymbol]) + } } -/** - * @type {Circuit} - */ -module.exports = withIs(Circuit, { className: 'Circuit', symbolName: '@libp2p/js-libp2p-circuit/circuit' }) +module.exports = Circuit diff --git a/src/circuit/utils.js b/src/circuit/utils.js index 18b61eafbb..f75e13386a 100644 --- a/src/circuit/utils.js +++ b/src/circuit/utils.js @@ -3,6 +3,8 @@ const CID = require('cids') const multihashing = require('multihashing-async') +const TextEncoder = require('ipfs-utils/src/text-encoder') + /** * Convert a namespace string into a cid. * diff --git a/src/connection-manager/index.js b/src/connection-manager/index.js index 1b1d807cb5..4add4840e1 100644 --- a/src/connection-manager/index.js +++ b/src/connection-manager/index.js @@ -1,8 +1,9 @@ 'use strict' const debug = require('debug') -const log = debug('libp2p:connection-manager') -log.error = debug('libp2p:connection-manager:error') +const log = Object.assign(debug('libp2p:connection-manager'), { + error: debug('libp2p:connection-manager:err') +}) const errcode = require('err-code') const mergeOptions = require('merge-options') @@ -14,7 +15,7 @@ const { EventEmitter } = require('events') const PeerId = require('peer-id') const { - ERR_INVALID_PARAMETERS + codes: { ERR_INVALID_PARAMETERS } } = require('../errors') const defaultOptions = { @@ -31,29 +32,39 @@ const defaultOptions = { } /** - * Responsible for managing known connections. + * @typedef {import('../')} Libp2p + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + */ + +/** + * @typedef {Object} ConnectionManagerOptions + * @property {number} [maxConnections = Infinity] - The maximum number of connections allowed. + * @property {number} [minConnections = 0] - The minimum number of connections to avoid pruning. + * @property {number} [maxData = Infinity] - The max data (in and out), per average interval to allow. + * @property {number} [maxSentData = Infinity] - The max outgoing data, per average interval to allow. + * @property {number} [maxReceivedData = Infinity] - The max incoming data, per average interval to allow. + * @property {number} [maxEventLoopDelay = Infinity] - The upper limit the event loop can take to run. + * @property {number} [pollInterval = 2000] - How often, in milliseconds, metrics and latency should be checked. + * @property {number} [movingAverageInterval = 60000] - How often, in milliseconds, to compute averages. + * @property {number} [defaultPeerValue = 1] - The value of the peer. + * @property {boolean} [autoDial = true] - Should preemptively guarantee connections are above the low watermark. + * @property {number} [autoDialInterval = 10000] - How often, in milliseconds, it should preemptively guarantee connections are above the low watermark. + */ + +/** * * @fires ConnectionManager#peer:connect Emitted when a new peer is connected. * @fires ConnectionManager#peer:disconnect Emitted when a peer is disconnected. */ class ConnectionManager extends EventEmitter { /** + * Responsible for managing known connections. + * * @class * @param {Libp2p} libp2p - * @param {object} options - * @param {number} options.maxConnections - The maximum number of connections allowed. Default=Infinity - * @param {number} options.minConnections - The minimum number of connections to avoid pruning. Default=0 - * @param {number} options.maxData - The max data (in and out), per average interval to allow. Default=Infinity - * @param {number} options.maxSentData - The max outgoing data, per average interval to allow. Default=Infinity - * @param {number} options.maxReceivedData - The max incoming data, per average interval to allow.. Default=Infinity - * @param {number} options.maxEventLoopDelay - The upper limit the event loop can take to run. Default=Infinity - * @param {number} options.pollInterval - How often, in milliseconds, metrics and latency should be checked. Default=2000 - * @param {number} options.movingAverageInterval - How often, in milliseconds, to compute averages. Default=60000 - * @param {number} options.defaultPeerValue - The value of the peer. Default=1 - * @param {boolean} options.autoDial - Should preemptively guarantee connections are above the low watermark. Default=true - * @param {number} options.autoDialInterval - How often, in milliseconds, it should preemptively guarantee connections are above the low watermark. Default=10000 + * @param {ConnectionManagerOptions} options */ - constructor (libp2p, options) { + constructor (libp2p, options = {}) { super() this._libp2p = libp2p @@ -66,8 +77,6 @@ class ConnectionManager extends EventEmitter { log('options: %j', this._options) - this._libp2p = libp2p - /** * Map of peer identifiers to their peer value for pruning connections. * @@ -78,7 +87,7 @@ class ConnectionManager extends EventEmitter { /** * Map of connections per peer * - * @type {Map>} + * @type {Map} */ this.connections = new Map() @@ -159,15 +168,13 @@ class ConnectionManager extends EventEmitter { * * @param {PeerId} peerId * @param {number} value - A number between 0 and 1 + * @returns {void} */ setPeerValue (peerId, value) { if (value < 0 || value > 1) { throw new Error('value should be a number between 0 and 1') } - if (peerId.toB58String) { - peerId = peerId.toB58String() - } - this._peerValues.set(peerId, value) + this._peerValues.set(peerId.toB58String(), value) } /** @@ -177,21 +184,24 @@ class ConnectionManager extends EventEmitter { * @private */ _checkMetrics () { - const movingAverages = this._libp2p.metrics.global.movingAverages - const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage() - this._checkMaxLimit('maxReceivedData', received) - const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage() - this._checkMaxLimit('maxSentData', sent) - const total = received + sent - this._checkMaxLimit('maxData', total) - log('metrics update', total) - this._timer = retimer(this._checkMetrics, this._options.pollInterval) + if (this._libp2p.metrics) { + const movingAverages = this._libp2p.metrics.global.movingAverages + const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage() + this._checkMaxLimit('maxReceivedData', received) + const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage() + this._checkMaxLimit('maxSentData', sent) + const total = received + sent + this._checkMaxLimit('maxData', total) + log('metrics update', total) + this._timer = retimer(this._checkMetrics, this._options.pollInterval) + } } /** * Tracks the incoming connection and check the connection limit * * @param {Connection} connection + * @returns {void} */ onConnect (connection) { const peerId = connection.remotePeer @@ -218,6 +228,7 @@ class ConnectionManager extends EventEmitter { * Removes the connection from tracking * * @param {Connection} connection + * @returns {void} */ onDisconnect (connection) { const peerId = connection.remotePeer.toB58String() @@ -237,7 +248,7 @@ class ConnectionManager extends EventEmitter { * Get a connection with a peer. * * @param {PeerId} peerId - * @returns {Connection} + * @returns {Connection|null} */ get (peerId) { const connections = this.getAll(peerId) @@ -251,7 +262,7 @@ class ConnectionManager extends EventEmitter { * Get all open connections with a peer. * * @param {PeerId} peerId - * @returns {Array} + * @returns {Connection[]} */ getAll (peerId) { if (!PeerId.isPeerId(peerId)) { diff --git a/src/connection-manager/latency-monitor.js b/src/connection-manager/latency-monitor.js index db299890b4..c9301ee142 100644 --- a/src/connection-manager/latency-monitor.js +++ b/src/connection-manager/latency-monitor.js @@ -1,3 +1,4 @@ +// @ts-nocheck 'use strict' /** @@ -6,7 +7,7 @@ /* global window */ const globalThis = require('ipfs-utils/src/globalthis') -const EventEmitter = require('events') +const { EventEmitter } = require('events') const VisibilityChangeEmitter = require('./visibility-change-emitter') const debug = require('debug')('latency-monitor:LatencyMonitor') @@ -17,6 +18,12 @@ const debug = require('debug')('latency-monitor:LatencyMonitor') * @property {number} maxMS What was the max time for a cb to be called * @property {number} avgMs What was the average time for a cb to be called * @property {number} lengthMs How long this interval was in ms + * + * @typedef {Object} LatencyMonitorOptions + * @property {number} [latencyCheckIntervalMs=500] - How often to add a latency check event (ms) + * @property {number} [dataEmitIntervalMs=5000] - How often to summarize latency check events. null or 0 disables event firing + * @property {Function} [asyncTestFn] - What cb-style async function to use + * @property {number} [latencyRandomPercentage=5] - What percent (+/-) of latencyCheckIntervalMs should we randomly use? This helps avoid alignment to other events. */ /** @@ -24,6 +31,8 @@ const debug = require('debug')('latency-monitor:LatencyMonitor') * the asyncTestFn and timing how long it takes the callback to be called. It can also periodically emit stats about this. * This can be disabled and stats can be pulled via setting dataEmitIntervalMs = 0. * + * @extends {EventEmitter} + * * The default implementation is an event loop latency monitor. This works by firing periodic events into the event loop * and timing how long it takes to get back. * @@ -37,11 +46,8 @@ const debug = require('debug')('latency-monitor:LatencyMonitor') */ class LatencyMonitor extends EventEmitter { /** - * @param {object} [options] - * @param {number} [options.latencyCheckIntervalMs=500] - How often to add a latency check event (ms) - * @param {number} [options.dataEmitIntervalMs=5000] - How often to summarize latency check events. null or 0 disables event firing - * @param {Function} [options.asyncTestFn] - What cb-style async function to use - * @param {number} [options.latencyRandomPercentage=5] - What percent (+/-) of latencyCheckIntervalMs should we randomly use? This helps avoid alignment to other events. + * @class + * @param {LatencyMonitorOptions} [options] */ constructor ({ latencyCheckIntervalMs, dataEmitIntervalMs, asyncTestFn, latencyRandomPercentage } = {}) { super() @@ -91,6 +97,7 @@ class LatencyMonitor extends EventEmitter { // See: http://stackoverflow.com/questions/6032429/chrome-timeouts-interval-suspended-in-background-tabs if (isBrowser()) { that._visibilityChangeEmitter = new VisibilityChangeEmitter() + that._visibilityChangeEmitter.on('visibilityChange', (pageInFocus) => { if (pageInFocus) { that._startTimers() diff --git a/src/connection-manager/visibility-change-emitter.js b/src/connection-manager/visibility-change-emitter.js index baece0ec17..ebe5e7d076 100644 --- a/src/connection-manager/visibility-change-emitter.js +++ b/src/connection-manager/visibility-change-emitter.js @@ -1,10 +1,12 @@ +// @ts-nocheck /* global document */ /** * This code is based on `latency-monitor` (https://github.com/mlucool/latency-monitor) by `mlucool` (https://github.com/mlucool), available under Apache License 2.0 (https://github.com/mlucool/latency-monitor/blob/master/LICENSE) */ 'use strict' -const EventEmitter = require('events') + +const { EventEmitter } = require('events') const debug = require('debug')('latency-monitor:VisibilityChangeEmitter') @@ -29,12 +31,12 @@ const debug = require('debug')('latency-monitor:VisibilityChangeEmitter') * }); * // To access the visibility state directly, call: * console.log('Am I focused now? ' + myVisibilityEmitter.isVisible()); - * - * @class VisibilityChangeEmitter */ -module.exports = class VisibilityChangeEmitter extends EventEmitter { +class VisibilityChangeEmitter extends EventEmitter { /** * Creates a VisibilityChangeEmitter + * + * @class */ constructor () { super() @@ -119,3 +121,5 @@ module.exports = class VisibilityChangeEmitter extends EventEmitter { this.emit('visibilityChange', visible) } } + +module.exports = VisibilityChangeEmitter diff --git a/src/content-routing.js b/src/content-routing.js index d7e160e79e..cba4a38bce 100644 --- a/src/content-routing.js +++ b/src/content-routing.js @@ -6,111 +6,130 @@ const { messages, codes } = require('./errors') const all = require('it-all') const pAny = require('p-any') -module.exports = (node) => { - const routers = node._modules.contentRouting || [] - const dht = node._dht +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('multiaddr')} Multiaddr + * @typedef {import('cids')} CID + */ + +/** + * @typedef {Object} GetData + * @property {PeerId} from + * @property {Uint8Array} val + */ + +class ContentRouting { + /** + * @class + * @param {import('./')} libp2p + */ + constructor (libp2p) { + this.libp2p = libp2p + this.routers = libp2p._modules.contentRouting || [] + this.dht = libp2p._dht + + // If we have the dht, make it first + if (this.dht) { + this.routers.unshift(this.dht) + } + } + + /** + * Iterates over all content routers in series to find providers of the given key. + * Once a content router succeeds, iteration will stop. + * + * @param {CID} key - The CID key of the content to find + * @param {object} [options] + * @param {number} [options.timeout] - How long the query should run + * @param {number} [options.maxNumProviders] - maximum number of providers to find + * @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} + */ + async * findProviders (key, options) { + if (!this.routers.length) { + throw errCode(new Error('No content this.routers available'), 'NO_ROUTERS_AVAILABLE') + } + + const result = await pAny( + this.routers.map(async (router) => { + const provs = await all(router.findProviders(key, options)) + + if (!provs || !provs.length) { + throw errCode(new Error('not found'), 'NOT_FOUND') + } + return provs + }) + ) - // If we have the dht, make it first - if (dht) { - routers.unshift(dht) + for (const peer of result) { + yield peer + } } - return { - /** - * Iterates over all content routers in series to find providers of the given key. - * Once a content router succeeds, iteration will stop. - * - * @param {CID} key - The CID key of the content to find - * @param {object} [options] - * @param {number} [options.timeout] - How long the query should run - * @param {number} [options.maxNumProviders] - maximum number of providers to find - * @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} - */ - async * findProviders (key, options) { - if (!routers.length) { - throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE') - } - - const result = await pAny( - routers.map(async (router) => { - const provs = await all(router.findProviders(key, options)) - - if (!provs || !provs.length) { - throw errCode(new Error('not found'), 'NOT_FOUND') - } - return provs - }) - ) - - for (const peer of result) { - yield peer - } - }, - - /** - * Iterates over all content routers in parallel to notify it is - * a provider of the given key. - * - * @param {CID} key - The CID key of the content to find - * @returns {Promise} - */ - async provide (key) { // eslint-disable-line require-await - if (!routers.length) { - throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE') - } - - return Promise.all(routers.map((router) => router.provide(key))) - }, - - /** - * Store the given key/value pair in the DHT. - * - * @param {Uint8Array} key - * @param {Uint8Array} value - * @param {Object} [options] - put options - * @param {number} [options.minPeers] - minimum number of peers required to successfully put - * @returns {Promise} - */ - async put (key, value, options) { // eslint-disable-line require-await - if (!node.isStarted() || !dht.isStarted) { - throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) - } - - return dht.put(key, value, options) - }, - - /** - * Get the value to the given key. - * Times out after 1 minute by default. - * - * @param {Uint8Array} key - * @param {Object} [options] - get options - * @param {number} [options.timeout] - optional timeout (default: 60000) - * @returns {Promise<{from: PeerId, val: Uint8Array}>} - */ - async get (key, options) { // eslint-disable-line require-await - if (!node.isStarted() || !dht.isStarted) { - throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) - } - - return dht.get(key, options) - }, - - /** - * Get the `n` values to the given key without sorting. - * - * @param {Uint8Array} key - * @param {number} nVals - * @param {Object} [options] - get options - * @param {number} [options.timeout] - optional timeout (default: 60000) - * @returns {Promise>} - */ - async getMany (key, nVals, options) { // eslint-disable-line require-await - if (!node.isStarted() || !dht.isStarted) { - throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) - } - - return dht.getMany(key, nVals, options) + /** + * Iterates over all content routers in parallel to notify it is + * a provider of the given key. + * + * @param {CID} key - The CID key of the content to find + * @returns {Promise} + */ + async provide (key) { + if (!this.routers.length) { + throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE') } + + await Promise.all(this.routers.map((router) => router.provide(key))) + } + + /** + * Store the given key/value pair in the DHT. + * + * @param {Uint8Array} key + * @param {Uint8Array} value + * @param {Object} [options] - put options + * @param {number} [options.minPeers] - minimum number of peers required to successfully put + * @returns {Promise} + */ + put (key, value, options) { + if (!this.libp2p.isStarted() || !this.dht.isStarted) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) + } + + return this.dht.put(key, value, options) + } + + /** + * Get the value to the given key. + * Times out after 1 minute by default. + * + * @param {Uint8Array} key + * @param {Object} [options] - get options + * @param {number} [options.timeout] - optional timeout (default: 60000) + * @returns {Promise} + */ + get (key, options) { + if (!this.libp2p.isStarted() || !this.dht.isStarted) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) + } + + return this.dht.get(key, options) + } + + /** + * Get the `n` values to the given key without sorting. + * + * @param {Uint8Array} key + * @param {number} nVals + * @param {Object} [options] - get options + * @param {number} [options.timeout] - optional timeout (default: 60000) + * @returns {Promise} + */ + async getMany (key, nVals, options) { // eslint-disable-line require-await + if (!this.libp2p.isStarted() || !this.dht.isStarted) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) + } + + return this.dht.getMany(key, nVals, options) } } + +module.exports = ContentRouting diff --git a/src/dialer/dial-request.js b/src/dialer/dial-request.js index dc427e1bbf..62bab31be6 100644 --- a/src/dialer/dial-request.js +++ b/src/dialer/dial-request.js @@ -1,14 +1,27 @@ 'use strict' -const AbortController = require('abort-controller') -const anySignal = require('any-signal') -const debug = require('debug') const errCode = require('err-code') -const log = debug('libp2p:dialer:request') -log.error = debug('libp2p:dialer:request:error') +const AbortController = require('abort-controller').default +const anySignal = require('any-signal') const FIFO = require('p-fifo') const pAny = require('p-any') +/** + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('./')} Dialer + * @typedef {import('multiaddr')} Multiaddr + */ + +/** + * @typedef {Object} DialOptions + * @property {AbortSignal} signal + * + * @typedef {Object} DialRequestOptions + * @property {Multiaddr[]} addrs + * @property {(m: Multiaddr, options: DialOptions) => Promise} dialAction + * @property {Dialer} dialer + */ + class DialRequest { /** * Manages running the `dialAction` on multiple provided `addrs` in parallel @@ -17,10 +30,8 @@ class DialRequest { * started using `DialRequest.run(options)`. Once a single dial has succeeded, * all other dials in the request will be cancelled. * - * @param {object} options - * @param {Multiaddr[]} options.addrs - * @param {function(Multiaddr):Promise} options.dialAction - * @param {Dialer} options.dialer + * @class + * @param {DialRequestOptions} options */ constructor ({ addrs, @@ -34,11 +45,11 @@ class DialRequest { /** * @async - * @param {object} options - * @param {AbortSignal} options.signal - An AbortController signal - * @returns {Connection} + * @param {object} [options] + * @param {AbortSignal} [options.signal] - An AbortController signal + * @returns {Promise} */ - async run (options) { + async run (options = {}) { const tokens = this.dialer.getTokens(this.addrs.length) // If no tokens are available, throw if (tokens.length < 1) { @@ -78,4 +89,4 @@ class DialRequest { } } -module.exports.DialRequest = DialRequest +module.exports = DialRequest diff --git a/src/dialer/index.js b/src/dialer/index.js index 3ee3ada6c8..09ae2627ad 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -1,14 +1,15 @@ 'use strict' -const multiaddr = require('multiaddr') +const debug = require('debug') +const log = Object.assign(debug('libp2p:dialer'), { + error: debug('libp2p:dialer:err') +}) const errCode = require('err-code') +const multiaddr = require('multiaddr') const TimeoutController = require('timeout-abort-controller') const anySignal = require('any-signal') -const debug = require('debug') -const log = debug('libp2p:dialer') -log.error = debug('libp2p:dialer:error') -const { DialRequest } = require('./dial-request') +const DialRequest = require('./dial-request') const { publicAddressesFirst } = require('libp2p-utils/src/address-sort') const getPeer = require('../get-peer') @@ -19,17 +20,44 @@ const { MAX_PER_PEER_DIALS } = require('../constants') +/** + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('multiaddr')} Multiaddr + * @typedef {import('peer-id')} PeerId + * @typedef {import('../peer-store')} PeerStore + * @typedef {import('../peer-store/address-book').Address} Address + * @typedef {import('../transport-manager')} TransportManager + */ + +/** + * @typedef {Object} DialerProperties + * @property {PeerStore} peerStore + * @property {TransportManager} transportManager + * + * @typedef {(addr:Multiaddr) => Promise} Resolver + * + * @typedef {Object} DialerOptions + * @property {(addresses: Address[]) => Address[]} [options.addressSorter = publicAddressesFirst] - Sort the known addresses of a peer before trying to dial. + * @property {number} [concurrency = MAX_PARALLEL_DIALS] - Number of max concurrent dials. + * @property {number} [perPeerLimit = MAX_PER_PEER_DIALS] - Number of max concurrent dials per peer. + * @property {number} [timeout = DIAL_TIMEOUT] - How long a dial attempt is allowed to take. + * @property {Record} [resolvers = {}] - multiaddr resolvers to use when dialing + * + * @typedef DialTarget + * @property {string} id + * @property {Multiaddr[]} addrs + * + * @typedef PendingDial + * @property {DialRequest} dialRequest + * @property {TimeoutController} controller + * @property {Promise} promise + * @property {function():void} destroy + */ + class Dialer { /** * @class - * @param {object} options - * @param {TransportManager} options.transportManager - * @param {Peerstore} options.peerStore - * @param {(addresses: Array Array
} [options.addressSorter = publicAddressesFirst] - Sort the known addresses of a peer before trying to dial. - * @param {number} [options.concurrency = MAX_PARALLEL_DIALS] - Number of max concurrent dials. - * @param {number} [options.perPeerLimit = MAX_PER_PEER_DIALS] - Number of max concurrent dials per peer. - * @param {number} [options.timeout = DIAL_TIMEOUT] - How long a dial attempt is allowed to take. - * @param {object} [options.resolvers = {}] - multiaddr resolvers to use when dialing + * @param {DialerProperties & DialerOptions} options */ constructor ({ transportManager, @@ -102,12 +130,6 @@ class Dialer { } } - /** - * @typedef DialTarget - * @property {string} id - * @property {Multiaddr[]} addrs - */ - /** * Creates a DialTarget. The DialTarget is used to create and track * the DialRequest to a given peer. @@ -145,14 +167,6 @@ class Dialer { } } - /** - * @typedef PendingDial - * @property {DialRequest} dialRequest - * @property {TimeoutController} controller - * @property {Promise} promise - * @property {function():void} destroy - */ - /** * Creates a PendingDial that wraps the underlying DialRequest * @@ -162,7 +176,7 @@ class Dialer { * @param {AbortSignal} [options.signal] - An AbortController signal * @returns {PendingDial} */ - _createPendingDial (dialTarget, options) { + _createPendingDial (dialTarget, options = {}) { const dialAction = (addr, options) => { if (options.signal.aborted) throw errCode(new Error('already aborted'), codes.ERR_ALREADY_ABORTED) return this.transportManager.dial(addr, options) @@ -211,7 +225,7 @@ class Dialer { * Resolve multiaddr recursively. * * @param {Multiaddr} ma - * @returns {Promise>} + * @returns {Promise} */ async _resolve (ma) { // TODO: recursive logic should live in multiaddr once dns4/dns6 support is in place @@ -228,19 +242,20 @@ class Dialer { return this._resolve(nm) })) - return recursiveMultiaddrs.flat().reduce((array, newM) => { + const addrs = recursiveMultiaddrs.flat() + return addrs.reduce((array, newM) => { if (!array.find(m => m.equals(newM))) { array.push(newM) } return array - }, []) // Unique addresses + }, /** @type {Multiaddr[]} */([])) } /** * Resolve a given multiaddr. If this fails, an empty array will be returned * * @param {Multiaddr} ma - * @returns {Promise>} + * @returns {Promise} */ async _resolveRecord (ma) { try { diff --git a/src/get-peer.js b/src/get-peer.js index bc36b04cc9..807c333384 100644 --- a/src/get-peer.js +++ b/src/get-peer.js @@ -6,12 +6,16 @@ const errCode = require('err-code') const { codes } = require('./errors') +/** + * @typedef {import('multiaddr')} Multiaddr + */ + /** * Converts the given `peer` to a `Peer` object. * If a multiaddr is received, the addressBook is updated. * * @param {PeerId|Multiaddr|string} peer - * @returns {{ id: PeerId, multiaddrs: Array }} + * @returns {{ id: PeerId, multiaddrs: Multiaddr[]|undefined }} */ function getPeer (peer) { if (typeof peer === 'string') { diff --git a/src/identify/consts.js b/src/identify/consts.js index 58ec077faa..1f697e5dc5 100644 --- a/src/identify/consts.js +++ b/src/identify/consts.js @@ -1,5 +1,6 @@ 'use strict' +// @ts-ignore file not listed within the file list of projects const libp2pVersion = require('../../package.json').version module.exports.PROTOCOL_VERSION = 'ipfs/0.1.0' diff --git a/src/identify/index.js b/src/identify/index.js index 351bfbe4c7..6a21342993 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -1,13 +1,13 @@ 'use strict' const debug = require('debug') -const log = debug('libp2p:identify') -log.error = debug('libp2p:identify:error') - +const log = Object.assign(debug('libp2p:identify'), { + error: debug('libp2p:identify:err') +}) const errCode = require('err-code') const pb = require('it-protocol-buffers') const lp = require('it-length-prefixed') -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') const { collect, take, consume } = require('streaming-iterables') const uint8ArrayFromString = require('uint8arrays/from-string') @@ -29,57 +29,28 @@ const { const { codes } = require('../errors') -class IdentifyService { - /** - * Takes the `addr` and converts it to a Multiaddr if possible - * - * @param {Uint8Array | string} addr - * @returns {Multiaddr|null} - */ - static getCleanMultiaddr (addr) { - if (addr && addr.length > 0) { - try { - return multiaddr(addr) - } catch (_) { - return null - } - } - return null - } +/** + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + */ +class IdentifyService { /** * @class - * @param {object} options - * @param {Libp2p} options.libp2p + * @param {Object} options + * @param {import('../')} options.libp2p */ constructor ({ libp2p }) { - /** - * @property {PeerStore} - */ + this._libp2p = libp2p this.peerStore = libp2p.peerStore - - /** - * @property {ConnectionManager} - */ this.connectionManager = libp2p.connectionManager - - /** - * @property {PeerId} - */ this.peerId = libp2p.peerId - /** - * @property {AddressManager} - */ - this._libp2p = libp2p - this.handleMessage = this.handleMessage.bind(this) // When a new connection happens, trigger identify this.connectionManager.on('peer:connect', (connection) => { - const peerId = connection.remotePeer - - this.identify(connection, peerId).catch(log.error) + this.identify(connection).catch(log.error) }) // When self multiaddrs change, trigger identify-push @@ -100,8 +71,8 @@ class IdentifyService { /** * Send an Identify Push update to the list of connections * - * @param {Array} connections - * @returns {Promise} + * @param {Connection[]} connections + * @returns {Promise} */ async push (connections) { const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId) @@ -228,11 +199,11 @@ class IdentifyService { /** * A handler to register with Libp2p to process identify messages. * - * @param {object} options - * @param {string} options.protocol - * @param {*} options.stream + * @param {Object} options * @param {Connection} options.connection - * @returns {Promise} + * @param {MuxedStream} options.stream + * @param {string} options.protocol + * @returns {Promise|undefined} */ handleMessage ({ connection, stream, protocol }) { switch (protocol) { @@ -250,9 +221,10 @@ class IdentifyService { * to the requesting peer over the given `connection` * * @private - * @param {object} options - * @param {*} options.stream + * @param {Object} options + * @param {MuxedStream} options.stream * @param {Connection} options.connection + * @returns {Promise} */ async _handleIdentify ({ connection, stream }) { let publicKey = new Uint8Array(0) @@ -290,8 +262,9 @@ class IdentifyService { * * @private * @param {object} options - * @param {*} options.stream + * @param {MuxedStream} options.stream * @param {Connection} options.connection + * @returns {Promise} */ async _handlePush ({ connection, stream }) { let message @@ -331,16 +304,36 @@ class IdentifyService { // Update the protocols this.peerStore.protoBook.set(id, message.protocols) } + + /** + * Takes the `addr` and converts it to a Multiaddr if possible + * + * @param {Uint8Array | string} addr + * @returns {multiaddr|null} + */ + static getCleanMultiaddr (addr) { + if (addr && addr.length > 0) { + try { + return multiaddr(addr) + } catch (_) { + return null + } + } + return null + } } -module.exports.IdentifyService = IdentifyService /** * The protocols the IdentifyService supports * * @property multicodecs */ -module.exports.multicodecs = { +const multicodecs = { IDENTIFY: MULTICODEC_IDENTIFY, IDENTIFY_PUSH: MULTICODEC_IDENTIFY_PUSH } -module.exports.Message = Message + +IdentifyService.multicodecs = multicodecs +IdentifyService.Messsage = Message + +module.exports = IdentifyService diff --git a/src/index.js b/src/index.js index 1c25ef8600..1bb3b87f89 100644 --- a/src/index.js +++ b/src/index.js @@ -1,16 +1,17 @@ 'use strict' -const { EventEmitter } = require('events') const debug = require('debug') +const log = Object.assign(debug('libp2p'), { + error: debug('libp2p:err') +}) +const { EventEmitter } = require('events') const globalThis = require('ipfs-utils/src/globalthis') -const log = debug('libp2p') -log.error = debug('libp2p:error') const errCode = require('err-code') const PeerId = require('peer-id') const PeerRouting = require('./peer-routing') -const contentRouting = require('./content-routing') +const ContentRouting = require('./content-routing') const getPeer = require('./get-peer') const { validate: validateConfig } = require('./config') const { codes, messages } = require('./errors') @@ -29,22 +30,95 @@ const PubsubAdapter = require('./pubsub-adapter') const PersistentPeerStore = require('./peer-store/persistent') const Registrar = require('./registrar') const ping = require('./ping') -const { - IdentifyService, - multicodecs: IDENTIFY_PROTOCOLS -} = require('./identify') +const IdentifyService = require('./identify') +const IDENTIFY_PROTOCOLS = IdentifyService.multicodecs + +/** + * @typedef {import('multiaddr')} Multiaddr + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + * @typedef {import('libp2p-interfaces/src/transport/types').TransportFactory} TransportFactory + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxerFactory} MuxerFactory + * @typedef {import('libp2p-interfaces/src/crypto/types').Crypto} Crypto + * @typedef {import('libp2p-interfaces/src/pubsub')} Pubsub + */ /** + * @typedef {Object} PeerStoreOptions + * @property {boolean} persistence + * + * @typedef {Object} PeerDiscoveryOptions + * @property {boolean} autoDial + * + * @typedef {Object} RelayOptions + * @property {boolean} enabled + * @property {import('./circuit').RelayAdvertiseOptions} advertise + * @property {import('./circuit').HopOptions} hop + * @property {import('./circuit').AutoRelayOptions} autoRelay + * + * @typedef {Object} Libp2pConfig + * @property {Object} [dht] dht module options + * @property {PeerDiscoveryOptions} [peerDiscovery] + * @property {Pubsub} [pubsub] pubsub module options + * @property {RelayOptions} [relay] + * @property {Record} [transport] transport options indexed by transport key + * + * @typedef {Object} Libp2pModules + * @property {TransportFactory[]} transport + * @property {MuxerFactory[]} streamMuxer + * @property {Crypto[]} connEncryption + * + * @typedef {Object} Libp2pOptions + * @property {Libp2pModules} modules libp2p modules to use + * @property {import('./address-manager').AddressManagerOptions} [addresses] + * @property {import('./connection-manager').ConnectionManagerOptions} [connectionManager] + * @property {import('./dialer').DialerOptions} [dialer] + * @property {import('./metrics').MetricsOptions} [metrics] + * @property {Object} [keychain] + * @property {import('./transport-manager').TransportManagerOptions} [transportManager] + * @property {PeerStoreOptions & import('./peer-store/persistent').PersistentPeerStoreOptions} [peerStore] + * @property {Libp2pConfig} [config] + * @property {PeerId} peerId + * + * @typedef {Object} CreateOptions + * @property {PeerId} peerId + * + * @extends {EventEmitter} * @fires Libp2p#error Emitted when an error occurs * @fires Libp2p#peer:discovery Emitted when a peer is discovered */ class Libp2p extends EventEmitter { + /** + * Like `new Libp2p(options)` except it will create a `PeerId` + * instance if one is not provided in options. + * + * @param {Libp2pOptions & CreateOptions} options - Libp2p configuration options + * @returns {Promise} + */ + static async create (options) { + if (options.peerId) { + return new Libp2p(options) + } + + const peerId = await PeerId.create() + + options.peerId = peerId + return new Libp2p(options) + } + + /** + * Libp2p node. + * + * @class + * @param {Libp2pOptions} _options + */ constructor (_options) { super() // validateConfig will ensure the config is correct, // and add default values where appropriate this._options = validateConfig(_options) + /** @type {PeerId} */ this.peerId = this._options.peerId this.datastore = this._options.datastore @@ -147,6 +221,7 @@ class Libp2p extends EventEmitter { }) if (this._config.relay.enabled) { + // @ts-ignore Circuit prototype this.transportManager.add(Circuit.prototype[Symbol.toStringTag], Circuit) this.relay = new Relay(this) } @@ -188,13 +263,14 @@ class Libp2p extends EventEmitter { if (this._modules.pubsub) { const Pubsub = this._modules.pubsub // using pubsub adapter with *DEPRECATED* handlers functionality + /** @type {Pubsub} */ this.pubsub = PubsubAdapter(Pubsub, this, this._config.pubsub) } // Attach remaining APIs // peer and content routing will automatically get modules from _modules and _dht this.peerRouting = new PeerRouting(this) - this.contentRouting = contentRouting(this) + this.contentRouting = new ContentRouting(this) // Mount default protocols ping.mount(this) @@ -208,13 +284,16 @@ class Libp2p extends EventEmitter { * * @param {string} eventName * @param {...any} args - * @returns {void} + * @returns {boolean} */ emit (eventName, ...args) { + // TODO: do we still need this? + // @ts-ignore _events does not exist in libp2p if (eventName === 'error' && !this._events.error) { - log.error(...args) + log.error(args) + return false } else { - super.emit(eventName, ...args) + return super.emit(eventName, ...args) } } @@ -242,7 +321,7 @@ class Libp2p extends EventEmitter { * Stop the libp2p node by closing its listeners and open connections * * @async - * @returns {void} + * @returns {Promise} */ async stop () { log('libp2p is stopping') @@ -288,9 +367,13 @@ class Libp2p extends EventEmitter { * Imports the private key as 'self', if needed. * * @async - * @returns {void} + * @returns {Promise} */ async loadKeychain () { + if (!this.keychain) { + return + } + try { await this.keychain.findKeyByName('self') } catch (err) { @@ -317,12 +400,12 @@ class Libp2p extends EventEmitter { * peer will be added to the nodes `peerStore` * * @param {PeerId|Multiaddr|string} peer - The peer to dial - * @param {object} options + * @param {object} [options] * @param {AbortSignal} [options.signal] * @returns {Promise} */ dial (peer, options) { - return this.dialProtocol(peer, null, options) + return this.dialProtocol(peer, [], options) } /** @@ -333,7 +416,7 @@ class Libp2p extends EventEmitter { * @async * @param {PeerId|Multiaddr|string} peer - The peer to dial * @param {string[]|string} protocols - * @param {object} options + * @param {object} [options] * @param {AbortSignal} [options.signal] * @returns {Promise} */ @@ -348,7 +431,7 @@ class Libp2p extends EventEmitter { } // If a protocol was provided, create a new stream - if (protocols) { + if (protocols && protocols.length) { return connection.newStream(protocols) } @@ -360,7 +443,7 @@ class Libp2p extends EventEmitter { * by transports to listen with the announce addresses. * Duplicated addresses and noAnnounce addresses are filtered out. * - * @returns {Array} + * @returns {Multiaddr[]} */ get multiaddrs () { const announceAddrs = this.addressManager.getAnnounceAddrs() @@ -377,7 +460,7 @@ class Libp2p extends EventEmitter { /** * Disconnects all connections to the given `peer` * - * @param {PeerId|multiaddr|string} peer - the peer to close connections to + * @param {PeerId|Multiaddr|string} peer - the peer to close connections to * @returns {Promise} */ async hangUp (peer) { @@ -417,7 +500,7 @@ class Libp2p extends EventEmitter { * Registers the `handler` for each protocol * * @param {string[]|string} protocols - * @param {function({ connection:*, stream:*, protocol:string })} handler + * @param {({ connection: Connection, stream: MuxedStream, protocol: string }) => void} handler */ handle (protocols, handler) { protocols = Array.isArray(protocols) ? protocols : [protocols] @@ -505,7 +588,7 @@ class Libp2p extends EventEmitter { * Known peers may be emitted. * * @private - * @param {{ id: PeerId, multiaddrs: Array, protocols: Array }} peer + * @param {{ id: PeerId, multiaddrs: Multiaddr[], protocols: string[] }} peer */ _onDiscoveryPeer (peer) { if (peer.id.toB58String() === this.peerId.toB58String()) { @@ -583,7 +666,9 @@ class Libp2p extends EventEmitter { // Transport modules with discovery for (const Transport of this.transportManager.getTransports()) { + // @ts-ignore Transport interface does not include discovery if (Transport.discovery) { + // @ts-ignore Transport interface does not include discovery setupService(Transport.discovery) } } @@ -592,22 +677,4 @@ class Libp2p extends EventEmitter { } } -/** - * Like `new Libp2p(options)` except it will create a `PeerId` - * instance if one is not provided in options. - * - * @param {object} options - Libp2p configuration options - * @returns {Libp2p} - */ -Libp2p.create = async function create (options = {}) { - if (options.peerId) { - return new Libp2p(options) - } - - const peerId = await PeerId.create() - - options.peerId = peerId - return new Libp2p(options) -} - module.exports = Libp2p diff --git a/src/insecure/plaintext.js b/src/insecure/plaintext.js index 83e1ba463b..07efe9f758 100644 --- a/src/insecure/plaintext.js +++ b/src/insecure/plaintext.js @@ -1,21 +1,33 @@ 'use strict' +const debug = require('debug') +const log = Object.assign(debug('libp2p:plaintext'), { + error: debug('libp2p:plaintext:err') +}) const handshake = require('it-handshake') const lp = require('it-length-prefixed') const PeerId = require('peer-id') -const debug = require('debug') -const log = debug('libp2p:plaintext') -log.error = debug('libp2p:plaintext:error') const { UnexpectedPeerError, InvalidCryptoExchangeError } = require('libp2p-interfaces/src/crypto/errors') const { Exchange, KeyType } = require('./proto') const protocol = '/plaintext/2.0.0' +/** + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + */ + function lpEncodeExchange (exchange) { const pb = Exchange.encode(exchange) return lp.encode.single(pb) } +/** + * Encrypt connection. + * + * @param {PeerId} localId + * @param {Connection} conn + * @param {PeerId} [remoteId] + */ async function encrypt (localId, conn, remoteId) { const shake = handshake(conn) diff --git a/src/keychain/cms.js b/src/keychain/cms.js index 60bfd323f7..3ba99cfb5b 100644 --- a/src/keychain/cms.js +++ b/src/keychain/cms.js @@ -1,3 +1,4 @@ +// @ts-nocheck 'use strict' require('node-forge/lib/pkcs7') @@ -21,7 +22,7 @@ class CMS { /** * Creates a new instance with a keychain * - * @param {Keychain} keychain - the available keys + * @param {import('./index')} keychain - the available keys */ constructor (keychain) { if (!keychain) { @@ -38,7 +39,7 @@ class CMS { * * @param {string} name - The local key name. * @param {Uint8Array} plain - The data to encrypt. - * @returns {undefined} + * @returns {Promise} */ async encrypt (name, plain) { if (!(plain instanceof Uint8Array)) { @@ -68,7 +69,7 @@ class CMS { * exists, an Error is returned with the property 'missingKeys'. It is array of key ids. * * @param {Uint8Array} cmsData - The CMS encrypted data to decrypt. - * @returns {undefined} + * @returns {Promise} */ async decrypt (cmsData) { if (!(cmsData instanceof Uint8Array)) { diff --git a/src/keychain/index.js b/src/keychain/index.js index c823eb3e46..440a2913a5 100644 --- a/src/keychain/index.js +++ b/src/keychain/index.js @@ -1,3 +1,4 @@ +// @ts-nocheck /* eslint max-nested-callbacks: ["error", 5] */ 'use strict' @@ -101,7 +102,8 @@ class Keychain { * Creates a new instance of a key chain. * * @param {DS} store - where the key are. - * @param {object} options - ??? + * @param {object} options + * @class */ constructor (store, options) { if (!store) { diff --git a/src/keychain/util.js b/src/keychain/util.js index 56386fe488..6a332c9ceb 100644 --- a/src/keychain/util.js +++ b/src/keychain/util.js @@ -1,3 +1,4 @@ +// @ts-nocheck 'use strict' require('node-forge/lib/x509') diff --git a/src/metrics/index.js b/src/metrics/index.js index 9d0f436041..8d94861d81 100644 --- a/src/metrics/index.js +++ b/src/metrics/index.js @@ -1,7 +1,8 @@ +// @ts-nocheck 'use strict' const mergeOptions = require('merge-options') -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') const { tap } = require('streaming-iterables') const oldPeerLRU = require('./old-peers') const { METRICS: defaultOptions } = require('../constants') @@ -17,15 +18,26 @@ const directionToEvent = { out: 'dataSent' } +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection + */ + +/** + * @typedef MetricsProperties + * @property {import('../connection-manager')} connectionManager + * + * @typedef MetricsOptions + * @property {number} [computeThrottleMaxQueueSize = defaultOptions.computeThrottleMaxQueueSize] + * @property {number} [computeThrottleTimeout = defaultOptions.computeThrottleTimeout] + * @property {number[]} [movingAverageIntervals = defaultOptions.movingAverageIntervals] + * @property {number} [maxOldPeersRetention = defaultOptions.maxOldPeersRetention] + */ + class Metrics { /** - * - * @param {object} options - * @param {ConnectionManager} options.connectionManager - * @param {number} options.computeThrottleMaxQueueSize - * @param {number} options.computeThrottleTimeout - * @param {Array} options.movingAverageIntervals - * @param {number} options.maxOldPeersRetention + * @class + * @param {MetricsProperties & MetricsOptions} options */ constructor (options) { this._options = mergeOptions(defaultOptions, options) @@ -76,7 +88,7 @@ class Metrics { /** * Returns a list of `PeerId` strings currently being tracked * - * @returns {Array} + * @returns {string[]} */ get peers () { return Array.from(this._peerStats.keys()) @@ -97,7 +109,7 @@ class Metrics { /** * Returns a list of all protocol strings currently being tracked. * - * @returns {Array} + * @returns {string[]} */ get protocols () { return Array.from(this._protocolStats.keys()) @@ -176,6 +188,7 @@ class Metrics { * * @param {PeerId} placeholder - A peerId string * @param {PeerId} peerId + * @returns {void} */ updatePlaceholder (placeholder, peerId) { if (!this._running) return @@ -205,10 +218,10 @@ class Metrics { * with the placeholder string returned from here, and the known `PeerId`. * * @param {Object} options - * @param {{ sink: function(*), source: function() }} options.stream - A duplex iterable stream + * @param {MultiaddrConnection} options.stream - A duplex iterable stream * @param {PeerId} [options.remotePeer] - The id of the remote peer that's connected * @param {string} [options.protocol] - The protocol the stream is running - * @returns {string} The peerId string or placeholder string + * @returns {MultiaddrConnection} The peerId string or placeholder string */ trackStream ({ stream, remotePeer, protocol }) { const metrics = this diff --git a/src/metrics/old-peers.js b/src/metrics/old-peers.js index 08d317dc09..753bdf5fa1 100644 --- a/src/metrics/old-peers.js +++ b/src/metrics/old-peers.js @@ -6,9 +6,10 @@ const LRU = require('hashlru') * Creates and returns a Least Recently Used Cache * * @param {number} maxSize - * @returns {LRUCache} + * @returns {any} */ module.exports = (maxSize) => { + // @ts-ignore LRU expression is not callable const patched = LRU(maxSize) patched.delete = patched.remove return patched diff --git a/src/metrics/stats.js b/src/metrics/stats.js index 3517766309..e35ab311fc 100644 --- a/src/metrics/stats.js +++ b/src/metrics/stats.js @@ -1,17 +1,19 @@ +// @ts-nocheck 'use strict' -const EventEmitter = require('events') +const { EventEmitter } = require('events') const Big = require('bignumber.js') const MovingAverage = require('moving-average') const retimer = require('retimer') -/** - * A queue based manager for stat processing - * - * @param {Array} initialCounters - * @param {any} options - */ class Stats extends EventEmitter { + /** + * A queue based manager for stat processing + * + * @class + * @param {string[]} initialCounters + * @param {any} options + */ constructor (initialCounters, options) { super() @@ -21,6 +23,7 @@ class Stats extends EventEmitter { this._frequencyLastTime = Date.now() this._frequencyAccumulators = {} + this._movingAverages = {} this._update = this._update.bind(this) @@ -68,7 +71,7 @@ class Stats extends EventEmitter { /** * Returns a clone of the current stats. * - * @returns {Map} + * @returns {Object} */ get snapshot () { return Object.assign({}, this._stats) @@ -77,7 +80,7 @@ class Stats extends EventEmitter { /** * Returns a clone of the internal movingAverages * - * @returns {Array} + * @returns {MovingAverage} */ get movingAverages () { return Object.assign({}, this._movingAverages) @@ -229,7 +232,7 @@ class Stats extends EventEmitter { * will be updated or initialized if they don't already exist. * * @private - * @param {Array} op + * @param {{string, number}[]} op * @throws {InvalidNumber} * @returns {void} */ @@ -238,7 +241,7 @@ class Stats extends EventEmitter { const inc = op[1] if (typeof inc !== 'number') { - throw new Error('invalid increment number:', inc) + throw new Error(`invalid increment number: ${inc}`) } let n diff --git a/src/peer-routing.js b/src/peer-routing.js index e783c82f8b..26fe9625b4 100644 --- a/src/peer-routing.js +++ b/src/peer-routing.js @@ -1,9 +1,10 @@ 'use strict' -const errCode = require('err-code') const debug = require('debug') -const log = debug('libp2p:peer-routing') -log.error = debug('libp2p:peer-routing:error') +const log = Object.assign(debug('libp2p:peer-routing'), { + error: debug('libp2p:peer-routing:err') +}) +const errCode = require('err-code') const all = require('it-all') const pAny = require('p-any') @@ -13,12 +14,13 @@ const { } = require('set-delayed-interval') /** - * Responsible for managing the usage of the available Peer Routing modules. + * @typedef {import('peer-id')} PeerId + * @typedef {import('multiaddr')} Multiaddr */ class PeerRouting { /** * @class - * @param {Libp2p} libp2p + * @param {import('./')} libp2p */ constructor (libp2p) { this._peerId = libp2p.peerId diff --git a/src/peer-store/address-book.js b/src/peer-store/address-book.js index 07d8af59b2..74b6049a5c 100644 --- a/src/peer-store/address-book.js +++ b/src/peer-store/address-book.js @@ -1,9 +1,10 @@ 'use strict' -const errcode = require('err-code') const debug = require('debug') -const log = debug('libp2p:peer-store:address-book') -log.error = debug('libp2p:peer-store:address-book:error') +const log = Object.assign(debug('libp2p:peer-store:address-book'), { + error: debug('libp2p:peer-store:address-book:err') +}) +const errcode = require('err-code') const multiaddr = require('multiaddr') const PeerId = require('peer-id') @@ -17,35 +18,31 @@ const { const Envelope = require('../record/envelope') /** - * The AddressBook is responsible for keeping the known multiaddrs - * of a peer. + * @typedef {import('multiaddr')} Multiaddr + * @typedef {import('./')} PeerStore */ -class AddressBook extends Book { - /** - * Address object - * - * @typedef {Object} Address - * @property {Multiaddr} multiaddr peer multiaddr. - * @property {boolean} isCertified obtained from a signed peer record. - */ - /** - * CertifiedRecord object - * - * @typedef {Object} CertifiedRecord - * @property {Uint8Array} raw raw envelope. - * @property {number} seqNumber seq counter. - */ +/** + * @typedef {Object} Address + * @property {Multiaddr} multiaddr peer multiaddr. + * @property {boolean} isCertified obtained from a signed peer record. + * + * @typedef {Object} CertifiedRecord + * @property {Uint8Array} raw raw envelope. + * @property {number} seqNumber seq counter. + * + * @typedef {Object} Entry + * @property {Address[]} addresses peer Addresses. + * @property {CertifiedRecord} record certified peer record. + */ +/** + * @extends {Book} + */ +class AddressBook extends Book { /** - * Entry object for the addressBook + * The AddressBook is responsible for keeping the known multiaddrs of a peer. * - * @typedef {Object} Entry - * @property {Array
} addresses peer Addresses. - * @property {CertifiedRecord} record certified peer record. - */ - - /** * @class * @param {PeerStore} peerStore */ @@ -70,7 +67,7 @@ class AddressBook extends Book { /** * Map known peers to their known Address Entries. * - * @type {Map>} + * @type {Map} */ this.data = new Map() } @@ -105,7 +102,7 @@ class AddressBook extends Book { const peerId = peerRecord.peerId const id = peerId.toB58String() - const entry = this.data.get(id) || {} + const entry = this.data.get(id) || { record: undefined } const storedRecord = entry.record // ensure seq is greater than, or equal to, the last received @@ -151,7 +148,7 @@ class AddressBook extends Book { * Returns undefined if no record exists. * * @param {PeerId} peerId - * @returns {Promise} + * @returns {Promise|undefined} */ getPeerRecord (peerId) { const raw = this.getRawEnvelope(peerId) @@ -171,7 +168,7 @@ class AddressBook extends Book { * * @override * @param {PeerId} peerId - * @param {Array} multiaddrs + * @param {Multiaddr[]} multiaddrs * @returns {AddressBook} */ set (peerId, multiaddrs) { @@ -181,22 +178,22 @@ class AddressBook extends Book { } const addresses = this._toAddresses(multiaddrs) - const id = peerId.toB58String() - const entry = this.data.get(id) || {} - const rec = entry.addresses // Not replace multiaddrs if (!addresses.length) { return this } + const id = peerId.toB58String() + const entry = this.data.get(id) + // Already knows the peer - if (rec && rec.length === addresses.length) { - const intersection = rec.filter((addr) => addresses.some((newAddr) => addr.multiaddr.equals(newAddr.multiaddr))) + if (entry && entry.addresses && entry.addresses.length === addresses.length) { + const intersection = entry.addresses.filter((addr) => addresses.some((newAddr) => addr.multiaddr.equals(newAddr.multiaddr))) // Are new addresses equal to the old ones? // If yes, no changes needed! - if (intersection.length === rec.length) { + if (intersection.length === entry.addresses.length) { log(`the addresses provided to store are equal to the already stored for ${id}`) return this } @@ -204,12 +201,12 @@ class AddressBook extends Book { this._setData(peerId, { addresses, - record: entry.record + record: entry && entry.record }) log(`stored provided multiaddrs for ${id}`) // Notify the existance of a new peer - if (!rec) { + if (!entry) { this._ps.emit('peer', peerId) } @@ -221,7 +218,7 @@ class AddressBook extends Book { * If the peer is not known, it is set with the given addresses. * * @param {PeerId} peerId - * @param {Array} multiaddrs + * @param {Multiaddr[]} multiaddrs * @returns {AddressBook} */ add (peerId, multiaddrs) { @@ -233,32 +230,33 @@ class AddressBook extends Book { const addresses = this._toAddresses(multiaddrs) const id = peerId.toB58String() - const entry = this.data.get(id) || {} - const rec = entry.addresses || [] + const entry = this.data.get(id) - // Add recorded uniquely to the new array (Union) - rec.forEach((addr) => { - if (!addresses.find(r => r.multiaddr.equals(addr.multiaddr))) { - addresses.push(addr) - } - }) + if (entry && entry.addresses) { + // Add recorded uniquely to the new array (Union) + entry.addresses.forEach((addr) => { + if (!addresses.find(r => r.multiaddr.equals(addr.multiaddr))) { + addresses.push(addr) + } + }) - // If the recorded length is equal to the new after the unique union - // The content is the same, no need to update. - if (rec && rec.length === addresses.length) { - log(`the addresses provided to store are already stored for ${id}`) - return this + // If the recorded length is equal to the new after the unique union + // The content is the same, no need to update. + if (entry.addresses.length === addresses.length) { + log(`the addresses provided to store are already stored for ${id}`) + return this + } } this._setData(peerId, { addresses, - record: entry.record + record: entry && entry.record }) log(`added provided multiaddrs for ${id}`) // Notify the existance of a new peer - if (!entry.addresses) { + if (!(entry && entry.addresses)) { this._ps.emit('peer', peerId) } @@ -270,7 +268,7 @@ class AddressBook extends Book { * * @override * @param {PeerId} peerId - * @returns {Array
|undefined} + * @returns {Address[]|undefined} */ get (peerId) { if (!PeerId.isPeerId(peerId)) { @@ -286,9 +284,9 @@ class AddressBook extends Book { * Transforms received multiaddrs into Address. * * @private - * @param {Array} multiaddrs + * @param {Multiaddr[]} multiaddrs * @param {boolean} [isCertified] - * @returns {Array
} + * @returns {Address[]} */ _toAddresses (multiaddrs, isCertified = false) { if (!multiaddrs) { @@ -319,8 +317,8 @@ class AddressBook extends Book { * Returns `undefined` if there are no known multiaddrs for the given peer. * * @param {PeerId} peerId - * @param {(addresses: Array Array
} [addressSorter] - * @returns {Array|undefined} + * @param {(addresses: Address[]) => Address[]} [addressSorter] + * @returns {Multiaddr[]|undefined} */ getMultiaddrsForPeer (peerId, addressSorter = (ms) => ms) { if (!PeerId.isPeerId(peerId)) { diff --git a/src/peer-store/book.js b/src/peer-store/book.js index f0a830f97a..48855c157b 100644 --- a/src/peer-store/book.js +++ b/src/peer-store/book.js @@ -10,16 +10,19 @@ const { const passthrough = data => data /** - * The Book is the skeleton for the PeerStore books. + * @typedef {import('./')} PeerStore */ + class Book { /** + * The Book is the skeleton for the PeerStore books. + * * @class * @param {Object} properties * @param {PeerStore} properties.peerStore - PeerStore instance. * @param {string} properties.eventName - Name of the event to emit by the PeerStore. * @param {string} properties.eventProperty - Name of the property to emit by the PeerStore. - * @param {Function} [properties.eventTransformer] - Transformer function of the provided data for being emitted. + * @param {(data: any) => any[]} [properties.eventTransformer] - Transformer function of the provided data for being emitted. */ constructor ({ peerStore, eventName, eventProperty, eventTransformer = passthrough }) { this._ps = peerStore @@ -30,7 +33,7 @@ class Book { /** * Map known peers to their data. * - * @type {Map} + * @type {Map} */ this.data = new Map() } @@ -39,7 +42,7 @@ class Book { * Set known data of a provided peer. * * @param {PeerId} peerId - * @param {Array|Data} data + * @param {any[]|any} data */ set (peerId, data) { throw errcode(new Error('set must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') @@ -48,9 +51,9 @@ class Book { /** * Set data into the datastructure, persistence and emit it using the provided transformers. * - * @private + * @protected * @param {PeerId} peerId - peerId of the data to store - * @param {*} data - data to store. + * @param {any} data - data to store. * @param {Object} [options] - storing options. * @param {boolean} [options.emit = true] - emit the provided data. * @returns {void} @@ -68,9 +71,9 @@ class Book { /** * Emit data. * - * @private + * @protected * @param {PeerId} peerId - * @param {*} data + * @param {any} [data] */ _emit (peerId, data) { this._ps.emit(this.eventName, { @@ -84,7 +87,7 @@ class Book { * Returns `undefined` if there is no available data for the given peer. * * @param {PeerId} peerId - * @returns {Array|undefined} + * @returns {any[]|any|undefined} */ get (peerId) { if (!PeerId.isPeerId(peerId)) { @@ -93,6 +96,7 @@ class Book { const rec = this.data.get(peerId.toB58String()) + // @ts-ignore return rec ? [...rec] : undefined } diff --git a/src/peer-store/index.js b/src/peer-store/index.js index 69a1f15a57..b3df1bbb94 100644 --- a/src/peer-store/index.js +++ b/src/peer-store/index.js @@ -1,9 +1,6 @@ 'use strict' const errcode = require('err-code') -const debug = require('debug') -const log = debug('libp2p:peer-store') -log.error = debug('libp2p:peer-store:error') const { EventEmitter } = require('events') const PeerId = require('peer-id') @@ -14,11 +11,15 @@ const MetadataBook = require('./metadata-book') const ProtoBook = require('./proto-book') const { - ERR_INVALID_PARAMETERS + codes: { ERR_INVALID_PARAMETERS } } = require('../errors') /** - * Responsible for managing known peers, as well as their addresses, protocols and metadata. + * @typedef {import('./address-book').Address} Address + */ + +/** + * @extends {EventEmitter} * * @fires PeerStore#peer Emitted when a new peer is added. * @fires PeerStore#change:protocols Emitted when a known peer supports a different set of protocols. @@ -32,12 +33,14 @@ class PeerStore extends EventEmitter { * * @typedef {Object} Peer * @property {PeerId} id peer's peer-id instance. - * @property {Array
} addresses peer's addresses containing its multiaddrs and metadata. - * @property {Array} protocols peer's supported protocols. - * @property {Map} metadata peer's metadata map. + * @property {Address[]} addresses peer's addresses containing its multiaddrs and metadata. + * @property {string[]} protocols peer's supported protocols. + * @property {Map|undefined} metadata peer's metadata map. */ /** + * Responsible for managing known peers, as well as their addresses, protocols and metadata. + * * @param {object} options * @param {PeerId} options.peerId * @class @@ -121,7 +124,7 @@ class PeerStore extends EventEmitter { * Get the stored information of a given peer. * * @param {PeerId} peerId - * @returns {Peer} + * @returns {Peer|undefined} */ get (peerId) { if (!PeerId.isPeerId(peerId)) { diff --git a/src/peer-store/key-book.js b/src/peer-store/key-book.js index 607d12795f..356c81866e 100644 --- a/src/peer-store/key-book.js +++ b/src/peer-store/key-book.js @@ -1,9 +1,10 @@ 'use strict' -const errcode = require('err-code') const debug = require('debug') -const log = debug('libp2p:peer-store:key-book') -log.error = debug('libp2p:peer-store:key-book:error') +const log = Object.assign(debug('libp2p:peer-store:key-book'), { + error: debug('libp2p:peer-store:key-book:err') +}) +const errcode = require('err-code') const PeerId = require('peer-id') @@ -14,10 +15,17 @@ const { } = require('../errors') /** - * The KeyBook is responsible for keeping the known public keys of a peer. + * @typedef {import('./')} PeerStore + * @typedef {import('libp2p-crypto').PublicKey} PublicKey + */ + +/** + * @extends {Book} */ class KeyBook extends Book { /** + * The KeyBook is responsible for keeping the known public keys of a peer. + * * @class * @param {PeerStore} peerStore */ @@ -42,7 +50,7 @@ class KeyBook extends Book { * * @override * @param {PeerId} peerId - * @param {RsaPublicKey|Ed25519PublicKey|Secp256k1PublicKey} publicKey + * @param {PublicKey} publicKey * @returns {KeyBook} */ set (peerId, publicKey) { @@ -72,7 +80,7 @@ class KeyBook extends Book { * * @override * @param {PeerId} peerId - * @returns {RsaPublicKey|Ed25519PublicKey|Secp256k1PublicKey} + * @returns {PublicKey | undefined} */ get (peerId) { if (!PeerId.isPeerId(peerId)) { diff --git a/src/peer-store/metadata-book.js b/src/peer-store/metadata-book.js index 490ef02b09..d497bb2f04 100644 --- a/src/peer-store/metadata-book.js +++ b/src/peer-store/metadata-book.js @@ -1,9 +1,10 @@ 'use strict' -const errcode = require('err-code') const debug = require('debug') -const log = debug('libp2p:peer-store:proto-book') -log.error = debug('libp2p:peer-store:proto-book:error') +const log = Object.assign(debug('libp2p:peer-store:proto-book'), { + error: debug('libp2p:peer-store:proto-book:err') +}) +const errcode = require('err-code') const uint8ArrayEquals = require('uint8arrays/equals') const PeerId = require('peer-id') @@ -15,13 +16,19 @@ const { } = require('../errors') /** - * The MetadataBook is responsible for keeping the known supported - * protocols of a peer. + * @typedef {import('./')} PeerStore + */ + +/** + * @extends {Book} * * @fires MetadataBook#change:metadata */ class MetadataBook extends Book { /** + * The MetadataBook is responsible for keeping the known supported + * protocols of a peer. + * * @class * @param {PeerStore} peerStore */ @@ -51,8 +58,9 @@ class MetadataBook extends Book { * @param {PeerId} peerId * @param {string} key - metadata key * @param {Uint8Array} value - metadata value - * @returns {ProtoBook} + * @returns {MetadataBook} */ + // @ts-ignore override with more then the parameters expected in Book set (peerId, key, value) { if (!PeerId.isPeerId(peerId)) { log.error('peerId must be an instance of peer-id to store data') @@ -95,7 +103,7 @@ class MetadataBook extends Book { * Get the known data of a provided peer. * * @param {PeerId} peerId - * @returns {Map} + * @returns {Map|undefined} */ get (peerId) { if (!PeerId.isPeerId(peerId)) { @@ -110,7 +118,7 @@ class MetadataBook extends Book { * * @param {PeerId} peerId * @param {string} key - * @returns {Uint8Array} + * @returns {Uint8Array | undefined} */ getValue (peerId, key) { if (!PeerId.isPeerId(peerId)) { diff --git a/src/peer-store/persistent/index.js b/src/peer-store/persistent/index.js index 70a417f4f7..bbab49657e 100644 --- a/src/peer-store/persistent/index.js +++ b/src/peer-store/persistent/index.js @@ -1,9 +1,9 @@ 'use strict' const debug = require('debug') -const log = debug('libp2p:persistent-peer-store') -log.error = debug('libp2p:persistent-peer-store:error') - +const log = Object.assign(debug('libp2p:persistent-peer-store'), { + error: debug('libp2p:persistent-peer-store:err') +}) const { Key } = require('interface-datastore') const multiaddr = require('multiaddr') const PeerId = require('peer-id') @@ -21,16 +21,22 @@ const { const Addresses = require('./pb/address-book.proto') const Protocols = require('./pb/proto-book.proto') +/** + * @typedef {Object} PersistentPeerStoreProperties + * @property {PeerId} peerId + * @property {any} datastore + * + * @typedef {Object} PersistentPeerStoreOptions + * @property {number} [threshold = 5] - Number of dirty peers allowed before commit data. + */ + /** * Responsible for managing the persistence of data in the PeerStore. */ class PersistentPeerStore extends PeerStore { /** * @class - * @param {Object} properties - * @param {PeerId} properties.peerId - * @param {Datastore} properties.datastore - Datastore to persist data. - * @param {number} [properties.threshold = 5] - Number of dirty peers allowed before commit data. + * @param {PersistentPeerStoreProperties & PersistentPeerStoreOptions} properties */ constructor ({ peerId, datastore, threshold = 5 }) { super({ peerId }) @@ -340,6 +346,7 @@ class PersistentPeerStore extends PeerStore { case 'addrs': decoded = Addresses.decode(value) + // @ts-ignore protected function this.addressBook._setData( peerId, { @@ -357,6 +364,7 @@ class PersistentPeerStore extends PeerStore { case 'keys': decoded = await PeerId.createFromPubKey(value) + // @ts-ignore protected function this.keyBook._setData( decoded, decoded, @@ -372,6 +380,7 @@ class PersistentPeerStore extends PeerStore { case 'protos': decoded = Protocols.decode(value) + // @ts-ignore protected function this.protoBook._setData( peerId, new Set(decoded.protocols), diff --git a/src/peer-store/proto-book.js b/src/peer-store/proto-book.js index a08f5a284d..5c17b1371a 100644 --- a/src/peer-store/proto-book.js +++ b/src/peer-store/proto-book.js @@ -1,10 +1,10 @@ 'use strict' -const errcode = require('err-code') const debug = require('debug') -const log = debug('libp2p:peer-store:proto-book') -log.error = debug('libp2p:peer-store:proto-book:error') - +const log = Object.assign(debug('libp2p:peer-store:proto-book'), { + error: debug('libp2p:peer-store:proto-book:err') +}) +const errcode = require('err-code') const PeerId = require('peer-id') const Book = require('./book') @@ -14,13 +14,19 @@ const { } = require('../errors') /** - * The ProtoBook is responsible for keeping the known supported - * protocols of a peer. + * @typedef {import('./')} PeerStore + */ + +/** + * @extends {Book} * * @fires ProtoBook#change:protocols */ class ProtoBook extends Book { /** + * The ProtoBook is responsible for keeping the known supported + * protocols of a peer. + * * @class * @param {PeerStore} peerStore */ @@ -50,7 +56,7 @@ class ProtoBook extends Book { * * @override * @param {PeerId} peerId - * @param {Array} protocols + * @param {string[]} protocols * @returns {ProtoBook} */ set (peerId, protocols) { @@ -88,7 +94,7 @@ class ProtoBook extends Book { * If the peer was not known before, it will be added. * * @param {PeerId} peerId - * @param {Array} protocols + * @param {string[]} protocols * @returns {ProtoBook} */ add (peerId, protocols) { @@ -123,7 +129,7 @@ class ProtoBook extends Book { * If the protocols did not exist before, nothing will be done. * * @param {PeerId} peerId - * @param {Array} protocols + * @param {string[]} protocols * @returns {ProtoBook} */ remove (peerId, protocols) { diff --git a/src/ping/index.js b/src/ping/index.js index e882f81f4c..eb8d7b96e9 100644 --- a/src/ping/index.js +++ b/src/ping/index.js @@ -1,30 +1,39 @@ 'use strict' const debug = require('debug') -const log = debug('libp2p-ping') -log.error = debug('libp2p-ping:error') +const log = Object.assign(debug('libp2p:ping'), { + error: debug('libp2p:ping:err') +}) const errCode = require('err-code') const crypto = require('libp2p-crypto') -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') const { toBuffer } = require('it-buffer') const { collect, take } = require('streaming-iterables') +const equals = require('uint8arrays/equals') const { PROTOCOL, PING_LENGTH } = require('./constants') +/** + * @typedef {import('../')} Libp2p + * @typedef {import('multiaddr')} Multiaddr + * @typedef {import('peer-id')} PeerId + */ + /** * Ping a given peer and wait for its response, getting the operation latency. * * @param {Libp2p} node - * @param {PeerId|multiaddr} peer + * @param {PeerId|Multiaddr} peer * @returns {Promise} */ async function ping (node, peer) { + // @ts-ignore multiaddr might not have toB58String log('dialing %s to %s', PROTOCOL, peer.toB58String ? peer.toB58String() : peer) const { stream } = await node.dialProtocol(peer, PROTOCOL) - const start = new Date() + const start = Date.now() const data = crypto.randomBytes(PING_LENGTH) const [result] = await pipe( @@ -36,7 +45,7 @@ async function ping (node, peer) { ) const end = Date.now() - if (!data.equals(result)) { + if (!equals(data, result)) { throw errCode(new Error('Received wrong ping ack'), 'ERR_WRONG_PING_ACK') } diff --git a/src/pnet/crypto.js b/src/pnet/crypto.js index ae824bfcfb..9cfcbc8e6f 100644 --- a/src/pnet/crypto.js +++ b/src/pnet/crypto.js @@ -1,16 +1,17 @@ 'use strict' const debug = require('debug') +const log = Object.assign(debug('libp2p:pnet'), { + trace: debug('libp2p:pnet:trace'), + error: debug('libp2p:pnet:err') +}) + const Errors = require('./errors') const xsalsa20 = require('xsalsa20') const KEY_LENGTH = require('./key-generator').KEY_LENGTH const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayToString = require('uint8arrays/to-string') -const log = debug('libp2p:pnet') -log.trace = debug('libp2p:pnet:trace') -log.error = debug('libp2p:pnet:err') - /** * Creates a stream iterable to encrypt messages in a private network * diff --git a/src/pnet/index.js b/src/pnet/index.js index e5f82331a2..194a5005ec 100644 --- a/src/pnet/index.js +++ b/src/pnet/index.js @@ -1,12 +1,16 @@ 'use strict' -const pipe = require('it-pipe') +const debug = require('debug') +const log = Object.assign(debug('libp2p:pnet'), { + error: debug('libp2p:pnet:err') +}) +const { pipe } = require('it-pipe') const errcode = require('err-code') const duplexPair = require('it-pair/duplex') const crypto = require('libp2p-crypto') const Errors = require('./errors') const { - ERR_INVALID_PARAMETERS + codes: { ERR_INVALID_PARAMETERS } } = require('../errors') const { createBoxStream, @@ -15,16 +19,16 @@ const { } = require('./crypto') const handshake = require('it-handshake') const { NONCE_LENGTH } = require('./key-generator') -const debug = require('debug') -const log = debug('libp2p:pnet') -log.error = debug('libp2p:pnet:err') /** - * Takes a Private Shared Key (psk) and provides a `protect` method - * for wrapping existing connections in a private encryption stream + * @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection */ + class Protector { /** + * Takes a Private Shared Key (psk) and provides a `protect` method + * for wrapping existing connections in a private encryption stream. + * * @param {Uint8Array} keyBuffer - The private shared key buffer * @class */ @@ -39,8 +43,8 @@ class Protector { * between its two peers from the PSK the Protector instance was * created with. * - * @param {Connection} connection - The connection to protect - * @returns {*} A protected duplex iterable + * @param {MultiaddrConnection} connection - The connection to protect + * @returns {Promise} A protected duplex iterable */ async protect (connection) { if (!connection) { diff --git a/src/pnet/key-generator.js b/src/pnet/key-generator.js index b3676dcccf..8a7a1ef5a2 100644 --- a/src/pnet/key-generator.js +++ b/src/pnet/key-generator.js @@ -22,6 +22,8 @@ module.exports = generate module.exports.NONCE_LENGTH = 24 module.exports.KEY_LENGTH = KEY_LENGTH +// @ts-ignore This condition will always return 'false' since the types 'Module | undefined' if (require.main === module) { + // @ts-ignore generate(process.stdout) } diff --git a/src/pubsub-adapter.js b/src/pubsub-adapter.js index 1f42cc7d15..7d7af8df2f 100644 --- a/src/pubsub-adapter.js +++ b/src/pubsub-adapter.js @@ -1,42 +1,54 @@ 'use strict' +/** + * @typedef {import('libp2p-interfaces/src/pubsub').InMessage} InMessage + * @typedef {import('libp2p-interfaces/src/pubsub')} PubsubRouter + */ + // Pubsub adapter to keep API with handlers while not removed. -module.exports = (PubsubRouter, libp2p, options) => { - class Pubsub extends PubsubRouter { - /** - * Subscribes to a given topic. - * - * @override - * @param {string} topic - * @param {function(msg: InMessage)} [handler] - * @returns {void} - */ - subscribe (topic, handler) { - // Bind provided handler - handler && this.on(topic, handler) - super.subscribe(topic) +function pubsubAdapter (PubsubRouter, libp2p, options) { + const pubsub = new PubsubRouter(libp2p, options) + pubsub._subscribeAdapter = pubsub.subscribe + pubsub._unsubscribeAdapter = pubsub.unsubscribe + + /** + * Subscribes to a given topic. + * + * @override + * @param {string} topic + * @param {(msg: InMessage) => void} [handler] + * @returns {void} + */ + function subscribe (topic, handler) { + // Bind provided handler + handler && pubsub.on(topic, handler) + pubsub._subscribeAdapter(topic) + } + + /** + * Unsubscribe from the given topic. + * + * @override + * @param {string} topic + * @param {(msg: InMessage) => void} [handler] + * @returns {void} + */ + function unsubscribe (topic, handler) { + if (!handler) { + pubsub.removeAllListeners(topic) + } else { + pubsub.removeListener(topic, handler) } - /** - * Unsubscribe from the given topic. - * - * @override - * @param {string} topic - * @param {function(msg: InMessage)} [handler] - * @returns {void} - */ - unsubscribe (topic, handler) { - if (!handler) { - this.removeAllListeners(topic) - } else { - this.removeListener(topic, handler) - } - - if (this.listenerCount(topic) === 0) { - super.unsubscribe(topic) - } + if (pubsub.listenerCount(topic) === 0) { + pubsub._unsubscribeAdapter(topic) } } - return new Pubsub(libp2p, options) + pubsub.subscribe = subscribe + pubsub.unsubscribe = unsubscribe + + return pubsub } + +module.exports = pubsubAdapter diff --git a/src/record/envelope/envelope.proto.js b/src/record/envelope/envelope.proto.js index ca0074961a..c8907debda 100644 --- a/src/record/envelope/envelope.proto.js +++ b/src/record/envelope/envelope.proto.js @@ -2,7 +2,8 @@ const protons = require('protons') -const message = ` +/** @type {{Envelope: import('../../types').MessageProto}} */ +module.exports = protons(` message Envelope { // public_key is the public key of the keypair the enclosed payload was // signed with. @@ -20,6 +21,4 @@ message Envelope { // additional security. bytes signature = 5; } -` - -module.exports = protons(message).Envelope +`) diff --git a/src/record/envelope/index.js b/src/record/envelope/index.js index 6a73914f2a..46f9c3ccf6 100644 --- a/src/record/envelope/index.js +++ b/src/record/envelope/index.js @@ -1,8 +1,5 @@ 'use strict' -const debug = require('debug') -const log = debug('libp2p:envelope') -log.error = debug('libp2p:envelope:error') const errCode = require('err-code') const uint8arraysConcat = require('uint8arrays/concat') const uint8arraysFromString = require('uint8arrays/from-string') @@ -15,11 +12,14 @@ const { codes } = require('../../errors') const Protobuf = require('./envelope.proto') /** - * The Envelope is responsible for keeping an arbitrary signed record - * by a libp2p peer. + * @typedef {import('libp2p-interfaces/src/record/types').Record} Record */ + class Envelope { /** + * The Envelope is responsible for keeping an arbitrary signed record + * by a libp2p peer. + * * @class * @param {object} params * @param {PeerId} params.peerId @@ -49,7 +49,7 @@ class Envelope { const publicKey = cryptoKeys.marshalPublicKey(this.peerId.pubKey) - this._marshal = Protobuf.encode({ + this._marshal = Protobuf.Envelope.encode({ public_key: publicKey, payload_type: this.payloadType, payload: this.payload, @@ -102,14 +102,14 @@ const formatSignaturePayload = (domain, payloadType, payload) => { // - The length of the payload field in bytes // - The value of the payload field - domain = uint8arraysFromString(domain) - const domainLength = varint.encode(domain.byteLength) + const domainUint8Array = uint8arraysFromString(domain) + const domainLength = varint.encode(domainUint8Array.byteLength) const payloadTypeLength = varint.encode(payloadType.length) const payloadLength = varint.encode(payload.length) return uint8arraysConcat([ new Uint8Array(domainLength), - domain, + domainUint8Array, new Uint8Array(payloadTypeLength), payloadType, new Uint8Array(payloadLength), @@ -124,7 +124,7 @@ const formatSignaturePayload = (domain, payloadType, payload) => { * @returns {Promise} */ Envelope.createFromProtobuf = async (data) => { - const envelopeData = Protobuf.decode(data) + const envelopeData = Protobuf.Envelope.decode(data) const peerId = await PeerId.createFromPubKey(envelopeData.public_key) return new Envelope({ @@ -142,7 +142,7 @@ Envelope.createFromProtobuf = async (data) => { * @async * @param {Record} record * @param {PeerId} peerId - * @returns {Envelope} + * @returns {Promise} */ Envelope.seal = async (record, peerId) => { const domain = record.domain @@ -166,7 +166,7 @@ Envelope.seal = async (record, peerId) => { * * @param {Uint8Array} data * @param {string} domain - * @returns {Envelope} + * @returns {Promise} */ Envelope.openAndCertify = async (data, domain) => { const envelope = await Envelope.createFromProtobuf(data) diff --git a/src/record/peer-record/index.js b/src/record/peer-record/index.js index 51c43a7970..32d018abc5 100644 --- a/src/record/peer-record/index.js +++ b/src/record/peer-record/index.js @@ -2,7 +2,6 @@ const multiaddr = require('multiaddr') const PeerId = require('peer-id') -const Record = require('libp2p-interfaces/src/record') const arrayEquals = require('libp2p-utils/src/array-equals') const Protobuf = require('./peer-record.proto') @@ -12,19 +11,28 @@ const { } = require('./consts') /** - * The PeerRecord is used for distributing peer routing records across the network. - * It contains the peer's reachable listen addresses. + * @typedef {import('peer-id')} PeerId + * @typedef {import('multiaddr')} Multiaddr + * @typedef {import('libp2p-interfaces/src/record/types').Record} Record */ -class PeerRecord extends Record { + +/** + * @implements {Record} + */ +class PeerRecord { /** + * The PeerRecord is used for distributing peer routing records across the network. + * It contains the peer's reachable listen addresses. + * * @class - * @param {object} params + * @param {Object} params * @param {PeerId} params.peerId - * @param {Array} params.multiaddrs - addresses of the associated peer. + * @param {Multiaddr[]} params.multiaddrs - addresses of the associated peer. * @param {number} [params.seqNumber] - monotonically-increasing sequence counter that's used to order PeerRecords in time. */ constructor ({ peerId, multiaddrs = [], seqNumber = Date.now() }) { - super(ENVELOPE_DOMAIN_PEER_RECORD, ENVELOPE_PAYLOAD_TYPE_PEER_RECORD) + this.domain = ENVELOPE_DOMAIN_PEER_RECORD + this.codec = ENVELOPE_PAYLOAD_TYPE_PEER_RECORD this.peerId = peerId this.multiaddrs = multiaddrs @@ -44,7 +52,7 @@ class PeerRecord extends Record { return this._marshal } - this._marshal = Protobuf.encode({ + this._marshal = Protobuf.PeerRecord.encode({ peer_id: this.peerId.toBytes(), seq: this.seqNumber, addresses: this.multiaddrs.map((m) => ({ @@ -58,10 +66,14 @@ class PeerRecord extends Record { /** * Returns true if `this` record equals the `other`. * - * @param {Record} other + * @param {unknown} other * @returns {boolean} */ equals (other) { + if (!(other instanceof PeerRecord)) { + return false + } + // Validate PeerId if (!this.peerId.equals(other.peerId)) { return false @@ -89,7 +101,7 @@ class PeerRecord extends Record { */ PeerRecord.createFromProtobuf = (buf) => { // Decode - const peerRecord = Protobuf.decode(buf) + const peerRecord = Protobuf.PeerRecord.decode(buf) const peerId = PeerId.createFromBytes(peerRecord.peer_id) const multiaddrs = (peerRecord.addresses || []).map((a) => multiaddr(a.multiaddr)) diff --git a/src/record/peer-record/peer-record.proto.js b/src/record/peer-record/peer-record.proto.js index 9da916ca87..0ebb3b90d0 100644 --- a/src/record/peer-record/peer-record.proto.js +++ b/src/record/peer-record/peer-record.proto.js @@ -7,7 +7,8 @@ const protons = require('protons') // is expected to expand to include other information in the future. // PeerRecords are designed to be serialized to bytes and placed inside of // SignedEnvelopes before sharing with other peers. -const message = ` +/** @type {{PeerRecord: import('../../types').MessageProto}} */ +module.exports = protons(` message PeerRecord { // AddressInfo is a wrapper around a binary multiaddr. It is defined as a // separate message to allow us to add per-address metadata in the future. @@ -24,6 +25,4 @@ message PeerRecord { // addresses is a list of public listen addresses for the peer. repeated AddressInfo addresses = 3; } -` - -module.exports = protons(message).PeerRecord +`) diff --git a/src/record/utils.js b/src/record/utils.js index 65696156b8..0a92ade177 100644 --- a/src/record/utils.js +++ b/src/record/utils.js @@ -3,10 +3,14 @@ const Envelope = require('./envelope') const PeerRecord = require('./peer-record') +/** + * @typedef {import('../')} Libp2p + */ + /** * Create (or update if existing) self peer record and store it in the AddressBook. * - * @param {libp2p} libp2p + * @param {Libp2p} libp2p * @returns {Promise} */ async function updateSelfPeerRecord (libp2p) { diff --git a/src/registrar.js b/src/registrar.js index 5130a02fcb..367f110c80 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -1,15 +1,24 @@ 'use strict' const debug = require('debug') +const log = Object.assign(debug('libp2p:peer-store'), { + error: debug('libp2p:peer-store:err') +}) const errcode = require('err-code') -const log = debug('libp2p:peer-store') -log.error = debug('libp2p:peer-store:error') const { - ERR_INVALID_PARAMETERS + codes: { ERR_INVALID_PARAMETERS } } = require('./errors') const Topology = require('libp2p-interfaces/src/topology') +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('./peer-store')} PeerStore + * @typedef {import('./connection-manager')} ConnectionManager + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('libp2p-interfaces/src/topology')} Topology + */ + /** * Responsible for notifying registered protocols of events in the network. */ @@ -17,7 +26,7 @@ class Registrar { /** * @param {Object} props * @param {PeerStore} props.peerStore - * @param {connectionManager} props.connectionManager + * @param {ConnectionManager} props.connectionManager * @class */ constructor ({ peerStore, connectionManager }) { @@ -51,7 +60,7 @@ class Registrar { * Get a connection with a peer. * * @param {PeerId} peerId - * @returns {Connection} + * @returns {Connection | null} */ getConnection (peerId) { return this.connectionManager.get(peerId) @@ -65,11 +74,12 @@ class Registrar { */ register (topology) { if (!Topology.isTopology(topology)) { + log.error('topology must be an instance of interfaces/topology') throw errcode(new Error('topology must be an instance of interfaces/topology'), ERR_INVALID_PARAMETERS) } // Create topology - const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() + const id = (Math.random() * 1e9).toString(36) + Date.now() this.topologies.set(id, topology) diff --git a/src/transport-manager.js b/src/transport-manager.js index 7a47a9e90a..6d1326833f 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -1,25 +1,39 @@ 'use strict' +const debug = require('debug') +const log = Object.assign(debug('libp2p:transports'), { + error: debug('libp2p:transports:err') +}) + const pSettle = require('p-settle') const { codes } = require('./errors') const errCode = require('err-code') -const debug = require('debug') -const log = debug('libp2p:transports') -log.error = debug('libp2p:transports:error') const { updateSelfPeerRecord } = require('./record/utils') +/** + * @typedef {import('multiaddr')} Multiaddr + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('libp2p-interfaces/src/transport/types').TransportFactory} TransportFactory + * @typedef {import('libp2p-interfaces/src/transport/types').Transport} Transport + * + * @typedef {Object} TransportManagerProperties + * @property {import('./')} libp2p + * @property {import('./upgrader')} upgrader + * + * @typedef {Object} TransportManagerOptions + * @property {number} [faultTolerance = FAULT_TOLERANCE.FATAL_ALL] - Address listen error tolerance. + */ + class TransportManager { /** * @class - * @param {object} options - * @param {Libp2p} options.libp2p - The Libp2p instance. It will be passed to the transports. - * @param {Upgrader} options.upgrader - The upgrader to provide to the transports - * @param {boolean} [options.faultTolerance = FAULT_TOLERANCE.FATAL_ALL] - Address listen error tolerance. + * @param {TransportManagerProperties & TransportManagerOptions} options */ constructor ({ libp2p, upgrader, faultTolerance = FAULT_TOLERANCE.FATAL_ALL }) { this.libp2p = libp2p this.upgrader = upgrader + /** @type {Map} */ this._transports = new Map() this._listeners = new Map() this.faultTolerance = faultTolerance @@ -29,7 +43,7 @@ class TransportManager { * Adds a `Transport` to the manager * * @param {string} key - * @param {Transport} Transport + * @param {TransportFactory} Transport * @param {*} transportOptions - Additional options to pass to the transport * @returns {void} */ @@ -117,7 +131,7 @@ class TransportManager { /** * Returns all the transports instances. * - * @returns {Iterator} + * @returns {IterableIterator} */ getTransports () { return this._transports.values() @@ -141,7 +155,7 @@ class TransportManager { * Starts listeners for each listen Multiaddr. * * @async - * @param {Array} addrs - addresses to attempt to listen on + * @param {Multiaddr[]} addrs - addresses to attempt to listen on */ async listen (addrs) { if (!addrs || addrs.length === 0) { @@ -157,7 +171,7 @@ class TransportManager { // For each supported multiaddr, create a listener for (const addr of supportedAddrs) { log('creating listener for %s on %s', key, addr) - const listener = transport.createListener({}, this.onConnection) + const listener = transport.createListener({}) this._listeners.get(key).push(listener) // Track listen/close events diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000000..3e87d7c803 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,84 @@ + +// Insecure Message types +export enum KeyType { + RSA = 0, + Ed25519 = 1, + Secp256k1 = 2, + ECDSA = 3 +} + +// Protobufs +export type MessageProto = { + encode(value: any): Uint8Array + decode(bytes: Uint8Array): any +} + +export type SUCCESS = 100; +export type HOP_SRC_ADDR_TOO_LONG = 220; +export type HOP_DST_ADDR_TOO_LONG = 221; +export type HOP_SRC_MULTIADDR_INVALID = 250; +export type HOP_DST_MULTIADDR_INVALID = 251; +export type HOP_NO_CONN_TO_DST = 260; +export type HOP_CANT_DIAL_DST = 261; +export type HOP_CANT_OPEN_DST_STREAM = 262; +export type HOP_CANT_SPEAK_RELAY = 270; +export type HOP_CANT_RELAY_TO_SELF = 280; +export type STOP_SRC_ADDR_TOO_LONG = 320; +export type STOP_DST_ADDR_TOO_LONG = 321; +export type STOP_SRC_MULTIADDR_INVALID = 350; +export type STOP_DST_MULTIADDR_INVALID = 351; +export type STOP_RELAY_REFUSED = 390; +export type MALFORMED_MESSAGE = 400; + +export type CircuitStatus = SUCCESS | HOP_SRC_ADDR_TOO_LONG | HOP_DST_ADDR_TOO_LONG + | HOP_SRC_MULTIADDR_INVALID | HOP_DST_MULTIADDR_INVALID | HOP_NO_CONN_TO_DST + | HOP_CANT_DIAL_DST | HOP_CANT_OPEN_DST_STREAM | HOP_CANT_SPEAK_RELAY | HOP_CANT_RELAY_TO_SELF + | STOP_SRC_ADDR_TOO_LONG | STOP_DST_ADDR_TOO_LONG | STOP_SRC_MULTIADDR_INVALID + | STOP_DST_MULTIADDR_INVALID | STOP_RELAY_REFUSED | MALFORMED_MESSAGE + +export type HOP = 1; +export type STOP = 2; +export type STATUS = 3; +export type CAN_HOP = 4; + +export type CircuitType = HOP | STOP | STATUS | CAN_HOP + +export type CircuitPeer = { + id: Uint8Array + addrs: Uint8Array[] +} + +export type CircuitRequest = { + type: CircuitType + dstPeer: CircuitPeer + srcPeer: CircuitPeer +} + +export type CircuitMessageProto = { + encode(value: any): Uint8Array + decode(bytes: Uint8Array): any + Status: { + SUCCESS: SUCCESS, + HOP_SRC_ADDR_TOO_LONG: HOP_SRC_ADDR_TOO_LONG, + HOP_DST_ADDR_TOO_LONG: HOP_DST_ADDR_TOO_LONG, + HOP_SRC_MULTIADDR_INVALID: HOP_SRC_MULTIADDR_INVALID, + HOP_DST_MULTIADDR_INVALID: HOP_DST_MULTIADDR_INVALID, + HOP_NO_CONN_TO_DST: HOP_NO_CONN_TO_DST, + HOP_CANT_DIAL_DST: HOP_CANT_DIAL_DST, + HOP_CANT_OPEN_DST_STREAM: HOP_CANT_OPEN_DST_STREAM, + HOP_CANT_SPEAK_RELAY: HOP_CANT_SPEAK_RELAY, + HOP_CANT_RELAY_TO_SELF: HOP_CANT_RELAY_TO_SELF, + STOP_SRC_ADDR_TOO_LONG: STOP_SRC_ADDR_TOO_LONG, + STOP_DST_ADDR_TOO_LONG: STOP_DST_ADDR_TOO_LONG, + STOP_SRC_MULTIADDR_INVALID: STOP_SRC_MULTIADDR_INVALID, + STOP_DST_MULTIADDR_INVALID: STOP_DST_MULTIADDR_INVALID, + STOP_RELAY_REFUSED: STOP_RELAY_REFUSED, + MALFORMED_MESSAGE: MALFORMED_MESSAGE + }, + Type: { + HOP: HOP, + STOP: STOP, + STATUS: STATUS, + CAN_HOP: CAN_HOP + } +} diff --git a/src/upgrader.js b/src/upgrader.js index 92997eb15f..14d0a4e82d 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -1,29 +1,30 @@ 'use strict' const debug = require('debug') -const log = debug('libp2p:upgrader') -log.error = debug('libp2p:upgrader:error') +const log = Object.assign(debug('libp2p:upgrader'), { + error: debug('libp2p:upgrader:err') +}) +const errCode = require('err-code') const Multistream = require('multistream-select') const { Connection } = require('libp2p-interfaces/src/connection') -const ConnectionStatus = require('libp2p-interfaces/src/connection/status') const PeerId = require('peer-id') -const pipe = require('it-pipe') -const errCode = require('err-code') +const { pipe } = require('it-pipe') const mutableProxy = require('mutable-proxy') const { codes } = require('./errors') /** - * @typedef MultiaddrConnection - * @property {Function} sink - * @property {AsyncIterator} source - * @property {*} conn - * @property {Multiaddr} remoteAddr + * @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxerFactory} MuxerFactory + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').Muxer} Muxer + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + * @typedef {import('libp2p-interfaces/src/crypto/types').Crypto} Crypto + * @typedef {import('multiaddr')} Multiaddr */ /** * @typedef CryptoResult - * @property {*} conn A duplex iterable + * @property {MultiaddrConnection} conn A duplex iterable * @property {PeerId} remotePeer * @property {string} protocol */ @@ -32,24 +33,24 @@ class Upgrader { /** * @param {object} options * @param {PeerId} options.localPeer - * @param {Metrics} options.metrics - * @param {Map} options.cryptos - * @param {Map} options.muxers - * @param {function(Connection)} options.onConnection - Called when a connection is upgraded - * @param {function(Connection)} options.onConnectionEnd + * @param {import('./metrics')} [options.metrics] + * @param {Map} [options.cryptos] + * @param {Map} [options.muxers] + * @param {(Connection) => void} options.onConnection - Called when a connection is upgraded + * @param {(Connection) => void} options.onConnectionEnd */ constructor ({ localPeer, metrics, - cryptos, - muxers, + cryptos = new Map(), + muxers = new Map(), onConnectionEnd = () => {}, onConnection = () => {} }) { this.localPeer = localPeer this.metrics = metrics - this.cryptos = cryptos || new Map() - this.muxers = muxers || new Map() + this.cryptos = cryptos + this.muxers = muxers this.protector = null this.protocols = new Map() this.onConnection = onConnection @@ -74,7 +75,7 @@ class Upgrader { if (this.metrics) { ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) - const idString = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() + const idString = (Math.random() * 1e9).toString(36) + Date.now() setPeer({ toB58String: () => idString }) maConn = this.metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) } @@ -132,12 +133,7 @@ class Upgrader { * @returns {Promise} */ async upgradeOutbound (maConn) { - let remotePeerId - try { - remotePeerId = PeerId.createFromB58String(maConn.remoteAddr.getPeerId()) - } catch (err) { - log.error('multiaddr did not contain a valid peer id', err) - } + const remotePeerId = PeerId.createFromB58String(maConn.remoteAddr.getPeerId()) let encryptedConn let remotePeer @@ -149,7 +145,7 @@ class Upgrader { if (this.metrics) { ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) - const idString = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() + const idString = (Math.random() * 1e9).toString(36) + Date.now() setPeer({ toB58String: () => idString }) maConn = this.metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) } @@ -207,8 +203,8 @@ class Upgrader { * @param {string} options.cryptoProtocol - The crypto protocol that was negotiated * @param {string} options.direction - One of ['inbound', 'outbound'] * @param {MultiaddrConnection} options.maConn - The transport layer connection - * @param {*} options.upgradedConn - A duplex connection returned from multiplexer and/or crypto selection - * @param {Muxer} options.Muxer - The muxer to be used for muxing + * @param {MuxedStream | MultiaddrConnection} options.upgradedConn - A duplex connection returned from multiplexer and/or crypto selection + * @param {MuxerFactory} [options.Muxer] - The muxer to be used for muxing * @param {PeerId} options.remotePeer - The peer the connection is with * @returns {Connection} */ @@ -272,7 +268,7 @@ class Upgrader { // Wait for close to finish before notifying of the closure (async () => { try { - if (connection.stat.status === ConnectionStatus.OPEN) { + if (connection.stat.status === 'open') { await connection.close() } } catch (err) { @@ -300,6 +296,7 @@ class Upgrader { remotePeer: remotePeer, stat: { direction, + // @ts-ignore timeline: maConn.timeline, multiplexer: Muxer && Muxer.multicodec, encryption: cryptoProtocol @@ -326,7 +323,7 @@ class Upgrader { * @private * @param {object} options * @param {Connection} options.connection - The connection the stream belongs to - * @param {Stream} options.stream + * @param {MuxedStream} options.stream * @param {string} options.protocol */ _onStream ({ connection, stream, protocol }) { @@ -342,7 +339,7 @@ class Upgrader { * @param {PeerId} localPeer - The initiators PeerId * @param {*} connection * @param {Map} cryptos - * @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used + * @returns {Promise} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used */ async _encryptInbound (localPeer, connection, cryptos) { const mss = new Multistream.Listener(connection) @@ -354,6 +351,10 @@ class Upgrader { const crypto = cryptos.get(protocol) log('encrypting inbound connection...') + if (!crypto) { + throw new Error(`no crypto module found for ${protocol}`) + } + return { ...await crypto.secureInbound(localPeer, stream), protocol @@ -373,7 +374,7 @@ class Upgrader { * @param {*} connection * @param {PeerId} remotePeerId * @param {Map} cryptos - * @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used + * @returns {Promise} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used */ async _encryptOutbound (localPeer, connection, remotePeerId, cryptos) { const mss = new Multistream.Dialer(connection) @@ -385,6 +386,10 @@ class Upgrader { const crypto = cryptos.get(protocol) log('encrypting outbound connection to %j', remotePeerId) + if (!crypto) { + throw new Error(`no crypto module found for ${protocol}`) + } + return { ...await crypto.secureOutbound(localPeer, stream, remotePeerId), protocol @@ -400,9 +405,9 @@ class Upgrader { * * @private * @async - * @param {*} connection - A basic duplex connection to multiplex - * @param {Map} muxers - The muxers to attempt multiplexing with - * @returns {*} A muxed connection + * @param {MultiaddrConnection} connection - A basic duplex connection to multiplex + * @param {Map} muxers - The muxers to attempt multiplexing with + * @returns {Promise<{ stream: MuxedStream, Muxer?: MuxerFactory}>} A muxed connection */ async _multiplexOutbound (connection, muxers) { const dialer = new Multistream.Dialer(connection) @@ -424,9 +429,9 @@ class Upgrader { * * @private * @async - * @param {*} connection - A basic duplex connection to multiplex - * @param {Map} muxers - The muxers to attempt multiplexing with - * @returns {*} A muxed connection + * @param {MultiaddrConnection} connection - A basic duplex connection to multiplex + * @param {Map} muxers - The muxers to attempt multiplexing with + * @returns {Promise<{ stream: MuxedStream, Muxer?: MuxerFactory}>} A muxed connection */ async _multiplexInbound (connection, muxers) { const listener = new Multistream.Listener(connection) diff --git a/test/dialing/dial-request.spec.js b/test/dialing/dial-request.spec.js index 4ca25e9d28..fd56620a00 100644 --- a/test/dialing/dial-request.spec.js +++ b/test/dialing/dial-request.spec.js @@ -10,7 +10,7 @@ const AggregateError = require('aggregate-error') const pDefer = require('p-defer') const delay = require('delay') -const { DialRequest } = require('../../src/dialer/dial-request') +const DialRequest = require('../../src/dialer/dial-request') const createMockConnection = require('../utils/mockConnection') const error = new Error('dial failes') diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 749f491484..cccd071871 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -12,7 +12,8 @@ const pWaitFor = require('p-wait-for') const unit8ArrayToString = require('uint8arrays/to-string') const { codes: Errors } = require('../../src/errors') -const { IdentifyService, multicodecs } = require('../../src/identify') +const IdentifyService = require('../../src/identify') +const multicodecs = IdentifyService.multicodecs const Peers = require('../fixtures/peers') const Libp2p = require('../../src') const Envelope = require('../../src/record/envelope') diff --git a/test/record/envelope.spec.js b/test/record/envelope.spec.js index ca7408f26f..d95a925ecb 100644 --- a/test/record/envelope.spec.js +++ b/test/record/envelope.spec.js @@ -6,7 +6,6 @@ chai.use(require('chai-bytes')) const uint8arrayFromString = require('uint8arrays/from-string') const uint8arrayEquals = require('uint8arrays/equals') const Envelope = require('../../src/record/envelope') -const Record = require('libp2p-interfaces/src/record') const { codes: ErrorCodes } = require('../../src/errors') const peerUtils = require('../utils/creators/peer') @@ -14,9 +13,10 @@ const peerUtils = require('../utils/creators/peer') const domain = 'libp2p-testing' const codec = uint8arrayFromString('/libp2p/testdata') -class TestRecord extends Record { +class TestRecord { constructor (data) { - super(domain, codec) + this.domain = domain + this.codec = codec this.data = data } diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js index 7282dcd1c2..96df354952 100644 --- a/test/upgrading/upgrader.spec.js +++ b/test/upgrading/upgrader.spec.js @@ -56,28 +56,6 @@ describe('Upgrader', () => { sinon.restore() }) - it('should ignore a missing remote peer id', async () => { - const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) - - const muxers = new Map([[Muxer.multicodec, Muxer]]) - sinon.stub(localUpgrader, 'muxers').value(muxers) - sinon.stub(remoteUpgrader, 'muxers').value(muxers) - - const cryptos = new Map([[Crypto.protocol, Crypto]]) - sinon.stub(localUpgrader, 'cryptos').value(cryptos) - sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) - - // Remove the peer id from the remote address - outbound.remoteAddr = outbound.remoteAddr.decapsulateCode(421) - - const connections = await Promise.all([ - localUpgrader.upgradeOutbound(outbound), - remoteUpgrader.upgradeInbound(inbound) - ]) - - expect(connections).to.have.length(2) - }) - it('should upgrade with valid muxers and crypto', async () => { const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000000..5b9a618c43 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "./node_modules/aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src" + ] +} \ No newline at end of file