diff --git a/codex/codex.nim b/codex/codex.nim index 8135746410..62fd6cc9ba 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -40,11 +40,10 @@ import ./contracts import ./systemclock import ./contracts/clock import ./contracts/deployment -import ./utils/addrutils import ./namespaces import ./codextypes import ./logutils -import ./nat +import ./nat/reachabilitymanager logScope: topics = "codex node" @@ -57,6 +56,7 @@ type repoStore: RepoStore maintenance: BlockMaintainer taskpool: Taskpool + reachabilityManager: ReachabilityManager CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -166,12 +166,18 @@ proc start*(s: CodexServer) {.async.} = await s.codexNode.switch.start() - let (announceAddrs, discoveryAddrs) = nattedAddress( - s.config.nat, s.codexNode.switch.peerInfo.addrs, s.config.discoveryPort - ) + s.reachabilityManager.getAnnounceRecords = some proc(): ?seq[MultiAddress] = + s.codexNode.switch.peerInfo.addrs.some + s.reachabilityManager.getDiscoveryRecords = some proc(): ?seq[MultiAddress] = + if dhtRecord =? s.codexNode.discovery.dhtRecord: + return dhtRecord.data.addresses.mapIt(it.address).some + + s.reachabilityManager.updateAnnounceRecords = some proc(records: seq[MultiAddress]) = + s.codexNode.discovery.updateAnnounceRecord(records) + s.reachabilityManager.updateDiscoveryRecords = some proc(records: seq[MultiAddress]) = + s.codexNode.discovery.updateDhtRecord(records) - s.codexNode.discovery.updateAnnounceRecord(announceAddrs) - s.codexNode.discovery.updateDhtRecord(discoveryAddrs) + await s.reachabilityManager.start(s.codexNode.switch, s.config.bootstrapNodes) await s.bootstrapInteractions() await s.codexNode.start() @@ -183,6 +189,7 @@ proc stop*(s: CodexServer) {.async.} = let res = await noCancel allFinishedFailed[void]( @[ s.restServer.stop(), + s.reachabilityManager.stop(), s.codexNode.switch.stop(), s.codexNode.stop(), s.repoStore.stop(), @@ -201,6 +208,9 @@ proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey ): CodexServer = ## create CodexServer including setting up datastore, repostore, etc + + let reachabilityManager = ReachabilityManager.new(config.forcePortMapping) + let switch = SwitchBuilder .new() .withPrivateKey(privateKey) @@ -212,6 +222,11 @@ proc new*( .withAgentVersion(config.agentString) .withSignedPeerRecord(true) .withTcpTransport({ServerFlags.ReuseAddr}) + # Adds AutoNAT server support - ability to respond to other peers ask about their reachability status + .withAutonat() + + # Adds AutoNAT client support - to discover the node's rechability + .withServices(@[reachabilityManager.getAutonatService()]) .build() var @@ -338,4 +353,5 @@ proc new*( repoStore: repoStore, maintenance: maintenance, taskpool: taskpool, + reachabilityManager: reachabilityManager, ) diff --git a/codex/conf.nim b/codex/conf.nim index 77ef96caac..882c37d281 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -41,14 +41,13 @@ import ./logutils import ./stores import ./units import ./utils -import ./nat -import ./utils/natutils +import ./nat/port_mapping from ./contracts/config import DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas from ./validationconfig import MaxSlots, ValidationGroups -export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig -export ValidationGroups, MaxSlots +export units, net, codextypes, logutils, completeCmdArg, parseCmdArg +export ValidationGroups, MaxSlots, PortMappingStrategy export DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval, @@ -151,14 +150,14 @@ type name: "listen-addrs" .}: seq[MultiAddress] - nat* {. + forcePortMapping* {. desc: - "Specify method to use for determining public address. " & - "Must be one of: any, none, upnp, pmp, extip:", - defaultValue: defaultNatConfig(), + "Overide automatic detection to specific upnp mode. " & + "Must be one of: any, none, upnp, pmp", + defaultValue: PortMappingStrategy.Any, defaultValueDesc: "any", - name: "nat" - .}: NatConfig + name: "force-port-mapping" + .}: PortMappingStrategy discoveryPort* {. desc: "Discovery (UDP) port", @@ -464,9 +463,6 @@ logutils.formatIt(LogFormat.json, EthAddress): func defaultAddress*(conf: CodexConf): IpAddress = result = static parseIpAddress("127.0.0.1") -func defaultNatConfig*(): NatConfig = - result = NatConfig(hasExtIp: false, nat: NatStrategy.NatAny) - func persistence*(self: CodexConf): bool = self.cmd == StartUpCmd.persistence @@ -538,29 +534,20 @@ proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T = quit QuitFailure res -func parseCmdArg*(T: type NatConfig, p: string): T {.raises: [ValueError].} = +func parseCmdArg*(T: type PortMappingStrategy, p: string): T {.raises: [ValueError].} = case p.toLowerAscii of "any": - NatConfig(hasExtIp: false, nat: NatStrategy.NatAny) + PortMappingStrategy.Any of "none": - NatConfig(hasExtIp: false, nat: NatStrategy.NatNone) + PortMappingStrategy.None of "upnp": - NatConfig(hasExtIp: false, nat: NatStrategy.NatUpnp) + PortMappingStrategy.Upnp of "pmp": - NatConfig(hasExtIp: false, nat: NatStrategy.NatPmp) + PortMappingStrategy.Pmp else: - if p.startsWith("extip:"): - try: - let ip = parseIpAddress(p[6 ..^ 1]) - NatConfig(hasExtIp: true, extIp: ip) - except ValueError: - let error = "Not a valid IP address: " & p[6 ..^ 1] - raise newException(ValueError, error) - else: - let error = "Not a valid NAT option: " & p - raise newException(ValueError, error) + raise newException(ValueError, "Not a valid NAT option: " & p) -proc completeCmdArg*(T: type NatConfig, val: string): seq[string] = +proc completeCmdArg*(T: type PortMappingStrategy, val: string): seq[string] = return @[] proc parseCmdArg*(T: type EthAddress, address: string): T = @@ -642,11 +629,11 @@ proc readValue*( val = dur proc readValue*( - r: var TomlReader, val: var NatConfig + r: var TomlReader, val: var PortMappingStrategy ) {.raises: [SerializationError].} = val = try: - parseCmdArg(NatConfig, r.readValue(string)) + parseCmdArg(PortMappingStrategy, r.readValue(string)) except CatchableError as err: raise newException(SerializationError, err.msg) diff --git a/codex/nat.nim b/codex/nat.nim deleted file mode 100644 index d022dad6cb..0000000000 --- a/codex/nat.nim +++ /dev/null @@ -1,432 +0,0 @@ -# Copyright (c) 2019-2023 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -# * MIT license ([LICENSE-MIT](LICENSE-MIT)) -# at your option. -# This file may not be copied, modified, or distributed except according to -# those terms. - -{.push raises: [].} - -import - std/[options, os, strutils, times, net, atomics], - stew/shims/net as stewNet, - stew/[objects, results], - nat_traversal/[miniupnpc, natpmp], - json_serialization/std/net - -import pkg/chronos -import pkg/chronicles -import pkg/libp2p - -import ./utils -import ./utils/natutils -import ./utils/addrutils - -const - UPNP_TIMEOUT = 200 # ms - PORT_MAPPING_INTERVAL = 20 * 60 # seconds - NATPMP_LIFETIME = 60 * 60 # in seconds, must be longer than PORT_MAPPING_INTERVAL - -type PortMappings* = object - internalTcpPort: Port - externalTcpPort: Port - internalUdpPort: Port - externalUdpPort: Port - description: string - -type PortMappingArgs = - tuple[strategy: NatStrategy, tcpPort, udpPort: Port, description: string] - -type NatConfig* = object - case hasExtIp*: bool - of true: extIp*: IpAddress - of false: nat*: NatStrategy - -var - upnp {.threadvar.}: Miniupnp - npmp {.threadvar.}: NatPmp - strategy = NatStrategy.NatNone - natClosed: Atomic[bool] - extIp: Option[IpAddress] - activeMappings: seq[PortMappings] - natThreads: seq[Thread[PortMappingArgs]] = @[] - -logScope: - topics = "nat" - -type PrefSrcStatus = enum - NoRoutingInfo - PrefSrcIsPublic - PrefSrcIsPrivate - BindAddressIsPublic - BindAddressIsPrivate - -## Also does threadvar initialisation. -## Must be called before redirectPorts() in each thread. -proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress] = - var externalIP: IpAddress - - if natStrategy == NatStrategy.NatAny or natStrategy == NatStrategy.NatUpnp: - if upnp == nil: - upnp = newMiniupnp() - - upnp.discoverDelay = UPNP_TIMEOUT - let dres = upnp.discover() - if dres.isErr: - debug "UPnP", msg = dres.error - else: - var - msg: cstring - canContinue = true - case upnp.selectIGD() - of IGDNotFound: - msg = "Internet Gateway Device not found. Giving up." - canContinue = false - of IGDFound: - msg = "Internet Gateway Device found." - of IGDNotConnected: - msg = "Internet Gateway Device found but it's not connected. Trying anyway." - of NotAnIGD: - msg = - "Some device found, but it's not recognised as an Internet Gateway Device. Trying anyway." - of IGDIpNotRoutable: - msg = - "Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway." - if not quiet: - debug "UPnP", msg - if canContinue: - let ires = upnp.externalIPAddress() - if ires.isErr: - debug "UPnP", msg = ires.error - else: - # if we got this far, UPnP is working and we don't need to try NAT-PMP - try: - externalIP = parseIpAddress(ires.value) - strategy = NatStrategy.NatUpnp - return some(externalIP) - except ValueError as e: - error "parseIpAddress() exception", err = e.msg - return - - if natStrategy == NatStrategy.NatAny or natStrategy == NatStrategy.NatPmp: - if npmp == nil: - npmp = newNatPmp() - let nres = npmp.init() - if nres.isErr: - debug "NAT-PMP", msg = nres.error - else: - let nires = npmp.externalIPAddress() - if nires.isErr: - debug "NAT-PMP", msg = nires.error - else: - try: - externalIP = parseIpAddress($(nires.value)) - strategy = NatStrategy.NatPmp - return some(externalIP) - except ValueError as e: - error "parseIpAddress() exception", err = e.msg - return - -# This queries the routing table to get the "preferred source" attribute and -# checks if it's a public IP. If so, then it's our public IP. -# -# Further more, we check if the bind address (user provided, or a "0.0.0.0" -# default) is a public IP. That's a long shot, because code paths involving a -# user-provided bind address are not supposed to get here. -proc getRoutePrefSrc(bindIp: IpAddress): (Option[IpAddress], PrefSrcStatus) = - let bindAddress = initTAddress(bindIp, Port(0)) - - if bindAddress.isAnyLocal(): - let ip = getRouteIpv4() - if ip.isErr(): - # No route was found, log error and continue without IP. - error "No routable IP address found, check your network connection", - error = ip.error - return (none(IpAddress), NoRoutingInfo) - elif ip.get().isGlobalUnicast(): - return (some(ip.get()), PrefSrcIsPublic) - else: - return (none(IpAddress), PrefSrcIsPrivate) - elif bindAddress.isGlobalUnicast(): - return (some(bindIp), BindAddressIsPublic) - else: - return (none(IpAddress), BindAddressIsPrivate) - -# Try to detect a public IP assigned to this host, before trying NAT traversal. -proc getPublicRoutePrefSrcOrExternalIP*( - natStrategy: NatStrategy, bindIp: IpAddress, quiet = true -): Option[IpAddress] = - let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp) - - case prefSrcStatus - of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: - return prefSrcIp - of PrefSrcIsPrivate, BindAddressIsPrivate: - let extIp = getExternalIP(natStrategy, quiet) - if extIp.isSome: - return some(extIp.get) - -proc doPortMapping( - strategy: NatStrategy, tcpPort, udpPort: Port, description: string -): Option[(Port, Port)] {.gcsafe.} = - var - extTcpPort: Port - extUdpPort: Port - - if strategy == NatStrategy.NatUpnp: - for t in [(tcpPort, UPNPProtocol.TCP), (udpPort, UPNPProtocol.UDP)]: - let - (port, protocol) = t - pmres = upnp.addPortMapping( - externalPort = $port, - protocol = protocol, - internalHost = upnp.lanAddr, - internalPort = $port, - desc = description, - leaseDuration = 0, - ) - if pmres.isErr: - error "UPnP port mapping", msg = pmres.error, port - return - else: - # let's check it - let cres = - upnp.getSpecificPortMapping(externalPort = $port, protocol = protocol) - if cres.isErr: - warn "UPnP port mapping check failed. Assuming the check itself is broken and the port mapping was done.", - msg = cres.error - - info "UPnP: added port mapping", - externalPort = port, internalPort = port, protocol = protocol - case protocol - of UPNPProtocol.TCP: - extTcpPort = port - of UPNPProtocol.UDP: - extUdpPort = port - elif strategy == NatStrategy.NatPmp: - for t in [(tcpPort, NatPmpProtocol.TCP), (udpPort, NatPmpProtocol.UDP)]: - let - (port, protocol) = t - pmres = npmp.addPortMapping( - eport = port.cushort, - iport = port.cushort, - protocol = protocol, - lifetime = NATPMP_LIFETIME, - ) - if pmres.isErr: - error "NAT-PMP port mapping", msg = pmres.error, port - return - else: - let extPort = Port(pmres.value) - info "NAT-PMP: added port mapping", - externalPort = extPort, internalPort = port, protocol = protocol - case protocol - of NatPmpProtocol.TCP: - extTcpPort = extPort - of NatPmpProtocol.UDP: - extUdpPort = extPort - return some((extTcpPort, extUdpPort)) - -proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} = - ignoreSignalsInThread() - let - (strategy, tcpPort, udpPort, description) = args - interval = initDuration(seconds = PORT_MAPPING_INTERVAL) - sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C - - var lastUpdate = now() - - # We can't use copies of Miniupnp and NatPmp objects in this thread, because they share - # C pointers with other instances that have already been garbage collected, so - # we use threadvars instead and initialise them again with getExternalIP(), - # even though we don't need the external IP's value. - let ipres = getExternalIP(strategy, quiet = true) - if ipres.isSome: - while natClosed.load() == false: - let - # we're being silly here with this channel polling because we can't - # select on Nim channels like on Go ones - currTime = now() - if currTime >= (lastUpdate + interval): - discard doPortMapping(strategy, tcpPort, udpPort, description) - lastUpdate = currTime - - sleep(sleepDuration) - -proc stopNatThreads() {.noconv.} = - # stop the thread - debug "Stopping NAT port mapping renewal threads" - try: - natClosed.store(true) - joinThreads(natThreads) - except Exception as exc: - warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg - - # delete our port mappings - - # FIXME: if the initial port mapping failed because it already existed for the - # required external port, we should not delete it. It might have been set up - # by another program. - - # In Windows, a new thread is created for the signal handler, so we need to - # initialise our threadvars again. - - let ipres = getExternalIP(strategy, quiet = true) - if ipres.isSome: - if strategy == NatStrategy.NatUpnp: - for entry in activeMappings: - for t in [ - (entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP), - (entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP), - ]: - let - (eport, iport, protocol) = t - pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol) - if pmres.isErr: - error "UPnP port mapping deletion", msg = pmres.error - else: - debug "UPnP: deleted port mapping", - externalPort = eport, internalPort = iport, protocol = protocol - elif strategy == NatStrategy.NatPmp: - for entry in activeMappings: - for t in [ - (entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP), - (entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP), - ]: - let - (eport, iport, protocol) = t - pmres = npmp.deletePortMapping( - eport = eport.cushort, iport = iport.cushort, protocol = protocol - ) - if pmres.isErr: - error "NAT-PMP port mapping deletion", msg = pmres.error - else: - debug "NAT-PMP: deleted port mapping", - externalPort = eport, internalPort = iport, protocol = protocol - -proc redirectPorts*( - strategy: NatStrategy, tcpPort, udpPort: Port, description: string -): Option[(Port, Port)] = - result = doPortMapping(strategy, tcpPort, udpPort, description) - if result.isSome: - let (externalTcpPort, externalUdpPort) = result.get() - # needed by NAT-PMP on port mapping deletion - # Port mapping works. Let's launch a thread that repeats it, in case the - # NAT-PMP lease expires or the router is rebooted and forgets all about - # these mappings. - activeMappings.add( - PortMappings( - internalTcpPort: tcpPort, - externalTcpPort: externalTcpPort, - internalUdpPort: udpPort, - externalUdpPort: externalUdpPort, - description: description, - ) - ) - try: - natThreads.add(Thread[PortMappingArgs]()) - natThreads[^1].createThread( - repeatPortMapping, (strategy, externalTcpPort, externalUdpPort, description) - ) - # atexit() in disguise - if natThreads.len == 1: - # we should register the thread termination function only once - addQuitProc(stopNatThreads) - except Exception as exc: - warn "Failed to create NAT port mapping renewal thread", exc = exc.msg - -proc setupNat*( - natStrategy: NatStrategy, tcpPort, udpPort: Port, clientId: string -): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] = - ## Setup NAT port mapping and get external IP address. - ## If any of this fails, we don't return any IP address but do return the - ## original ports as best effort. - ## TODO: Allow for tcp or udp port mapping to be optional. - if extIp.isNone: - extIp = getExternalIP(natStrategy) - if extIp.isSome: - let ip = extIp.get - let extPorts = ( - {.gcsafe.}: - redirectPorts( - strategy, tcpPort = tcpPort, udpPort = udpPort, description = clientId - ) - ) - if extPorts.isSome: - let (extTcpPort, extUdpPort) = extPorts.get() - (ip: some(ip), tcpPort: some(extTcpPort), udpPort: some(extUdpPort)) - else: - warn "UPnP/NAT-PMP available but port forwarding failed" - (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort)) - else: - warn "UPnP/NAT-PMP not available" - (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort)) - -proc setupAddress*( - natConfig: NatConfig, bindIp: IpAddress, tcpPort, udpPort: Port, clientId: string -): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] {.gcsafe.} = - ## Set-up of the external address via any of the ways as configured in - ## `NatConfig`. In case all fails an error is logged and the bind ports are - ## selected also as external ports, as best effort and in hope that the - ## external IP can be figured out by other means at a later stage. - ## TODO: Allow for tcp or udp bind ports to be optional. - - if natConfig.hasExtIp: - # any required port redirection must be done by hand - return (some(natConfig.extIp), some(tcpPort), some(udpPort)) - - case natConfig.nat - of NatStrategy.NatAny: - let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp) - - case prefSrcStatus - of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: - return (prefSrcIp, some(tcpPort), some(udpPort)) - of PrefSrcIsPrivate, BindAddressIsPrivate: - return setupNat(natConfig.nat, tcpPort, udpPort, clientId) - of NatStrategy.NatNone: - let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp) - - case prefSrcStatus - of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: - return (prefSrcIp, some(tcpPort), some(udpPort)) - of PrefSrcIsPrivate: - error "No public IP address found. Should not use --nat:none option" - return (none(IpAddress), some(tcpPort), some(udpPort)) - of BindAddressIsPrivate: - error "Bind IP is not a public IP address. Should not use --nat:none option" - return (none(IpAddress), some(tcpPort), some(udpPort)) - of NatStrategy.NatUpnp, NatStrategy.NatPmp: - return setupNat(natConfig.nat, tcpPort, udpPort, clientId) - -proc nattedAddress*( - natConfig: NatConfig, addrs: seq[MultiAddress], udpPort: Port -): tuple[libp2p, discovery: seq[MultiAddress]] = - ## Takes a NAT configuration, sequence of multiaddresses and UDP port and returns: - ## - Modified multiaddresses with NAT-mapped addresses for libp2p - ## - Discovery addresses with NAT-mapped UDP ports - - var discoveryAddrs = newSeq[MultiAddress](0) - let newAddrs = addrs.mapIt: - block: - # Extract IP address and port from the multiaddress - let (ipPart, port) = getAddressAndPort(it) - if ipPart.isSome and port.isSome: - # Try to setup NAT mapping for the address - let (newIP, tcp, udp) = - setupAddress(natConfig, ipPart.get, port.get, udpPort, "codex") - if newIP.isSome: - # NAT mapping successful - add discovery address with mapped UDP port - discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(newIP.get, udp.get)) - # Remap original address with NAT IP and TCP port - it.remapAddr(ip = newIP, port = tcp) - else: - # NAT mapping failed - use original address - echo "Failed to get external IP, using original address", it - discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(ipPart.get, udpPort)) - it - else: - # Invalid multiaddress format - return as is - it - (newAddrs, discoveryAddrs) diff --git a/codex/nat/port_mapping.nim b/codex/nat/port_mapping.nim new file mode 100644 index 0000000000..57944d08f7 --- /dev/null +++ b/codex/nat/port_mapping.nim @@ -0,0 +1,344 @@ +# Copyright (c) 2019-2025 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +{.push raises: [].} + +import std/[options, strutils, net] + +import pkg/nat_traversal/[miniupnpc, natpmp] +import pkg/json_serialization/std/net +import pkg/results +import pkg/questionable +import pkg/questionable/results +import pkg/chronos +import pkg/chronicles +import pkg/libp2p + +import ../utils + +logScope: + topics = "codex nat port-mapping" + +const + UPNP_TIMEOUT = 200 # ms + RENEWAL_SLEEP = (20 * 60).seconds + Pmp_LIFETIME = 60 * 60 # in seconds, must be longer than RENEWAL_INTERVAL + MAPPING_DESCRIPTION = "codex" + +type PortMappingStrategy* = enum + Any + Upnp + Pmp + None + +type MappingPort* = ref object of RootObj + value*: Port + +proc `$`*(p: MappingPort): string = + $(p.value) + +type TcpPort* = ref object of MappingPort +type UdpPort* = ref object of MappingPort + +proc newTcpMappingPort*(value: Port): TcpPort = + TcpPort(value: value) + +proc newUdpMappingPort*(value: Port): UdpPort = + UdpPort(value: value) + +type PortMappingEntry* = + tuple[internalPort: MappingPort, externalPort: Option[MappingPort]] + +type PortMapping* = ref object of RootObj + upnp: Miniupnp + npmp: NatPmp + mappings: seq[PortMappingEntry] + renewalLoop: Future[void] + +proc initUpnp(self: PortMapping) = + logScope: + protocol = "upnp" + + if not self.upnp.isNil: + warn "UPnP already initialized!" + + self.upnp = newMiniupnp() + self.upnp.discoverDelay = UPNP_TIMEOUT + + if err =? self.upnp.discover().errorOption: + warn "UPnP error discoverning Internet Gateway Devices", msg = err + self.upnp = nil + + case self.upnp.selectIGD() + of IGDNotFound: + info "UPnP Internet Gateway Device not found. Giving up." + self.upnp = nil + # As UPnP is not supported on our network we won't be using it --> lets erase it. + of IGDFound: + info "UPnP Internet Gateway Device found." + of IGDNotConnected: + info "UPnP Internet Gateway Device found but it's not connected. Trying anyway." + of NotAnIGD: + info "Some device found, but it's not recognised as an Internet Gateway Device. Trying anyway." + of IGDIpNotRoutable: + info "UPnP Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway." + +proc initNpmp(self: PortMapping) = + logScope: + protocol = "npmp" + + if not self.npmp.isNil: + warn "NAT-PMP already initialized!" + + self.npmp = newNatPmp() + + if err =? self.npmp.init().errorOption: + warn "Error initialization of NAT-PMP", msg = err + self.npmp = nil + + if err =? self.npmp.externalIPAddress().errorOption: + warn "Fetching of external IP failed.", msg = err + self.npmp = nil + + info "NAT-PMP initialized" + +## Try to initilize all the port mapping protocols based on what is available on the network +proc initProtocols(self: PortMapping, strategy: PortMappingStrategy) = + if strategy == PortMappingStrategy.Any or strategy == PortMappingStrategy.Upnp: + self.initUpnp() + + if not self.upnp.isNil: + return # UPnP is available, using that, no need for NAT-PMP. + + if strategy == PortMappingStrategy.Any or strategy == PortMappingStrategy.Pmp: + self.initNpmp() + +proc new*(T: type PortMapping, strategy: PortMappingStrategy): PortMapping = + let mapping = PortMapping(upnp: nil, npmp: nil, mappings: @[]) + mapping.initProtocols(strategy) + + return mapping + +proc upnpPortMapping( + self: PortMapping, internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort = + let protocol = if (internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP + + logScope: + protocol = "upnp" + externalPort = externalPort.value + internalPort = internalPort.value + protocol = protocol + + let pmres = self.upnp.addPortMapping( + externalPort = $(externalPort.value), + protocol = protocol, + internalHost = self.upnp.lanAddr, + internalPort = $(internalPort.value), + desc = MAPPING_DESCRIPTION, + leaseDuration = 0, + ) + + if pmres.isErr: + error "UPnP port mapping", msg = pmres.error + return failure($pmres.error) + + # let's check it + let cres = self.upnp.getSpecificPortMapping( + externalPort = $(externalPort.value), protocol = protocol + ) + if cres.isErr: + warn "UPnP port mapping check failed. Assuming the check itself is broken and the port mapping was done.", + msg = cres.error + info "UPnP added port mapping" + + return success(externalPort) + +proc npmpPortMapping( + self: PortMapping, internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort = + let protocol = + if (internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP + + logScope: + protocol = "npmp" + externalPort = externalPort.value + internalPort = internalPort.value + protocol = protocol + + let extPortRes = self.npmp.addPortMapping( + eport = externalPort.value.cushort, + iport = internalPort.value.cushort, + protocol = protocol, + lifetime = Pmp_LIFETIME, + ) + + if extPortRes.isErr: + error "NAT-PMP port mapping error", msg = extPortRes.error() + return failure(extPortRes.error()) + + info "NAT-PMP: added port mapping" + + if internalPort is TcpPort: + return success(MappingPort(newTcpMappingPort(Port(extPortRes.value)))) + else: + return success(MappingPort(newUdpMappingPort(Port(extPortRes.value)))) + +## Create port mapping that will try to utilize the same port number +## of the internal port for the external port mapping. +## +## TODO: Add support for trying mapping of random external port. + +proc doPortMapping(self: PortMapping, port: MappingPort): ?!MappingPort = + if not self.upnp.isNil: + return self.upnpPortMapping(port, port) + + if not self.npmp.isNil: + return self.npmpPortMapping(port, port) + + return failure("No active startegy") + +proc doPortMapping( + self: PortMapping, internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort = + if not self.upnp.isNil: + return self.upnpPortMapping(internalPort, externalPort) + + if not self.npmp.isNil: + return self.npmpPortMapping(internalPort, externalPort) + + return failure("No active startegy") + +proc renewPortMapping(self: PortMapping) {.async.} = + while true: + for mapping in self.mappings: + if externalPort =? mapping.externalPort: + without renewedExternalPort =? + self.doPortMapping(mapping.internalPort, externalPort), err: + error "Error while renewal of port mapping", msg = err.msg + + if renewedExternalPort.value != externalPort.value: + error "The renewed external port is not the same as the originally mapped" + + await sleepAsync(RENEWAL_SLEEP) + +## Gets external IP provided by the port mapping protocols +## Port mapping needs to be succesfully started first using `startPortMapping()` +proc getExternalIP*(self: PortMapping): ?IpAddress = + if self.upnp.isNil and self.npmp.isNil: + warn "No available port-mapping protocol" + return IpAddress.none + + if not self.upnp.isNil: + let ires = self.upnp.externalIPAddress + if ires.isOk(): + info "Got externa IP address", ip = ires.value + try: + return parseIpAddress(ires.value).some + except ValueError as e: + error "Failed to parse IP address", err = e.msg + else: + debug "Getting external IP address using UPnP failed", + msg = ires.error, protocol = "upnp" + + if not self.npmp.isNil: + let nires = self.npmp.externalIPAddress() + if nires.isErr: + debug "Getting external IP address using NAT-PMP failed", msg = nires.error + else: + try: + info "Got externa IP address", ip = $(nires.value), protocol = "npmp" + return parseIpAddress($(nires.value)).some + except ValueError as e: + error "Failed to parse IP address", err = e.msg + + return IpAddress.none + +## Returns true if some supported port mapping protocol +## is available on the local network +proc isAvailable*(self: PortMapping): bool = + return (not self.upnp.isNil) or (not self.npmp.isNil) + +proc start*( + self: PortMapping, internalPorts: seq[MappingPort] +): Future[?!seq[PortMappingEntry]] {.async: (raises: [CancelledError]).} = + if internalPorts.len == 0: + return failure("No internal ports to be mapped were supplied") + + if not self.isAvailable(): + return failure("No available port mapping protocols on the network") + + if self.mappings.len > 0: + return failure("Port mapping was already started! Stop first before re-starting.") + + for port in internalPorts: + without mappedPort =? self.doPortMapping(port), err: + warn "Failed to map port", port = port, msg = err.msg + self.mappings.add((internalPort: port, externalPort: MappingPort.none)) + + self.mappings.add((internalPort: port, externalPort: mappedPort.some)) + + self.renewalLoop = self.renewPortMapping() + asyncSpawn(self.renewalLoop) + + return success(self.mappings) + +proc stop*(self: PortMapping) {.async: (raises: [CancelledError]).} = + if self.upnp.isNil or self.npmp.isNil: + debug "Port mapping is not running, nothing to stop" + return + + info "Stopping port mapping renewal loop" + if not self.renewalLoop.isNil: + if not self.renewalLoop.finished: + try: + await self.renewalLoop.cancelAndWait() + except CancelledError: + discard + except CatchableError as e: + error "Error during cancellation of renewal loop", msg = e.msg + + self.renewalLoop = nil + + for mapping in self.mappings: + if mapping.externalPort.isNone: + continue + + if not self.upnp.isNil: + let protocol = + if (mapping.internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP + + if err =? + self.upnp.deletePortMapping( + externalPort = $((!mapping.externalPort).value), protocol = protocol + ).errorOption: + error "UPnP port mapping deletion error", msg = err + else: + debug "UPnP: deleted port mapping", + externalPort = !mapping.externalPort, + internalPort = mapping.internalPort, + protocol = protocol + + if not self.npmp.isNil: + let protocol = + if (mapping.internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP + + if err =? + self.npmp.deletePortMapping( + eport = (!mapping.externalPort).value.cushort, + iport = mapping.internalPort.value.cushort, + protocol = protocol, + ).errorOption: + error "NAT-PMP port mapping deletion error", msg = err + else: + debug "NAT-PMP: deleted port mapping", + externalPort = !mapping.externalPort, + internalPort = mapping.internalPort, + protocol = protocol + + self.mappings = @[] diff --git a/codex/nat/reachabilitymanager.nim b/codex/nat/reachabilitymanager.nim new file mode 100644 index 0000000000..bbc63198c5 --- /dev/null +++ b/codex/nat/reachabilitymanager.nim @@ -0,0 +1,178 @@ +import std/sequtils + +import pkg/chronos +import pkg/chronicles +import pkg/questionable +import pkg/questionable/results +import pkg/libp2p +import pkg/libp2p/protocols/connectivity/autonat/client +import pkg/libp2p/protocols/connectivity/autonat/service + +import ../rng as random +import ./port_mapping +import ./utils + +const AutonatCheckInterval = Opt.some(chronos.seconds(30)) + +logScope: + topics = "codex nat reachabilitymanager" + +type + ReachabilityManager* = ref object of RootObj + started = false + portMapping: PortMapping + networkReachability*: NetworkReachability + getAnnounceRecords*: ?GetRecords + getDiscoveryRecords*: ?GetRecords + updateAnnounceRecords*: ?UpdateRecords + updateDiscoveryRecords*: ?UpdateRecords + + GetRecords* = proc(): ?seq[MultiAddress] {.raises: [].} + UpdateRecords* = proc(records: seq[MultiAddress]) {.raises: [].} + +proc new*( + T: typedesc[ReachabilityManager], portMappingStrategy: PortMappingStrategy +): T = + return T(portMapping: PortMapping.new(portMappingStrategy)) + +proc startPortMapping( + self: ReachabilityManager +): Future[bool] {.async: (raises: [CancelledError]).} = + # This check guarantees us that the callbacks are set + # and hence we can use ! (Option.get) without fear. + if not self.started: + warn "ReachabilityManager is not started, yet we are trying to map ports already!" + return false + + try: + {.gcsafe.}: + let announceRecords = (!self.getAnnounceRecords)() + let discoveryRecords = (!self.getDiscoveryRecords)() + + var records: seq[MultiAddress] = @[] + + if announceRecords.isSome: + records.add(!announceRecords) + if discoveryRecords.isSome: + records.add(!discoveryRecords) + + let portsToBeMapped = records.mapIt(getAddressAndPort(it)).mapIt(it.port) + + without mappedPorts =? (await self.portMapping.start(portsToBeMapped)), err: + warn "Could not start port mapping", msg = err.msg + return false + + if mappedPorts.any( + proc(x: PortMappingEntry): bool = + isNone(x.externalPort) + ): + warn "Some ports were not mapped - not using port mapping then" + return false + + info "Succesfully exposed ports", ports = portsToBeMapped + + if announceRecords.isSome: + let announceMappedRecords = zip( + !announceRecords, mappedPorts[0 .. (!announceRecords).len - 1] + ) + .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) + {.gcsafe.}: + (!self.updateAnnounceRecords)(announceMappedRecords) + + if discoveryRecords.isSome: + let discoveryMappedRecords = zip( + !discoveryRecords, + mappedPorts[(mappedPorts.len - (!discoveryRecords).len) .. ^1], + ) + .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) + {.gcsafe.}: + (!self.updateDiscoveryRecords)(discoveryMappedRecords) + + return true + except ValueError as exc: + error "Error while starting port mapping", msg = exc.msg + return false + +proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHandler = + let statusAndConfidenceHandler = proc( + networkReachability: NetworkReachability, confidenceOpt: Opt[float] + ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = + if not manager.started: + warn "ReachabilityManager was not started, but we are already getting reachability updates! Ignoring..." + return + + without confidence =? confidenceOpt: + debug "Node reachability reported without confidence" + return + + if manager.networkReachability == networkReachability: + debug "Node reachability reported without change", + networkReachability = networkReachability + return + + info "Node reachability status changed", + networkReachability = networkReachability, confidence = confidenceOpt + + manager.networkReachability = networkReachability + + if networkReachability == NetworkReachability.NotReachable: + # Lets first start to expose port using port mapping protocols like NAT-PMP or UPnP + if manager.portMapping.isAvailable(): + debug "Port mapping available on the network" + + if await manager.startPortMapping(): + return # We exposed ports so we should be good! + + info "No more options to become reachable" + + return statusAndConfidenceHandler + +proc start*( + self: ReachabilityManager, switch: Switch, bootNodes: seq[SignedPeerRecord] +): Future[void] {.async: (raises: [CancelledError]).} = + doAssert self.getAnnounceRecords.isSome, "getAnnounceRecords is not set" + doAssert self.getDiscoveryRecords.isSome, "getDiscoveryRecords is not set" + doAssert self.updateAnnounceRecords.isSome, "updateAnnounceRecords is not set" + doAssert self.updateDiscoveryRecords.isSome, "updateDiscoveryRecords is not set" + self.started = true + + ## Until more robust way of NAT-traversal helper peers discovery is implemented + ## we will start with simple populating the libp2p peerstore with bootstrap nodes + ## https://github.com/codex-storage/nim-codex/issues/1320 + + for peer in bootNodes: + try: + await switch.connect(peer.data.peerId, peer.data.addresses.mapIt(it.address)) + except CancelledError as exc: + raise exc + except CatchableError as exc: + info "Failed to dial bootstrap nodes", err = exc.msg + +proc stop*( + self: ReachabilityManager +): Future[void] {.async: (raises: [CancelledError]).} = + await self.portMapping.stop() + self.started = false + +proc getAutonatService*(self: ReachabilityManager): Service = + ## AutonatService request other peers to dial us back + ## flagging us as Reachable or NotReachable. + ## We use minimum confidence 0.1 (confidence is calculated as numOfReplies/maxQueueSize) as + ## that will give an answer already for response from one peer. + ## As we use bootnodes for this in initial setup, it is possible we might + ## get only one peer to ask about our reachability and it is crucial to get at least some reply. + ## This should be changed once proactive NAT-traversal helper peers discovery is implemented. + + let autonatService = AutonatService.new( + autonatClient = AutonatClient.new(), + rng = random.Rng.instance(), + scheduleInterval = AutonatCheckInterval, + askNewConnectedPeers = true, + numPeersToAsk = 5, + maxQueueSize = 10, + minConfidence = 0.1, + ) + + autonatService.statusAndConfidenceHandler(self.getReachabilityHandler()) + + return Service(autonatService) diff --git a/codex/nat/utils.nim b/codex/nat/utils.nim new file mode 100644 index 0000000000..9436ec1647 --- /dev/null +++ b/codex/nat/utils.nim @@ -0,0 +1,66 @@ +## Nim-Codex +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/options +import std/net + +import pkg/libp2p +import pkg/stew/endians2 + +import ./port_mapping + +proc getAddressAndPort*( + ma: MultiAddress +): tuple[ip: IpAddress, port: MappingPort] {.raises: [ValueError].} = + try: + # Try IPv4 first + let ipv4Result = ma[multiCodec("ip4")] + let ip = + if ipv4Result.isOk: + let ipBytes = ipv4Result.get().protoArgument().expect("Invalid IPv4 format") + let ipArray = [ipBytes[0], ipBytes[1], ipBytes[2], ipBytes[3]] + IpAddress(family: IPv4, address_v4: ipArray) + else: + # Try IPv6 if IPv4 not found + let ipv6Result = ma[multiCodec("ip6")] + if ipv6Result.isOk: + let ipBytes = ipv6Result.get().protoArgument().expect("Invalid IPv6 format") + var ipArray: array[16, byte] + for i in 0 .. 15: + ipArray[i] = ipBytes[i] + IpAddress(family: IPv6, address_v6: ipArray) + else: + raise newException(ValueError, "Unknown IP family") + + # Get TCP Port + let tcpPortResult = ma[multiCodec("tcp")] + if tcpPortResult.isOk: + let tcpPortBytes = + tcpPortResult.get().protoArgument().expect("Invalid port format") + let tcpPort = newTcpMappingPort(Port(fromBytesBE(uint16, tcpPortBytes))) + return (ip: ip, port: tcpPort) + + # Get UDP Port + let udpPortResult = ma[multiCodec("udp")] + if udpPortResult.isOk: + let udpPortBytes = + udpPortResult.get().protoArgument().expect("Invalid port format") + let udpPort = newUdpMappingPort(Port(fromBytesBE(uint16, udpPortBytes))) + return (ip: ip, port: udpPort) + + raise newException(ValueError, "No TCP/UDP port specified") + except Exception: + raise newException(ValueError, "Invalid multiaddr") + +proc getMultiAddr*(ip: IpAddress, port: MappingPort): MultiAddress = + let ipFamily = if ip.family == IpAddressFamily.IPv4: "/ip4/" else: "/ip6/" + let portType = if (port is TcpPort): "/tcp/" else: "/udp/" + return MultiAddress.init(ipFamily & $ip & portType & $(port.value)).expect( + "valid multiaddr" + ) diff --git a/codex/utils/addrutils.nim b/codex/utils/addrutils.nim index a9ec54f565..f9560f4bbc 100644 --- a/codex/utils/addrutils.nim +++ b/codex/utils/addrutils.nim @@ -13,9 +13,9 @@ push: import std/strutils import std/options +import std/net import pkg/libp2p -import pkg/stew/shims/net import pkg/stew/endians2 func remapAddr*( @@ -44,7 +44,7 @@ func remapAddr*( proc getMultiAddrWithIPAndUDPPort*(ip: IpAddress, port: Port): MultiAddress = ## Creates a MultiAddress with the specified IP address and UDP port - ## + ## ## Parameters: ## - ip: A valid IP address (IPv4 or IPv6) ## - port: The UDP port number diff --git a/codex/utils/natutils.nim b/codex/utils/natutils.nim deleted file mode 100644 index 996d8dd01c..0000000000 --- a/codex/utils/natutils.nim +++ /dev/null @@ -1,67 +0,0 @@ -{.push raises: [].} - -import - std/[tables, hashes], pkg/results, pkg/stew/shims/net as stewNet, chronos, chronicles - -import pkg/libp2p - -type NatStrategy* = enum - NatAny - NatUpnp - NatPmp - NatNone - -type IpLimits* = object - limit*: uint - ips: Table[IpAddress, uint] - -func hash*(ip: IpAddress): Hash = - case ip.family - of IpAddressFamily.IPv6: - hash(ip.address_v6) - of IpAddressFamily.IPv4: - hash(ip.address_v4) - -func inc*(ipLimits: var IpLimits, ip: IpAddress): bool = - let val = ipLimits.ips.getOrDefault(ip, 0) - if val < ipLimits.limit: - ipLimits.ips[ip] = val + 1 - true - else: - false - -func dec*(ipLimits: var IpLimits, ip: IpAddress) = - let val = ipLimits.ips.getOrDefault(ip, 0) - if val == 1: - ipLimits.ips.del(ip) - elif val > 1: - ipLimits.ips[ip] = val - 1 - -func isGlobalUnicast*(address: TransportAddress): bool = - if address.isGlobal() and address.isUnicast(): true else: false - -func isGlobalUnicast*(address: IpAddress): bool = - let a = initTAddress(address, Port(0)) - a.isGlobalUnicast() - -proc getRouteIpv4*(): Result[IpAddress, cstring] = - # Avoiding Exception with initTAddress and can't make it work with static. - # Note: `publicAddress` is only used an "example" IP to find the best route, - # no data is send over the network to this IP! - let - publicAddress = TransportAddress( - family: AddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1], port: Port(0) - ) - route = getBestRoute(publicAddress) - - if route.source.isUnspecified(): - err("No best ipv4 route found") - else: - let ip = - try: - route.source.address() - except ValueError as e: - # This should not occur really. - error "Address conversion error", exception = e.name, msg = e.msg - return err("Invalid IP address") - ok(ip) diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim index 6d2edd461c..5f797468db 100644 --- a/tests/codex/helpers/nodeutils.nim +++ b/tests/codex/helpers/nodeutils.nim @@ -10,8 +10,6 @@ import pkg/codex/stores import pkg/codex/blocktype as bt import pkg/codex/blockexchange import pkg/codex/systemclock -import pkg/codex/nat -import pkg/codex/utils/natutils import pkg/codex/slots import pkg/codex/node @@ -186,13 +184,6 @@ proc generateNodes*( if config.enableBootstrap: waitFor switch.peerInfo.update() - let (announceAddrs, discoveryAddrs) = nattedAddress( - NatConfig(hasExtIp: false, nat: NatNone), - switch.peerInfo.addrs, - bindPort.Port, - ) - blockDiscovery.updateAnnounceRecord(announceAddrs) - blockDiscovery.updateDhtRecord(discoveryAddrs) if blockDiscovery.dhtRecord.isSome: bootstrapNodes.add !blockDiscovery.dhtRecord diff --git a/tests/codex/slots/testbackendfactory.nim b/tests/codex/slots/testbackendfactory.nim index a24bc41a5a..f5ecf0fdbe 100644 --- a/tests/codex/slots/testbackendfactory.nim +++ b/tests/codex/slots/testbackendfactory.nim @@ -44,7 +44,6 @@ suite "Test BackendFactory": let config = CodexConf( cmd: StartUpCmd.persistence, - nat: NatConfig(hasExtIp: false, nat: NatNone), metricsAddress: parseIpAddress("127.0.0.1"), persistenceCmd: PersistenceCmd.prover, marketplaceAddress: EthAddress.example.some, @@ -64,7 +63,6 @@ suite "Test BackendFactory": let config = CodexConf( cmd: StartUpCmd.persistence, - nat: NatConfig(hasExtIp: false, nat: NatNone), metricsAddress: parseIpAddress("127.0.0.1"), persistenceCmd: PersistenceCmd.prover, marketplaceAddress: EthAddress.example.some, @@ -85,7 +83,6 @@ suite "Test BackendFactory": let config = CodexConf( cmd: StartUpCmd.persistence, - nat: NatConfig(hasExtIp: false, nat: NatNone), metricsAddress: parseIpAddress("127.0.0.1"), persistenceCmd: PersistenceCmd.prover, marketplaceAddress: EthAddress.example.some, diff --git a/tests/codex/slots/testprover.nim b/tests/codex/slots/testprover.nim index c567db55dd..a2d2ceace0 100644 --- a/tests/codex/slots/testprover.nim +++ b/tests/codex/slots/testprover.nim @@ -36,7 +36,6 @@ suite "Test Prover": metaDs = metaTmp.newDb() config = CodexConf( cmd: StartUpCmd.persistence, - nat: NatConfig(hasExtIp: false, nat: NatNone), metricsAddress: parseIpAddress("127.0.0.1"), persistenceCmd: PersistenceCmd.prover, circomR1cs: InputFile("tests/circuits/fixtures/proof_main.r1cs"),