From 4ad70992562837e94fcc429bcf004d4c6aec0625 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Fri, 24 Oct 2025 15:04:20 +0200 Subject: [PATCH 1/3] feat: autonat support --- codex/codex.nim | 28 +- codex/conf.nim | 47 +-- codex/nat.nim | 432 ----------------------- codex/nat/port_mapping.nim | 363 +++++++++++++++++++ codex/nat/reachabilitymanager.nim | 153 ++++++++ codex/nat/utils.nim | 68 ++++ codex/utils/addrutils.nim | 4 +- codex/utils/natutils.nim | 67 ---- tests/codex/helpers/nodeutils.nim | 7 +- tests/codex/slots/testbackendfactory.nim | 3 - tests/codex/slots/testprover.nim | 1 - 11 files changed, 626 insertions(+), 547 deletions(-) delete mode 100644 codex/nat.nim create mode 100644 codex/nat/port_mapping.nim create mode 100644 codex/nat/reachabilitymanager.nim create mode 100644 codex/nat/utils.nim delete mode 100644 codex/utils/natutils.nim diff --git a/codex/codex.nim b/codex/codex.nim index 8135746410..0ff25ee0aa 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -40,11 +40,10 @@ import ./contracts import ./systemclock import ./contracts/clock import ./contracts/deployment -import ./utils/addrutils import ./namespaces import ./codextypes import ./logutils -import ./nat +import ./nat/reachabilitymanager logScope: topics = "codex node" @@ -57,6 +56,7 @@ type repoStore: RepoStore maintenance: BlockMaintainer taskpool: Taskpool + reachabilityManager: ReachabilityManager CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -166,12 +166,16 @@ proc start*(s: CodexServer) {.async.} = await s.codexNode.switch.start() - let (announceAddrs, discoveryAddrs) = nattedAddress( - s.config.nat, s.codexNode.switch.peerInfo.addrs, s.config.discoveryPort - ) + s.reachabilityManager.getAnnounceRecords = some proc() = + s.codexNode.switch.peerInfo.addrs + s.reachabilityManager.getDiscoveryRecords = some proc() = + s.codexNode.discovery.dhtRecord.data.addresses.mapIt(it.address) + s.reachabilityManager.updateAnnounceRecords = some proc(records: seq[MultiAddress]) = + s.codexNode.discovery.updateAnnounceRecord(records) + s.reachabilityManager.updateDiscoveryRecords = some proc(records: seq[MultiAddress]) = + s.codexNode.discovery.updateDhtRecord(records) - s.codexNode.discovery.updateAnnounceRecord(announceAddrs) - s.codexNode.discovery.updateDhtRecord(discoveryAddrs) + await s.reachabilityManager.start(s.codexNode.switch, s.config.bootstrapNodes) await s.bootstrapInteractions() await s.codexNode.start() @@ -183,6 +187,7 @@ proc stop*(s: CodexServer) {.async.} = let res = await noCancel allFinishedFailed[void]( @[ s.restServer.stop(), + s.reachabilityManager.stop(), s.codexNode.switch.stop(), s.codexNode.stop(), s.repoStore.stop(), @@ -201,6 +206,9 @@ proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey ): CodexServer = ## create CodexServer including setting up datastore, repostore, etc + + let reachabilityManager = ReachabilityManager.new(config.portMappingStrategy) + let switch = SwitchBuilder .new() .withPrivateKey(privateKey) @@ -212,6 +220,11 @@ proc new*( .withAgentVersion(config.agentString) .withSignedPeerRecord(true) .withTcpTransport({ServerFlags.ReuseAddr}) + # Adds AutoNAT server support - ability to respond to other peers ask about their reachability status + .withAutonat() + + # Adds AutoNAT client support - to discover the node's rechability + .withServices(@[reachabilityManager.getAutonatService()]) .build() var @@ -338,4 +351,5 @@ proc new*( repoStore: repoStore, maintenance: maintenance, taskpool: taskpool, + reachabilityManager: reachabilityManager, ) diff --git a/codex/conf.nim b/codex/conf.nim index 77ef96caac..fb021a43de 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -41,13 +41,12 @@ import ./logutils import ./stores import ./units import ./utils -import ./nat -import ./utils/natutils +import ./nat/port_mapping from ./contracts/config import DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas from ./validationconfig import MaxSlots, ValidationGroups -export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig +export units, net, codextypes, logutils, completeCmdArg, parseCmdArg export ValidationGroups, MaxSlots export @@ -151,14 +150,14 @@ type name: "listen-addrs" .}: seq[MultiAddress] - nat* {. + forcePortMapping* {. desc: - "Specify method to use for determining public address. " & - "Must be one of: any, none, upnp, pmp, extip:", - defaultValue: defaultNatConfig(), + "Overide automatic detection to specific upnp mode. " & + "Must be one of: any, none, upnp, pmp", + defaultValue: PortMappingStrategy.Any, defaultValueDesc: "any", - name: "nat" - .}: NatConfig + name: "force-port-mapping" + .}: PortMappingStrategy discoveryPort* {. desc: "Discovery (UDP) port", @@ -464,9 +463,6 @@ logutils.formatIt(LogFormat.json, EthAddress): func defaultAddress*(conf: CodexConf): IpAddress = result = static parseIpAddress("127.0.0.1") -func defaultNatConfig*(): NatConfig = - result = NatConfig(hasExtIp: false, nat: NatStrategy.NatAny) - func persistence*(self: CodexConf): bool = self.cmd == StartUpCmd.persistence @@ -538,29 +534,20 @@ proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T = quit QuitFailure res -func parseCmdArg*(T: type NatConfig, p: string): T {.raises: [ValueError].} = +func parseCmdArg*(T: type PortMappingStrategy, p: string): T {.raises: [ValueError].} = case p.toLowerAscii of "any": - NatConfig(hasExtIp: false, nat: NatStrategy.NatAny) + PortMappingStrategy.Any of "none": - NatConfig(hasExtIp: false, nat: NatStrategy.NatNone) + PortMappingStrategy.None of "upnp": - NatConfig(hasExtIp: false, nat: NatStrategy.NatUpnp) + PortMappingStrategy.Upnp of "pmp": - NatConfig(hasExtIp: false, nat: NatStrategy.NatPmp) + PortMappingStrategy.Pmp else: - if p.startsWith("extip:"): - try: - let ip = parseIpAddress(p[6 ..^ 1]) - NatConfig(hasExtIp: true, extIp: ip) - except ValueError: - let error = "Not a valid IP address: " & p[6 ..^ 1] - raise newException(ValueError, error) - else: - let error = "Not a valid NAT option: " & p - raise newException(ValueError, error) + raise newException(ValueError, "Not a valid NAT option: " & p) -proc completeCmdArg*(T: type NatConfig, val: string): seq[string] = +proc completeCmdArg*(T: type PortMappingStrategy, val: string): seq[string] = return @[] proc parseCmdArg*(T: type EthAddress, address: string): T = @@ -642,11 +629,11 @@ proc readValue*( val = dur proc readValue*( - r: var TomlReader, val: var NatConfig + r: var TomlReader, val: var PortMappingStrategy ) {.raises: [SerializationError].} = val = try: - parseCmdArg(NatConfig, r.readValue(string)) + parseCmdArg(PortMappingStrategy, r.readValue(string)) except CatchableError as err: raise newException(SerializationError, err.msg) diff --git a/codex/nat.nim b/codex/nat.nim deleted file mode 100644 index d022dad6cb..0000000000 --- a/codex/nat.nim +++ /dev/null @@ -1,432 +0,0 @@ -# Copyright (c) 2019-2023 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -# * MIT license ([LICENSE-MIT](LICENSE-MIT)) -# at your option. -# This file may not be copied, modified, or distributed except according to -# those terms. - -{.push raises: [].} - -import - std/[options, os, strutils, times, net, atomics], - stew/shims/net as stewNet, - stew/[objects, results], - nat_traversal/[miniupnpc, natpmp], - json_serialization/std/net - -import pkg/chronos -import pkg/chronicles -import pkg/libp2p - -import ./utils -import ./utils/natutils -import ./utils/addrutils - -const - UPNP_TIMEOUT = 200 # ms - PORT_MAPPING_INTERVAL = 20 * 60 # seconds - NATPMP_LIFETIME = 60 * 60 # in seconds, must be longer than PORT_MAPPING_INTERVAL - -type PortMappings* = object - internalTcpPort: Port - externalTcpPort: Port - internalUdpPort: Port - externalUdpPort: Port - description: string - -type PortMappingArgs = - tuple[strategy: NatStrategy, tcpPort, udpPort: Port, description: string] - -type NatConfig* = object - case hasExtIp*: bool - of true: extIp*: IpAddress - of false: nat*: NatStrategy - -var - upnp {.threadvar.}: Miniupnp - npmp {.threadvar.}: NatPmp - strategy = NatStrategy.NatNone - natClosed: Atomic[bool] - extIp: Option[IpAddress] - activeMappings: seq[PortMappings] - natThreads: seq[Thread[PortMappingArgs]] = @[] - -logScope: - topics = "nat" - -type PrefSrcStatus = enum - NoRoutingInfo - PrefSrcIsPublic - PrefSrcIsPrivate - BindAddressIsPublic - BindAddressIsPrivate - -## Also does threadvar initialisation. -## Must be called before redirectPorts() in each thread. -proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress] = - var externalIP: IpAddress - - if natStrategy == NatStrategy.NatAny or natStrategy == NatStrategy.NatUpnp: - if upnp == nil: - upnp = newMiniupnp() - - upnp.discoverDelay = UPNP_TIMEOUT - let dres = upnp.discover() - if dres.isErr: - debug "UPnP", msg = dres.error - else: - var - msg: cstring - canContinue = true - case upnp.selectIGD() - of IGDNotFound: - msg = "Internet Gateway Device not found. Giving up." - canContinue = false - of IGDFound: - msg = "Internet Gateway Device found." - of IGDNotConnected: - msg = "Internet Gateway Device found but it's not connected. Trying anyway." - of NotAnIGD: - msg = - "Some device found, but it's not recognised as an Internet Gateway Device. Trying anyway." - of IGDIpNotRoutable: - msg = - "Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway." - if not quiet: - debug "UPnP", msg - if canContinue: - let ires = upnp.externalIPAddress() - if ires.isErr: - debug "UPnP", msg = ires.error - else: - # if we got this far, UPnP is working and we don't need to try NAT-PMP - try: - externalIP = parseIpAddress(ires.value) - strategy = NatStrategy.NatUpnp - return some(externalIP) - except ValueError as e: - error "parseIpAddress() exception", err = e.msg - return - - if natStrategy == NatStrategy.NatAny or natStrategy == NatStrategy.NatPmp: - if npmp == nil: - npmp = newNatPmp() - let nres = npmp.init() - if nres.isErr: - debug "NAT-PMP", msg = nres.error - else: - let nires = npmp.externalIPAddress() - if nires.isErr: - debug "NAT-PMP", msg = nires.error - else: - try: - externalIP = parseIpAddress($(nires.value)) - strategy = NatStrategy.NatPmp - return some(externalIP) - except ValueError as e: - error "parseIpAddress() exception", err = e.msg - return - -# This queries the routing table to get the "preferred source" attribute and -# checks if it's a public IP. If so, then it's our public IP. -# -# Further more, we check if the bind address (user provided, or a "0.0.0.0" -# default) is a public IP. That's a long shot, because code paths involving a -# user-provided bind address are not supposed to get here. -proc getRoutePrefSrc(bindIp: IpAddress): (Option[IpAddress], PrefSrcStatus) = - let bindAddress = initTAddress(bindIp, Port(0)) - - if bindAddress.isAnyLocal(): - let ip = getRouteIpv4() - if ip.isErr(): - # No route was found, log error and continue without IP. - error "No routable IP address found, check your network connection", - error = ip.error - return (none(IpAddress), NoRoutingInfo) - elif ip.get().isGlobalUnicast(): - return (some(ip.get()), PrefSrcIsPublic) - else: - return (none(IpAddress), PrefSrcIsPrivate) - elif bindAddress.isGlobalUnicast(): - return (some(bindIp), BindAddressIsPublic) - else: - return (none(IpAddress), BindAddressIsPrivate) - -# Try to detect a public IP assigned to this host, before trying NAT traversal. -proc getPublicRoutePrefSrcOrExternalIP*( - natStrategy: NatStrategy, bindIp: IpAddress, quiet = true -): Option[IpAddress] = - let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp) - - case prefSrcStatus - of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: - return prefSrcIp - of PrefSrcIsPrivate, BindAddressIsPrivate: - let extIp = getExternalIP(natStrategy, quiet) - if extIp.isSome: - return some(extIp.get) - -proc doPortMapping( - strategy: NatStrategy, tcpPort, udpPort: Port, description: string -): Option[(Port, Port)] {.gcsafe.} = - var - extTcpPort: Port - extUdpPort: Port - - if strategy == NatStrategy.NatUpnp: - for t in [(tcpPort, UPNPProtocol.TCP), (udpPort, UPNPProtocol.UDP)]: - let - (port, protocol) = t - pmres = upnp.addPortMapping( - externalPort = $port, - protocol = protocol, - internalHost = upnp.lanAddr, - internalPort = $port, - desc = description, - leaseDuration = 0, - ) - if pmres.isErr: - error "UPnP port mapping", msg = pmres.error, port - return - else: - # let's check it - let cres = - upnp.getSpecificPortMapping(externalPort = $port, protocol = protocol) - if cres.isErr: - warn "UPnP port mapping check failed. Assuming the check itself is broken and the port mapping was done.", - msg = cres.error - - info "UPnP: added port mapping", - externalPort = port, internalPort = port, protocol = protocol - case protocol - of UPNPProtocol.TCP: - extTcpPort = port - of UPNPProtocol.UDP: - extUdpPort = port - elif strategy == NatStrategy.NatPmp: - for t in [(tcpPort, NatPmpProtocol.TCP), (udpPort, NatPmpProtocol.UDP)]: - let - (port, protocol) = t - pmres = npmp.addPortMapping( - eport = port.cushort, - iport = port.cushort, - protocol = protocol, - lifetime = NATPMP_LIFETIME, - ) - if pmres.isErr: - error "NAT-PMP port mapping", msg = pmres.error, port - return - else: - let extPort = Port(pmres.value) - info "NAT-PMP: added port mapping", - externalPort = extPort, internalPort = port, protocol = protocol - case protocol - of NatPmpProtocol.TCP: - extTcpPort = extPort - of NatPmpProtocol.UDP: - extUdpPort = extPort - return some((extTcpPort, extUdpPort)) - -proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} = - ignoreSignalsInThread() - let - (strategy, tcpPort, udpPort, description) = args - interval = initDuration(seconds = PORT_MAPPING_INTERVAL) - sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C - - var lastUpdate = now() - - # We can't use copies of Miniupnp and NatPmp objects in this thread, because they share - # C pointers with other instances that have already been garbage collected, so - # we use threadvars instead and initialise them again with getExternalIP(), - # even though we don't need the external IP's value. - let ipres = getExternalIP(strategy, quiet = true) - if ipres.isSome: - while natClosed.load() == false: - let - # we're being silly here with this channel polling because we can't - # select on Nim channels like on Go ones - currTime = now() - if currTime >= (lastUpdate + interval): - discard doPortMapping(strategy, tcpPort, udpPort, description) - lastUpdate = currTime - - sleep(sleepDuration) - -proc stopNatThreads() {.noconv.} = - # stop the thread - debug "Stopping NAT port mapping renewal threads" - try: - natClosed.store(true) - joinThreads(natThreads) - except Exception as exc: - warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg - - # delete our port mappings - - # FIXME: if the initial port mapping failed because it already existed for the - # required external port, we should not delete it. It might have been set up - # by another program. - - # In Windows, a new thread is created for the signal handler, so we need to - # initialise our threadvars again. - - let ipres = getExternalIP(strategy, quiet = true) - if ipres.isSome: - if strategy == NatStrategy.NatUpnp: - for entry in activeMappings: - for t in [ - (entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP), - (entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP), - ]: - let - (eport, iport, protocol) = t - pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol) - if pmres.isErr: - error "UPnP port mapping deletion", msg = pmres.error - else: - debug "UPnP: deleted port mapping", - externalPort = eport, internalPort = iport, protocol = protocol - elif strategy == NatStrategy.NatPmp: - for entry in activeMappings: - for t in [ - (entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP), - (entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP), - ]: - let - (eport, iport, protocol) = t - pmres = npmp.deletePortMapping( - eport = eport.cushort, iport = iport.cushort, protocol = protocol - ) - if pmres.isErr: - error "NAT-PMP port mapping deletion", msg = pmres.error - else: - debug "NAT-PMP: deleted port mapping", - externalPort = eport, internalPort = iport, protocol = protocol - -proc redirectPorts*( - strategy: NatStrategy, tcpPort, udpPort: Port, description: string -): Option[(Port, Port)] = - result = doPortMapping(strategy, tcpPort, udpPort, description) - if result.isSome: - let (externalTcpPort, externalUdpPort) = result.get() - # needed by NAT-PMP on port mapping deletion - # Port mapping works. Let's launch a thread that repeats it, in case the - # NAT-PMP lease expires or the router is rebooted and forgets all about - # these mappings. - activeMappings.add( - PortMappings( - internalTcpPort: tcpPort, - externalTcpPort: externalTcpPort, - internalUdpPort: udpPort, - externalUdpPort: externalUdpPort, - description: description, - ) - ) - try: - natThreads.add(Thread[PortMappingArgs]()) - natThreads[^1].createThread( - repeatPortMapping, (strategy, externalTcpPort, externalUdpPort, description) - ) - # atexit() in disguise - if natThreads.len == 1: - # we should register the thread termination function only once - addQuitProc(stopNatThreads) - except Exception as exc: - warn "Failed to create NAT port mapping renewal thread", exc = exc.msg - -proc setupNat*( - natStrategy: NatStrategy, tcpPort, udpPort: Port, clientId: string -): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] = - ## Setup NAT port mapping and get external IP address. - ## If any of this fails, we don't return any IP address but do return the - ## original ports as best effort. - ## TODO: Allow for tcp or udp port mapping to be optional. - if extIp.isNone: - extIp = getExternalIP(natStrategy) - if extIp.isSome: - let ip = extIp.get - let extPorts = ( - {.gcsafe.}: - redirectPorts( - strategy, tcpPort = tcpPort, udpPort = udpPort, description = clientId - ) - ) - if extPorts.isSome: - let (extTcpPort, extUdpPort) = extPorts.get() - (ip: some(ip), tcpPort: some(extTcpPort), udpPort: some(extUdpPort)) - else: - warn "UPnP/NAT-PMP available but port forwarding failed" - (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort)) - else: - warn "UPnP/NAT-PMP not available" - (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort)) - -proc setupAddress*( - natConfig: NatConfig, bindIp: IpAddress, tcpPort, udpPort: Port, clientId: string -): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] {.gcsafe.} = - ## Set-up of the external address via any of the ways as configured in - ## `NatConfig`. In case all fails an error is logged and the bind ports are - ## selected also as external ports, as best effort and in hope that the - ## external IP can be figured out by other means at a later stage. - ## TODO: Allow for tcp or udp bind ports to be optional. - - if natConfig.hasExtIp: - # any required port redirection must be done by hand - return (some(natConfig.extIp), some(tcpPort), some(udpPort)) - - case natConfig.nat - of NatStrategy.NatAny: - let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp) - - case prefSrcStatus - of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: - return (prefSrcIp, some(tcpPort), some(udpPort)) - of PrefSrcIsPrivate, BindAddressIsPrivate: - return setupNat(natConfig.nat, tcpPort, udpPort, clientId) - of NatStrategy.NatNone: - let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp) - - case prefSrcStatus - of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: - return (prefSrcIp, some(tcpPort), some(udpPort)) - of PrefSrcIsPrivate: - error "No public IP address found. Should not use --nat:none option" - return (none(IpAddress), some(tcpPort), some(udpPort)) - of BindAddressIsPrivate: - error "Bind IP is not a public IP address. Should not use --nat:none option" - return (none(IpAddress), some(tcpPort), some(udpPort)) - of NatStrategy.NatUpnp, NatStrategy.NatPmp: - return setupNat(natConfig.nat, tcpPort, udpPort, clientId) - -proc nattedAddress*( - natConfig: NatConfig, addrs: seq[MultiAddress], udpPort: Port -): tuple[libp2p, discovery: seq[MultiAddress]] = - ## Takes a NAT configuration, sequence of multiaddresses and UDP port and returns: - ## - Modified multiaddresses with NAT-mapped addresses for libp2p - ## - Discovery addresses with NAT-mapped UDP ports - - var discoveryAddrs = newSeq[MultiAddress](0) - let newAddrs = addrs.mapIt: - block: - # Extract IP address and port from the multiaddress - let (ipPart, port) = getAddressAndPort(it) - if ipPart.isSome and port.isSome: - # Try to setup NAT mapping for the address - let (newIP, tcp, udp) = - setupAddress(natConfig, ipPart.get, port.get, udpPort, "codex") - if newIP.isSome: - # NAT mapping successful - add discovery address with mapped UDP port - discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(newIP.get, udp.get)) - # Remap original address with NAT IP and TCP port - it.remapAddr(ip = newIP, port = tcp) - else: - # NAT mapping failed - use original address - echo "Failed to get external IP, using original address", it - discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(ipPart.get, udpPort)) - it - else: - # Invalid multiaddress format - return as is - it - (newAddrs, discoveryAddrs) diff --git a/codex/nat/port_mapping.nim b/codex/nat/port_mapping.nim new file mode 100644 index 0000000000..f0222af647 --- /dev/null +++ b/codex/nat/port_mapping.nim @@ -0,0 +1,363 @@ +# Copyright (c) 2019-2025 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +{.push raises: [].} + +import std/[options, os, strutils, times, net, atomics] + +import pkg/stew/objects +import pkg/nat_traversal/[miniupnpc, natpmp] +import pkg/json_serialization/std/net +import pkg/results +import pkg/questionable +import pkg/questionable/results +import pkg/chronos +import pkg/chronicles +import pkg/libp2p + +import ../utils + +logScope: + topics = "codex nat port-mapping" + +const + UPNP_TIMEOUT = 200 # ms + RENEWAL_INTERVAL = 20 * 60 # seconds + Pmp_LIFETIME = 60 * 60 # in seconds, must be longer than RENEWAL_INTERVAL + MAPPING_DESCRIPTION = "codex" + +type PortMappingStrategy* = enum + Any + Upnp + Pmp + None + +type MappingPort* = ref object of RootObj + value*: Port + +type TcpPort* = ref object of MappingPort +type UdpPort* = ref object of MappingPort + +proc newTcpMappingPort*(value: Port): TcpPort = + TcpPort(value: value) + +proc newUdpMappingPort*(value: Port): UdpPort = + UdpPort(value: value) + +type PortMapping = tuple[internalPort: MappingPort, externalPort: Option[MappingPort]] +type RenewelThreadArgs = + tuple[strategy: PortMappingStrategy, portMapping: seq[PortMapping]] + +var + upnp {.threadvar.}: Miniupnp + npmp {.threadvar.}: NatPmp + mappings: seq[PortMapping] + portMappingExiting: Atomic[bool] + renewalThread: Thread[RenewelThreadArgs] + +proc initUpnp(): bool = + logScope: + protocol = "upnp" + + if upnp != nil: + warn "UPnP already initialized!" + return true + + upnp = newMiniupnp() + upnp.discoverDelay = UPNP_TIMEOUT + + if err =? upnp.discover().errorOption: + warn "UPnP error discoverning Internet Gateway Devices", msg = err + upnp = nil + return false + + case upnp.selectIGD() + of IGDNotFound: + info "UPnP Internet Gateway Device not found. Giving up." + upnp = nil + # As UPnP is not supported on our network we won't be using it --> lets erase it. + of IGDFound: + info "UPnP Internet Gateway Device found." + of IGDNotConnected: + info "UPnP Internet Gateway Device found but it's not connected. Trying anyway." + of NotAnIGD: + info "Some device found, but it's not recognised as an Internet Gateway Device. Trying anyway." + of IGDIpNotRoutable: + info "UPnP Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway." + + return true + +proc initNpmp(): bool = + logScope: + protocol = "npmp" + + if npmp != nil: + warn "NAT-PMP already initialized!" + return true + + npmp = newNatPmp() + + if err =? npmp.init().errorOption: + warn "Error initialization of NAT-PMP", msg = err + npmp = nil + return false + + if err =? npmp.externalIPAddress().errorOption: + warn "Fetching of external IP failed.", msg = err + npmp = nil + return false + + info "NAT-PMP initialized" + return true + +## Try to initilize all the port mapping protocols and returns +## the protocol that will be used. +proc initProtocols(strategy: PortMappingStrategy): PortMappingStrategy = + if strategy == PortMappingStrategy.Any or strategy == PortMappingStrategy.Upnp: + if initUpnp(): + return PortMappingStrategy.Upnp + + if strategy == PortMappingStrategy.Any or strategy == PortMappingStrategy.Pmp: + if initNpmp(): + return PortMappingStrategy.Pmp + + return PortMappingStrategy.None + +proc upnpPortMapping( + internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort {.gcsafe.} = + let protocol = if (internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP + + logScope: + protocol = "upnp" + externalPort = externalPort.value + internalPort = internalPort.value + protocol = protocol + + let pmres = upnp.addPortMapping( + externalPort = $(externalPort.value), + protocol = protocol, + internalHost = upnp.lanAddr, + internalPort = $(internalPort.value), + desc = MAPPING_DESCRIPTION, + leaseDuration = 0, + ) + + if pmres.isErr: + error "UPnP port mapping", msg = pmres.error + return failure(pmres.error) + + # let's check it + let cres = upnp.getSpecificPortMapping( + externalPort = $(externalPort.value), protocol = protocol + ) + if cres.isErr: + warn "UPnP port mapping check failed. Assuming the check itself is broken and the port mapping was done.", + msg = cres.error + info "UPnP added port mapping" + + return success(externalPort) + +proc npmpPortMapping( + internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort {.gcsafe.} = + let protocol = + if (internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP + + logScope: + protocol = "npmp" + externalPort = externalPort.value + internalPort = internalPort.value + protocol = protocol + + without extPort =? + npmp.addPortMapping( + eport = externalPort.value, + iport = internalPort.value, + protocol = protocol, + lifetime = Pmp_LIFETIME, + ), err: + error "NAT-PMP port mapping error", msg = err.msg + return failure(err.msg) + + info "NAT-PMP: added port mapping" + + if internalPort is TcpPort: + return success(newTcpMappingPort(extPort)) + else: + return success(newUdpMappingPort(extPort)) + +## Create port mapping that will try to utilize the same port number +## of the internal port for the external port mapping. +## +## TODO: Add support for trying mapping of random external port. + +proc doPortMapping(port: MappingPort): ?!MappingPort {.gcsafe.} = + if upnp != nil: + return upnpPortMapping(port, port) + + if npmp != nil: + return npmpPortMapping(port, port) + + return failure("No active startegy") + +proc doPortMapping( + internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort {.gcsafe.} = + if upnp != nil: + return upnpPortMapping(internalPort, externalPort) + + if npmp != nil: + return npmpPortMapping(internalPort, externalPort) + + return failure("No active startegy") + +## Gets external IP provided by the port mapping protocols +## Port mapping needs to be succesfully started first using `startPortMapping()` +proc getExternalIP*(): ?IpAddress = + if upnp == nil and npmp == nil: + warn "No available port-mapping protocol" + return IpAddress.none + + if upnp != nil: + let ires = upnp.externalIPAddress + if ires.isOk(): + info "Got externa IP address: " & ires.value, ip = ires.value + return parseIpAddress(ires.value).some + else: + debug "Getting external IP address using UPnP failed", + msg = ires.error, protocol = "upnp" + + if npmp != nil: + let nires = npmp.externalIPAddress() + if nires.isErr: + debug "Getting external IP address using NAT-PMP failed", msg = nires.error + else: + try: + info "Got externa IP address: " & $(nires.value), + ip =$ (nires.value), protocol = "npmp" + return parseIpAddress($(nires.value)).some + except ValueError as e: + error "parseIpAddress() exception", err = e.msg + + return IpAddress.none + +proc startPortMapping*( + strategy: PortMappingStrategy, internalPorts: seq[MappingPort] +): ?!seq[PortMapping] = + if strategy == PortMappingStrategy.None: + return failure("No port mapping strategy requested") + + if internalPorts.len == 0: + return failure("No internal ports to be mapped were supplied") + + strategy = initProtocols(strategy) + if strategy == PortMappingStrategy.None: + return failure("No available port mapping protocols on the network") + + portMapping = newSeqOfCap[PortMappings](internalPorts.len) + + for port in internalPorts: + without mappedPort =? doPortMapping(port), err: + warn "Failed to map port", port = port.value, msg = err.msg + portMapping.add((internalPort: port, externalPort: MappingPort.none)) + + portMapping.add((internalPort: port, externalPort: mappedPort.some)) + + startRenewalThread(strategy) + + return success(externalPorts) + +proc stopPortMapping*() = + if upnp == nil or npmp == nil: + debug "Port mapping is not running, nothing to stop" + return + + info "Stopping port mapping renewal threads" + try: + portMappingExiting.store(true) + renewalThread.join() + except CatchableError as exc: + warn "Failed to stop port mapping renewal thread", exc = exc.msg + + for mapping in portMapping: + if upnp != nil: + let protocol = + if (internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP + + if err =? + upnp.deletePortMapping( + externalPort = $(mapping.externalPort.value), protocol = protocol + ).errorOption: + error "UPnP port mapping deletion error", msg = err.msg + else: + debug "UPnP: deleted port mapping", + externalPort = mapping.externalPort, + internalPort = mapping.internalPort, + protocol = protocol + + if npnp != nil: + let protocol = + if (internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP + + if err =? + npmp.deletePortMapping( + eport = mapping.externalPort.value, + iport = mapping.internalPort.value, + protocol = protocol, + ).errorOption: + error "NAT-PMP port mapping deletion error", msg = err.msg + else: + debug "NAT-PMP: deleted port mapping", + externalPort = mapping.externalPort, + internalPort = mapping.internalPort, + protocol = protocol + +proc startRenewalThread( + strategy: PortMappingStrategy, + internalPorts: seq[MappingPort], + externalPorts: seq[?MappingPort], +) = + try: + renewalThread = Thread[RenewelThreadArgs]() + renewalThread.createThread(renewPortMapping, (strategy, portMapping)) + except CatchableError as exc: + warn "Failed to create NAT port mapping renewal thread", exc = exc.msg + +proc renewPortMapping(args: RenewelThreadArgs) {.thread, raises: [ValueError].} = + ignoreSignalsInThread() + let + (strategy, portMappings) = args + interval = initDuration(seconds = RENEWAL_INTERVAL) + sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C + + var lastUpdate = now() + + # We can't use copies of Miniupnp and Pmp objects in this thread, because they share + # C pointers with other instances that have already been garbage collected, so + # we use threadvars instead and initialise them again with initProtocols(), + # even though we don't need the external IP's value. + + if initProtocols(strategy) == PortMappingStrategy.None: + error "Could not initiate protocols in renewal thread" + return + + while portMappingExiting.load() == false: + if now() >= (lastUpdate + interval): + for mapping in portMappings: + if externalPort =? mapping.externalPort: + without renewedExternalPort =? + doPortMapping(mapping.internalPort, externalPort), err: + error "Error while renewal of port mapping", msg = err.msg + + if renewedExternalPort.value != externalPort.value: + error "The renewed external port is not the same as the originally mapped" + + lastUpdate = now() + + sleep(sleepDuration) diff --git a/codex/nat/reachabilitymanager.nim b/codex/nat/reachabilitymanager.nim new file mode 100644 index 0000000000..b110b45ee5 --- /dev/null +++ b/codex/nat/reachabilitymanager.nim @@ -0,0 +1,153 @@ +import std/sequtils + +import pkg/chronos +import pkg/chronicles +import pkg/questionable +import pkg/questionable/results +import pkg/libp2p +import pkg/libp2p/protocols/connectivity/autonat/client +import pkg/libp2p/protocols/connectivity/autonat/service + +import ../rng as random +import ./port_mapping + +const AutonatCheckInterval = Opt.some(chronos.seconds(30)) + +logScope: + topics = "codex nat reachabilitymanager" + +type + ReachabilityManager* = ref object of RootObj + networkReachability*: NetworkReachability + portMappingStrategy: PortMappingStrategy + getAnnounceRecords*: ?GetRecords + getDiscoveryRecords*: ?GetRecords + updateAnnounceRecords*: ?UpdateRecords + updateDiscoveryRecords*: ?UpdateRecords + started = false + + GetRecords* = proc(): seq[MultiAddress] {.raises: [].} + UpdateRecords* = proc(records: seq[MultiAddress]) {.raises: [].} + +proc new*( + T: typedesc[ReachabilityManager], portMappingStrategy: PortMappingStrategy +): T = + return T(portMappingStrategy: portMappingStrategy) + +proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHandler = + let statusAndConfidenceHandler = proc( + networkReachability: NetworkReachability, confidenceOpt: Opt[float] + ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = + if not started: + warn "ReachabilityManager was not started, but we are already getting reachability updates! Ignoring..." + return + + without confidence =? confidenceOpt: + debug "Node reachability reported without confidence" + return + + if manager.networkReachability == networkReachability: + debug "Node reachability reported without change", + networkReachability = networkReachability + return + + info "Node reachability status changed", + networkReachability = networkReachability, confidence = confidenceOpt + + manager.networkReachability = networkReachability + + if networkReachability == NetworkReachability.Unreachable: + # Lets first start to expose port using port mapping protocols like NAT-PMP or UPnP + if manager.startPortMapping(): + return # We exposed ports so we should be good! + + info "No more options to become reachable" + + return statusAndConfidenceHandler + +proc startPortMapping(self: ReachabilityManager): bool = + try: + let announceRecords = self.getAnnounceRecords() + let discoveryRecords = self.getDiscoveryRecords() + let portsToBeMapped = + (announceRecords & discoveryRecords).mapIt(getAddressAndPort(it)).mapIt(it.port) + + without mappedPorts =? startPortMapping( + manager.portMappingStrategy, portsToBeMapped + ), err: + warn "Could not start port mapping", msg = err + return false + + if mappedPorts.any( + proc(x: ?MappingPort): bool = + isNone(x) + ): + warn "Some ports were not mapped - not using port mapping then" + return false + + info "Started port mapping" + + let announceMappedRecords = zip( + announceRecords, mappedPorts[0 .. announceRecords.len - 1] + ) + .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, it[1].value)) + self.updateAnnounceRecords(announceMappedRecords) + + let discoveryMappedRecords = zip( + discoveryRecords, mappedPorts[announceRecords.len, ^1] + ) + .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, it[1].value)) + self.updateDiscoveryRecords(discoveryMappedRecords) + + return true + except ValueError as exc: + error "Error while starting port mapping", msg = exc.msg + return false + +proc start*( + self: ReachabilityManager, switch: Switch, bootNodes: seq[SignedPeerRecord] +): Future[void] {.async: (raises: [CancelledError]).} = + doAssert self.getAnnounceRecords.isSome, "getAnnounceRecords is not set" + doAssert self.getDiscoveryRecords.isSome, "getDiscoveryRecords is not set" + doAssert self.updateAnnounceRecords.isSome, "updateAnnounceRecords is not set" + doAssert self.updateDiscoveryRecords.isSome, "updateDiscoveryRecords is not set" + self.started = true + + ## Until more robust way of NAT-traversal helper peers discovery is implemented + ## we will start with simple populating the libp2p peerstore with bootstrap nodes + ## https://github.com/codex-storage/nim-codex/issues/1320 + + for peer in bootNodes: + try: + await switch.connect(peer.data.peerId, peer.data.addresses.mapIt(it.address)) + except CancelledError as exc: + raise exc + except CatchableError as exc: + info "Failed to dial bootstrap nodes", err = exc.msg + +proc stop*(): Future[void] {.async: (raises: [CancelledError]).} = + stopPortMapping() + self.started = false + +proc getAutonatService*(self: ReachabilityManager): Service = + ## AutonatService request other peers to dial us back + ## flagging us as Reachable or NotReachable. + ## We use minimum confidence 0.1 (confidence is calculated as numOfReplies/maxQueueSize) as + ## that will give an answer already for response from one peer. + ## As we use bootnodes for this in initial setup, it is possible we might + ## get only one peer to ask about our reachability and it is crucial to get at least some reply. + ## This should be changed once proactive NAT-traversal helper peers discovery is implemented. + + let autonatService = AutonatService.new( + autonatClient = AutonatClient.new(), + rng = random.Rng.instance(), + scheduleInterval = AutonatCheckInterval, + askNewConnectedPeers = true, + numPeersToAsk = 5, + maxQueueSize = 10, + minConfidence = 0.1, + ) + + autonatService.statusAndConfidenceHandler(self.getReachabilityHandler()) + + return Service(autonatService) diff --git a/codex/nat/utils.nim b/codex/nat/utils.nim new file mode 100644 index 0000000000..ae856d31df --- /dev/null +++ b/codex/nat/utils.nim @@ -0,0 +1,68 @@ +## Nim-Codex +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/strutils +import std/options +import std/net + +import pkg/libp2p +import pkg/questionable +import pkg/questionable/results + +import ./port_mapping + +proc getAddressAndPort*( + ma: MultiAddress +): tuple[ip: IpAddress, port: MappingPort] {.raises: [ValueError].} = + try: + # Try IPv4 first + let ipv4Result = ma[multiCodec("ip4")] + let ip = + if ipv4Result.isOk: + let ipBytes = ipv4Result.get().protoArgument().expect("Invalid IPv4 format") + let ipArray = [ipBytes[0], ipBytes[1], ipBytes[2], ipBytes[3]] + IpAddress(family: IPv4, address_v4: ipArray) + else: + # Try IPv6 if IPv4 not found + let ipv6Result = ma[multiCodec("ip6")] + if ipv6Result.isOk: + let ipBytes = ipv6Result.get().protoArgument().expect("Invalid IPv6 format") + var ipArray: array[16, byte] + for i in 0 .. 15: + ipArray[i] = ipBytes[i] + IpAddress(family: IPv6, address_v6: ipArray) + else: + raise newException(ValueError, "Unknown IP family") + + # Get TCP Port + let tcpPortResult = ma[multiCodec("tcp")] + if tcpPortResult.isOk: + let tcpPortBytes = + tcpPortResult.get().protoArgument().expect("Invalid port format") + let tcpPort = newTcpMappingPort(Port(fromBytesBE(uint16, tcpPortBytes))) + return (ip: ip, port: tcpPort) + + # Get UDP Port + let udpPortResult = ma[multiCodec("udp")] + if udpPortResult.isOk: + let udpPortBytes = + udpPortResult.get().protoArgument().expect("Invalid port format") + let udpPort = newUdpMappingPort(Port(fromBytesBE(uint16, udpPortBytes))) + return (ip: ip, port: udpPort) + + raise newException(ValueError, "No TCP/UDP port specified") + except Exception: + raise newException(ValueError, "Invalid multiaddr") + +proc getMultiAddr*(ip: IpAddress, port: MappingPort): MultiAddress = + let ipFamily = if ip.family == IpAddressFamily.IPv4: "/ip4/" else: "/ip6/" + let portType = if (internalPort is TcpPort): "/tcp/" else: "/udp/" + return MultiAddress.init(ipFamily & $ip & portType & $(port.value)).expect( + "valid multiaddr" + ) diff --git a/codex/utils/addrutils.nim b/codex/utils/addrutils.nim index a9ec54f565..f9560f4bbc 100644 --- a/codex/utils/addrutils.nim +++ b/codex/utils/addrutils.nim @@ -13,9 +13,9 @@ push: import std/strutils import std/options +import std/net import pkg/libp2p -import pkg/stew/shims/net import pkg/stew/endians2 func remapAddr*( @@ -44,7 +44,7 @@ func remapAddr*( proc getMultiAddrWithIPAndUDPPort*(ip: IpAddress, port: Port): MultiAddress = ## Creates a MultiAddress with the specified IP address and UDP port - ## + ## ## Parameters: ## - ip: A valid IP address (IPv4 or IPv6) ## - port: The UDP port number diff --git a/codex/utils/natutils.nim b/codex/utils/natutils.nim deleted file mode 100644 index 996d8dd01c..0000000000 --- a/codex/utils/natutils.nim +++ /dev/null @@ -1,67 +0,0 @@ -{.push raises: [].} - -import - std/[tables, hashes], pkg/results, pkg/stew/shims/net as stewNet, chronos, chronicles - -import pkg/libp2p - -type NatStrategy* = enum - NatAny - NatUpnp - NatPmp - NatNone - -type IpLimits* = object - limit*: uint - ips: Table[IpAddress, uint] - -func hash*(ip: IpAddress): Hash = - case ip.family - of IpAddressFamily.IPv6: - hash(ip.address_v6) - of IpAddressFamily.IPv4: - hash(ip.address_v4) - -func inc*(ipLimits: var IpLimits, ip: IpAddress): bool = - let val = ipLimits.ips.getOrDefault(ip, 0) - if val < ipLimits.limit: - ipLimits.ips[ip] = val + 1 - true - else: - false - -func dec*(ipLimits: var IpLimits, ip: IpAddress) = - let val = ipLimits.ips.getOrDefault(ip, 0) - if val == 1: - ipLimits.ips.del(ip) - elif val > 1: - ipLimits.ips[ip] = val - 1 - -func isGlobalUnicast*(address: TransportAddress): bool = - if address.isGlobal() and address.isUnicast(): true else: false - -func isGlobalUnicast*(address: IpAddress): bool = - let a = initTAddress(address, Port(0)) - a.isGlobalUnicast() - -proc getRouteIpv4*(): Result[IpAddress, cstring] = - # Avoiding Exception with initTAddress and can't make it work with static. - # Note: `publicAddress` is only used an "example" IP to find the best route, - # no data is send over the network to this IP! - let - publicAddress = TransportAddress( - family: AddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1], port: Port(0) - ) - route = getBestRoute(publicAddress) - - if route.source.isUnspecified(): - err("No best ipv4 route found") - else: - let ip = - try: - route.source.address() - except ValueError as e: - # This should not occur really. - error "Address conversion error", exception = e.name, msg = e.msg - return err("Invalid IP address") - ok(ip) diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim index 6d2edd461c..9a328b3324 100644 --- a/tests/codex/helpers/nodeutils.nim +++ b/tests/codex/helpers/nodeutils.nim @@ -186,11 +186,8 @@ proc generateNodes*( if config.enableBootstrap: waitFor switch.peerInfo.update() - let (announceAddrs, discoveryAddrs) = nattedAddress( - NatConfig(hasExtIp: false, nat: NatNone), - switch.peerInfo.addrs, - bindPort.Port, - ) + let (announceAddrs, discoveryAddrs) = + nattedAddress(switch.peerInfo.addrs, bindPort.Port) blockDiscovery.updateAnnounceRecord(announceAddrs) blockDiscovery.updateDhtRecord(discoveryAddrs) if blockDiscovery.dhtRecord.isSome: diff --git a/tests/codex/slots/testbackendfactory.nim b/tests/codex/slots/testbackendfactory.nim index a24bc41a5a..f5ecf0fdbe 100644 --- a/tests/codex/slots/testbackendfactory.nim +++ b/tests/codex/slots/testbackendfactory.nim @@ -44,7 +44,6 @@ suite "Test BackendFactory": let config = CodexConf( cmd: StartUpCmd.persistence, - nat: NatConfig(hasExtIp: false, nat: NatNone), metricsAddress: parseIpAddress("127.0.0.1"), persistenceCmd: PersistenceCmd.prover, marketplaceAddress: EthAddress.example.some, @@ -64,7 +63,6 @@ suite "Test BackendFactory": let config = CodexConf( cmd: StartUpCmd.persistence, - nat: NatConfig(hasExtIp: false, nat: NatNone), metricsAddress: parseIpAddress("127.0.0.1"), persistenceCmd: PersistenceCmd.prover, marketplaceAddress: EthAddress.example.some, @@ -85,7 +83,6 @@ suite "Test BackendFactory": let config = CodexConf( cmd: StartUpCmd.persistence, - nat: NatConfig(hasExtIp: false, nat: NatNone), metricsAddress: parseIpAddress("127.0.0.1"), persistenceCmd: PersistenceCmd.prover, marketplaceAddress: EthAddress.example.some, diff --git a/tests/codex/slots/testprover.nim b/tests/codex/slots/testprover.nim index c567db55dd..a2d2ceace0 100644 --- a/tests/codex/slots/testprover.nim +++ b/tests/codex/slots/testprover.nim @@ -36,7 +36,6 @@ suite "Test Prover": metaDs = metaTmp.newDb() config = CodexConf( cmd: StartUpCmd.persistence, - nat: NatConfig(hasExtIp: false, nat: NatNone), metricsAddress: parseIpAddress("127.0.0.1"), persistenceCmd: PersistenceCmd.prover, circomR1cs: InputFile("tests/circuits/fixtures/proof_main.r1cs"), From 68e8b3c1e1c08f8188a6c3daa2846e96a45bab79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Fri, 31 Oct 2025 11:47:27 +0100 Subject: [PATCH 2/3] chore: making it compile --- codex/nat/port_mapping.nim | 213 ++++++++++++++++-------------- codex/nat/reachabilitymanager.nim | 88 ++++++------ codex/nat/utils.nim | 6 +- 3 files changed, 159 insertions(+), 148 deletions(-) diff --git a/codex/nat/port_mapping.nim b/codex/nat/port_mapping.nim index f0222af647..2409da2018 100644 --- a/codex/nat/port_mapping.nim +++ b/codex/nat/port_mapping.nim @@ -10,7 +10,6 @@ import std/[options, os, strutils, times, net, atomics] -import pkg/stew/objects import pkg/nat_traversal/[miniupnpc, natpmp] import pkg/json_serialization/std/net import pkg/results @@ -40,6 +39,9 @@ type PortMappingStrategy* = enum type MappingPort* = ref object of RootObj value*: Port +proc `$`(p: MappingPort): string = + $(p.value) + type TcpPort* = ref object of MappingPort type UdpPort* = ref object of MappingPort @@ -49,7 +51,7 @@ proc newTcpMappingPort*(value: Port): TcpPort = proc newUdpMappingPort*(value: Port): UdpPort = UdpPort(value: value) -type PortMapping = tuple[internalPort: MappingPort, externalPort: Option[MappingPort]] +type PortMapping* = tuple[internalPort: MappingPort, externalPort: Option[MappingPort]] type RenewelThreadArgs = tuple[strategy: PortMappingStrategy, portMapping: seq[PortMapping]] @@ -150,7 +152,7 @@ proc upnpPortMapping( if pmres.isErr: error "UPnP port mapping", msg = pmres.error - return failure(pmres.error) + return failure($pmres.error) # let's check it let cres = upnp.getSpecificPortMapping( @@ -175,22 +177,23 @@ proc npmpPortMapping( internalPort = internalPort.value protocol = protocol - without extPort =? - npmp.addPortMapping( - eport = externalPort.value, - iport = internalPort.value, - protocol = protocol, - lifetime = Pmp_LIFETIME, - ), err: - error "NAT-PMP port mapping error", msg = err.msg - return failure(err.msg) + let extPortRes = npmp.addPortMapping( + eport = externalPort.value.cushort, + iport = internalPort.value.cushort, + protocol = protocol, + lifetime = Pmp_LIFETIME, + ) + + if extPortRes.isErr: + error "NAT-PMP port mapping error", msg = extPortRes.error() + return failure(extPortRes.error()) info "NAT-PMP: added port mapping" if internalPort is TcpPort: - return success(newTcpMappingPort(extPort)) + return success(MappingPort(newTcpMappingPort(Port(extPortRes.value)))) else: - return success(newUdpMappingPort(extPort)) + return success(MappingPort(newUdpMappingPort(Port(extPortRes.value)))) ## Create port mapping that will try to utilize the same port number ## of the internal port for the external port mapping. @@ -217,6 +220,46 @@ proc doPortMapping( return failure("No active startegy") +proc renewPortMapping(args: RenewelThreadArgs) {.thread, raises: [ValueError].} = + ignoreSignalsInThread() + let + (strategy, portMappings) = args + interval = initDuration(seconds = RENEWAL_INTERVAL) + sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C + + var lastUpdate = now() + + # We can't use copies of Miniupnp and Pmp objects in this thread, because they share + # C pointers with other instances that have already been garbage collected, so + # we use threadvars instead and initialise them again with initProtocols(), + # even though we don't need the external IP's value. + + if initProtocols(strategy) == PortMappingStrategy.None: + error "Could not initiate protocols in renewal thread" + return + + while portMappingExiting.load() == false: + if now() >= (lastUpdate + interval): + for mapping in portMappings: + if externalPort =? mapping.externalPort: + without renewedExternalPort =? + doPortMapping(mapping.internalPort, externalPort), err: + error "Error while renewal of port mapping", msg = err.msg + + if renewedExternalPort.value != externalPort.value: + error "The renewed external port is not the same as the originally mapped" + + lastUpdate = now() + + sleep(sleepDuration) + +proc startRenewalThread(strategy: PortMappingStrategy) = + try: + renewalThread = Thread[RenewelThreadArgs]() + renewalThread.createThread(renewPortMapping, (strategy, mappings)) + except CatchableError as exc: + warn "Failed to create NAT port mapping renewal thread", exc = exc.msg + ## Gets external IP provided by the port mapping protocols ## Port mapping needs to be succesfully started first using `startPortMapping()` proc getExternalIP*(): ?IpAddress = @@ -227,8 +270,11 @@ proc getExternalIP*(): ?IpAddress = if upnp != nil: let ires = upnp.externalIPAddress if ires.isOk(): - info "Got externa IP address: " & ires.value, ip = ires.value - return parseIpAddress(ires.value).some + info "Got externa IP address", ip = ires.value + try: + return parseIpAddress(ires.value).some + except ValueError as e: + error "Failed to parse IP address", err = e.msg else: debug "Getting external IP address using UPnP failed", msg = ires.error, protocol = "upnp" @@ -239,16 +285,15 @@ proc getExternalIP*(): ?IpAddress = debug "Getting external IP address using NAT-PMP failed", msg = nires.error else: try: - info "Got externa IP address: " & $(nires.value), - ip =$ (nires.value), protocol = "npmp" + info "Got externa IP address", ip = $(nires.value), protocol = "npmp" return parseIpAddress($(nires.value)).some except ValueError as e: - error "parseIpAddress() exception", err = e.msg + error "Failed to parse IP address", err = e.msg return IpAddress.none proc startPortMapping*( - strategy: PortMappingStrategy, internalPorts: seq[MappingPort] + strategy: var PortMappingStrategy, internalPorts: seq[MappingPort] ): ?!seq[PortMapping] = if strategy == PortMappingStrategy.None: return failure("No port mapping strategy requested") @@ -260,18 +305,21 @@ proc startPortMapping*( if strategy == PortMappingStrategy.None: return failure("No available port mapping protocols on the network") - portMapping = newSeqOfCap[PortMappings](internalPorts.len) + if mappings.len > 0: + return failure("Port mapping was already started! Stop first before re-starting.") + + mappings = newSeqOfCap[PortMapping](internalPorts.len) for port in internalPorts: without mappedPort =? doPortMapping(port), err: - warn "Failed to map port", port = port.value, msg = err.msg - portMapping.add((internalPort: port, externalPort: MappingPort.none)) + warn "Failed to map port", port = port, msg = err.msg + mappings.add((internalPort: port, externalPort: MappingPort.none)) - portMapping.add((internalPort: port, externalPort: mappedPort.some)) + mappings.add((internalPort: port, externalPort: mappedPort.some)) startRenewalThread(strategy) - return success(externalPorts) + return success(mappings) proc stopPortMapping*() = if upnp == nil or npmp == nil: @@ -281,83 +329,44 @@ proc stopPortMapping*() = info "Stopping port mapping renewal threads" try: portMappingExiting.store(true) - renewalThread.join() + renewalThread.joinThread() except CatchableError as exc: warn "Failed to stop port mapping renewal thread", exc = exc.msg - for mapping in portMapping: - if upnp != nil: - let protocol = - if (internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP - - if err =? - upnp.deletePortMapping( - externalPort = $(mapping.externalPort.value), protocol = protocol - ).errorOption: - error "UPnP port mapping deletion error", msg = err.msg - else: - debug "UPnP: deleted port mapping", - externalPort = mapping.externalPort, - internalPort = mapping.internalPort, - protocol = protocol - - if npnp != nil: - let protocol = - if (internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP - - if err =? - npmp.deletePortMapping( - eport = mapping.externalPort.value, - iport = mapping.internalPort.value, - protocol = protocol, - ).errorOption: - error "NAT-PMP port mapping deletion error", msg = err.msg - else: - debug "NAT-PMP: deleted port mapping", - externalPort = mapping.externalPort, - internalPort = mapping.internalPort, - protocol = protocol - -proc startRenewalThread( - strategy: PortMappingStrategy, - internalPorts: seq[MappingPort], - externalPorts: seq[?MappingPort], -) = - try: - renewalThread = Thread[RenewelThreadArgs]() - renewalThread.createThread(renewPortMapping, (strategy, portMapping)) - except CatchableError as exc: - warn "Failed to create NAT port mapping renewal thread", exc = exc.msg - -proc renewPortMapping(args: RenewelThreadArgs) {.thread, raises: [ValueError].} = - ignoreSignalsInThread() - let - (strategy, portMappings) = args - interval = initDuration(seconds = RENEWAL_INTERVAL) - sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C - - var lastUpdate = now() - - # We can't use copies of Miniupnp and Pmp objects in this thread, because they share - # C pointers with other instances that have already been garbage collected, so - # we use threadvars instead and initialise them again with initProtocols(), - # even though we don't need the external IP's value. - - if initProtocols(strategy) == PortMappingStrategy.None: - error "Could not initiate protocols in renewal thread" - return - - while portMappingExiting.load() == false: - if now() >= (lastUpdate + interval): - for mapping in portMappings: - if externalPort =? mapping.externalPort: - without renewedExternalPort =? - doPortMapping(mapping.internalPort, externalPort), err: - error "Error while renewal of port mapping", msg = err.msg - - if renewedExternalPort.value != externalPort.value: - error "The renewed external port is not the same as the originally mapped" - - lastUpdate = now() - - sleep(sleepDuration) + for mapping in mappings: + if mapping.externalPort.isNone: + continue + + if upnp != nil: + let protocol = + if (mapping.internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP + + if err =? + upnp.deletePortMapping( + externalPort = $((!mapping.externalPort).value), protocol = protocol + ).errorOption: + error "UPnP port mapping deletion error", msg = err + else: + debug "UPnP: deleted port mapping", + externalPort = !mapping.externalPort, + internalPort = mapping.internalPort, + protocol = protocol + + if npmp != nil: + let protocol = + if (mapping.internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP + + if err =? + npmp.deletePortMapping( + eport = (!mapping.externalPort).value.cushort, + iport = mapping.internalPort.value.cushort, + protocol = protocol, + ).errorOption: + error "NAT-PMP port mapping deletion error", msg = err + else: + debug "NAT-PMP: deleted port mapping", + externalPort = !mapping.externalPort, + internalPort = mapping.internalPort, + protocol = protocol + + mappings = @[] diff --git a/codex/nat/reachabilitymanager.nim b/codex/nat/reachabilitymanager.nim index b110b45ee5..864a2d7548 100644 --- a/codex/nat/reachabilitymanager.nim +++ b/codex/nat/reachabilitymanager.nim @@ -10,6 +10,7 @@ import pkg/libp2p/protocols/connectivity/autonat/service import ../rng as random import ./port_mapping +import ./utils const AutonatCheckInterval = Opt.some(chronos.seconds(30)) @@ -34,11 +35,53 @@ proc new*( ): T = return T(portMappingStrategy: portMappingStrategy) +proc startPortMapping(self: ReachabilityManager): bool = + if not self.started: + warn "ReachabilityManager is not started, yet we are trying to map ports already!" + return false + + try: + let announceRecords = (!self.getAnnounceRecords)() + let discoveryRecords = (!self.getDiscoveryRecords)() + let portsToBeMapped = + (announceRecords & discoveryRecords).mapIt(getAddressAndPort(it)).mapIt(it.port) + + without mappedPorts =? startPortMapping(self.portMappingStrategy, portsToBeMapped), + err: + warn "Could not start port mapping", msg = err.msg + return false + + if mappedPorts.any( + proc(x: PortMapping): bool = + isNone(x.externalPort) + ): + warn "Some ports were not mapped - not using port mapping then" + return false + + info "Started port mapping" + + let announceMappedRecords = zip( + announceRecords, mappedPorts[0 .. announceRecords.len - 1] + ) + .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) + (!self.updateAnnounceRecords)(announceMappedRecords) + + let discoveryMappedRecords = zip( + discoveryRecords, mappedPorts[announceRecords.len .. ^1] + ) + .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) + (!self.updateDiscoveryRecords)(discoveryMappedRecords) + + return true + except ValueError as exc: + error "Error while starting port mapping", msg = exc.msg + return false + proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHandler = let statusAndConfidenceHandler = proc( networkReachability: NetworkReachability, confidenceOpt: Opt[float] - ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = - if not started: + ): Future[void] {.async: (raises: [CancelledError]).} = + if not manager.started: warn "ReachabilityManager was not started, but we are already getting reachability updates! Ignoring..." return @@ -56,7 +99,7 @@ proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHa manager.networkReachability = networkReachability - if networkReachability == NetworkReachability.Unreachable: + if networkReachability == NetworkReachability.NotReachable: # Lets first start to expose port using port mapping protocols like NAT-PMP or UPnP if manager.startPortMapping(): return # We exposed ports so we should be good! @@ -65,45 +108,6 @@ proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHa return statusAndConfidenceHandler -proc startPortMapping(self: ReachabilityManager): bool = - try: - let announceRecords = self.getAnnounceRecords() - let discoveryRecords = self.getDiscoveryRecords() - let portsToBeMapped = - (announceRecords & discoveryRecords).mapIt(getAddressAndPort(it)).mapIt(it.port) - - without mappedPorts =? startPortMapping( - manager.portMappingStrategy, portsToBeMapped - ), err: - warn "Could not start port mapping", msg = err - return false - - if mappedPorts.any( - proc(x: ?MappingPort): bool = - isNone(x) - ): - warn "Some ports were not mapped - not using port mapping then" - return false - - info "Started port mapping" - - let announceMappedRecords = zip( - announceRecords, mappedPorts[0 .. announceRecords.len - 1] - ) - .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, it[1].value)) - self.updateAnnounceRecords(announceMappedRecords) - - let discoveryMappedRecords = zip( - discoveryRecords, mappedPorts[announceRecords.len, ^1] - ) - .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, it[1].value)) - self.updateDiscoveryRecords(discoveryMappedRecords) - - return true - except ValueError as exc: - error "Error while starting port mapping", msg = exc.msg - return false - proc start*( self: ReachabilityManager, switch: Switch, bootNodes: seq[SignedPeerRecord] ): Future[void] {.async: (raises: [CancelledError]).} = diff --git a/codex/nat/utils.nim b/codex/nat/utils.nim index ae856d31df..9436ec1647 100644 --- a/codex/nat/utils.nim +++ b/codex/nat/utils.nim @@ -7,13 +7,11 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/strutils import std/options import std/net import pkg/libp2p -import pkg/questionable -import pkg/questionable/results +import pkg/stew/endians2 import ./port_mapping @@ -62,7 +60,7 @@ proc getAddressAndPort*( proc getMultiAddr*(ip: IpAddress, port: MappingPort): MultiAddress = let ipFamily = if ip.family == IpAddressFamily.IPv4: "/ip4/" else: "/ip6/" - let portType = if (internalPort is TcpPort): "/tcp/" else: "/udp/" + let portType = if (port is TcpPort): "/tcp/" else: "/udp/" return MultiAddress.init(ipFamily & $ip & portType & $(port.value)).expect( "valid multiaddr" ) From 4b98194620c542d3dd5477424341990003787d7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Thu, 6 Nov 2025 17:49:23 +0100 Subject: [PATCH 3/3] feat: drop renewal thread in favor of async polling --- codex/codex.nim | 12 +- codex/conf.nim | 2 +- codex/nat/port_mapping.nim | 240 +++++++++++++----------------- codex/nat/reachabilitymanager.nim | 81 ++++++---- tests/codex/helpers/nodeutils.nim | 6 - 5 files changed, 165 insertions(+), 176 deletions(-) diff --git a/codex/codex.nim b/codex/codex.nim index 0ff25ee0aa..62fd6cc9ba 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -166,10 +166,12 @@ proc start*(s: CodexServer) {.async.} = await s.codexNode.switch.start() - s.reachabilityManager.getAnnounceRecords = some proc() = - s.codexNode.switch.peerInfo.addrs - s.reachabilityManager.getDiscoveryRecords = some proc() = - s.codexNode.discovery.dhtRecord.data.addresses.mapIt(it.address) + s.reachabilityManager.getAnnounceRecords = some proc(): ?seq[MultiAddress] = + s.codexNode.switch.peerInfo.addrs.some + s.reachabilityManager.getDiscoveryRecords = some proc(): ?seq[MultiAddress] = + if dhtRecord =? s.codexNode.discovery.dhtRecord: + return dhtRecord.data.addresses.mapIt(it.address).some + s.reachabilityManager.updateAnnounceRecords = some proc(records: seq[MultiAddress]) = s.codexNode.discovery.updateAnnounceRecord(records) s.reachabilityManager.updateDiscoveryRecords = some proc(records: seq[MultiAddress]) = @@ -207,7 +209,7 @@ proc new*( ): CodexServer = ## create CodexServer including setting up datastore, repostore, etc - let reachabilityManager = ReachabilityManager.new(config.portMappingStrategy) + let reachabilityManager = ReachabilityManager.new(config.forcePortMapping) let switch = SwitchBuilder .new() diff --git a/codex/conf.nim b/codex/conf.nim index fb021a43de..882c37d281 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -47,7 +47,7 @@ from ./contracts/config import DefaultRequestCacheSize, DefaultMaxPriorityFeePer from ./validationconfig import MaxSlots, ValidationGroups export units, net, codextypes, logutils, completeCmdArg, parseCmdArg -export ValidationGroups, MaxSlots +export ValidationGroups, MaxSlots, PortMappingStrategy export DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval, diff --git a/codex/nat/port_mapping.nim b/codex/nat/port_mapping.nim index 2409da2018..57944d08f7 100644 --- a/codex/nat/port_mapping.nim +++ b/codex/nat/port_mapping.nim @@ -8,7 +8,7 @@ {.push raises: [].} -import std/[options, os, strutils, times, net, atomics] +import std/[options, strutils, net] import pkg/nat_traversal/[miniupnpc, natpmp] import pkg/json_serialization/std/net @@ -26,7 +26,7 @@ logScope: const UPNP_TIMEOUT = 200 # ms - RENEWAL_INTERVAL = 20 * 60 # seconds + RENEWAL_SLEEP = (20 * 60).seconds Pmp_LIFETIME = 60 * 60 # in seconds, must be longer than RENEWAL_INTERVAL MAPPING_DESCRIPTION = "codex" @@ -39,7 +39,7 @@ type PortMappingStrategy* = enum type MappingPort* = ref object of RootObj value*: Port -proc `$`(p: MappingPort): string = +proc `$`*(p: MappingPort): string = $(p.value) type TcpPort* = ref object of MappingPort @@ -51,37 +51,33 @@ proc newTcpMappingPort*(value: Port): TcpPort = proc newUdpMappingPort*(value: Port): UdpPort = UdpPort(value: value) -type PortMapping* = tuple[internalPort: MappingPort, externalPort: Option[MappingPort]] -type RenewelThreadArgs = - tuple[strategy: PortMappingStrategy, portMapping: seq[PortMapping]] +type PortMappingEntry* = + tuple[internalPort: MappingPort, externalPort: Option[MappingPort]] -var - upnp {.threadvar.}: Miniupnp - npmp {.threadvar.}: NatPmp - mappings: seq[PortMapping] - portMappingExiting: Atomic[bool] - renewalThread: Thread[RenewelThreadArgs] +type PortMapping* = ref object of RootObj + upnp: Miniupnp + npmp: NatPmp + mappings: seq[PortMappingEntry] + renewalLoop: Future[void] -proc initUpnp(): bool = +proc initUpnp(self: PortMapping) = logScope: protocol = "upnp" - if upnp != nil: + if not self.upnp.isNil: warn "UPnP already initialized!" - return true - upnp = newMiniupnp() - upnp.discoverDelay = UPNP_TIMEOUT + self.upnp = newMiniupnp() + self.upnp.discoverDelay = UPNP_TIMEOUT - if err =? upnp.discover().errorOption: + if err =? self.upnp.discover().errorOption: warn "UPnP error discoverning Internet Gateway Devices", msg = err - upnp = nil - return false + self.upnp = nil - case upnp.selectIGD() + case self.upnp.selectIGD() of IGDNotFound: info "UPnP Internet Gateway Device not found. Giving up." - upnp = nil + self.upnp = nil # As UPnP is not supported on our network we won't be using it --> lets erase it. of IGDFound: info "UPnP Internet Gateway Device found." @@ -92,47 +88,45 @@ proc initUpnp(): bool = of IGDIpNotRoutable: info "UPnP Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway." - return true - -proc initNpmp(): bool = +proc initNpmp(self: PortMapping) = logScope: protocol = "npmp" - if npmp != nil: + if not self.npmp.isNil: warn "NAT-PMP already initialized!" - return true - npmp = newNatPmp() + self.npmp = newNatPmp() - if err =? npmp.init().errorOption: + if err =? self.npmp.init().errorOption: warn "Error initialization of NAT-PMP", msg = err - npmp = nil - return false + self.npmp = nil - if err =? npmp.externalIPAddress().errorOption: + if err =? self.npmp.externalIPAddress().errorOption: warn "Fetching of external IP failed.", msg = err - npmp = nil - return false + self.npmp = nil info "NAT-PMP initialized" - return true -## Try to initilize all the port mapping protocols and returns -## the protocol that will be used. -proc initProtocols(strategy: PortMappingStrategy): PortMappingStrategy = +## Try to initilize all the port mapping protocols based on what is available on the network +proc initProtocols(self: PortMapping, strategy: PortMappingStrategy) = if strategy == PortMappingStrategy.Any or strategy == PortMappingStrategy.Upnp: - if initUpnp(): - return PortMappingStrategy.Upnp + self.initUpnp() + + if not self.upnp.isNil: + return # UPnP is available, using that, no need for NAT-PMP. if strategy == PortMappingStrategy.Any or strategy == PortMappingStrategy.Pmp: - if initNpmp(): - return PortMappingStrategy.Pmp + self.initNpmp() - return PortMappingStrategy.None +proc new*(T: type PortMapping, strategy: PortMappingStrategy): PortMapping = + let mapping = PortMapping(upnp: nil, npmp: nil, mappings: @[]) + mapping.initProtocols(strategy) + + return mapping proc upnpPortMapping( - internalPort: MappingPort, externalPort: MappingPort -): ?!MappingPort {.gcsafe.} = + self: PortMapping, internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort = let protocol = if (internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP logScope: @@ -141,10 +135,10 @@ proc upnpPortMapping( internalPort = internalPort.value protocol = protocol - let pmres = upnp.addPortMapping( + let pmres = self.upnp.addPortMapping( externalPort = $(externalPort.value), protocol = protocol, - internalHost = upnp.lanAddr, + internalHost = self.upnp.lanAddr, internalPort = $(internalPort.value), desc = MAPPING_DESCRIPTION, leaseDuration = 0, @@ -155,7 +149,7 @@ proc upnpPortMapping( return failure($pmres.error) # let's check it - let cres = upnp.getSpecificPortMapping( + let cres = self.upnp.getSpecificPortMapping( externalPort = $(externalPort.value), protocol = protocol ) if cres.isErr: @@ -166,8 +160,8 @@ proc upnpPortMapping( return success(externalPort) proc npmpPortMapping( - internalPort: MappingPort, externalPort: MappingPort -): ?!MappingPort {.gcsafe.} = + self: PortMapping, internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort = let protocol = if (internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP @@ -177,7 +171,7 @@ proc npmpPortMapping( internalPort = internalPort.value protocol = protocol - let extPortRes = npmp.addPortMapping( + let extPortRes = self.npmp.addPortMapping( eport = externalPort.value.cushort, iport = internalPort.value.cushort, protocol = protocol, @@ -200,75 +194,48 @@ proc npmpPortMapping( ## ## TODO: Add support for trying mapping of random external port. -proc doPortMapping(port: MappingPort): ?!MappingPort {.gcsafe.} = - if upnp != nil: - return upnpPortMapping(port, port) +proc doPortMapping(self: PortMapping, port: MappingPort): ?!MappingPort = + if not self.upnp.isNil: + return self.upnpPortMapping(port, port) - if npmp != nil: - return npmpPortMapping(port, port) + if not self.npmp.isNil: + return self.npmpPortMapping(port, port) return failure("No active startegy") proc doPortMapping( - internalPort: MappingPort, externalPort: MappingPort -): ?!MappingPort {.gcsafe.} = - if upnp != nil: - return upnpPortMapping(internalPort, externalPort) + self: PortMapping, internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort = + if not self.upnp.isNil: + return self.upnpPortMapping(internalPort, externalPort) - if npmp != nil: - return npmpPortMapping(internalPort, externalPort) + if not self.npmp.isNil: + return self.npmpPortMapping(internalPort, externalPort) return failure("No active startegy") -proc renewPortMapping(args: RenewelThreadArgs) {.thread, raises: [ValueError].} = - ignoreSignalsInThread() - let - (strategy, portMappings) = args - interval = initDuration(seconds = RENEWAL_INTERVAL) - sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C - - var lastUpdate = now() - - # We can't use copies of Miniupnp and Pmp objects in this thread, because they share - # C pointers with other instances that have already been garbage collected, so - # we use threadvars instead and initialise them again with initProtocols(), - # even though we don't need the external IP's value. - - if initProtocols(strategy) == PortMappingStrategy.None: - error "Could not initiate protocols in renewal thread" - return +proc renewPortMapping(self: PortMapping) {.async.} = + while true: + for mapping in self.mappings: + if externalPort =? mapping.externalPort: + without renewedExternalPort =? + self.doPortMapping(mapping.internalPort, externalPort), err: + error "Error while renewal of port mapping", msg = err.msg - while portMappingExiting.load() == false: - if now() >= (lastUpdate + interval): - for mapping in portMappings: - if externalPort =? mapping.externalPort: - without renewedExternalPort =? - doPortMapping(mapping.internalPort, externalPort), err: - error "Error while renewal of port mapping", msg = err.msg + if renewedExternalPort.value != externalPort.value: + error "The renewed external port is not the same as the originally mapped" - if renewedExternalPort.value != externalPort.value: - error "The renewed external port is not the same as the originally mapped" - - lastUpdate = now() - - sleep(sleepDuration) - -proc startRenewalThread(strategy: PortMappingStrategy) = - try: - renewalThread = Thread[RenewelThreadArgs]() - renewalThread.createThread(renewPortMapping, (strategy, mappings)) - except CatchableError as exc: - warn "Failed to create NAT port mapping renewal thread", exc = exc.msg + await sleepAsync(RENEWAL_SLEEP) ## Gets external IP provided by the port mapping protocols ## Port mapping needs to be succesfully started first using `startPortMapping()` -proc getExternalIP*(): ?IpAddress = - if upnp == nil and npmp == nil: +proc getExternalIP*(self: PortMapping): ?IpAddress = + if self.upnp.isNil and self.npmp.isNil: warn "No available port-mapping protocol" return IpAddress.none - if upnp != nil: - let ires = upnp.externalIPAddress + if not self.upnp.isNil: + let ires = self.upnp.externalIPAddress if ires.isOk(): info "Got externa IP address", ip = ires.value try: @@ -279,8 +246,8 @@ proc getExternalIP*(): ?IpAddress = debug "Getting external IP address using UPnP failed", msg = ires.error, protocol = "upnp" - if npmp != nil: - let nires = npmp.externalIPAddress() + if not self.npmp.isNil: + let nires = self.npmp.externalIPAddress() if nires.isErr: debug "Getting external IP address using NAT-PMP failed", msg = nires.error else: @@ -292,57 +259,62 @@ proc getExternalIP*(): ?IpAddress = return IpAddress.none -proc startPortMapping*( - strategy: var PortMappingStrategy, internalPorts: seq[MappingPort] -): ?!seq[PortMapping] = - if strategy == PortMappingStrategy.None: - return failure("No port mapping strategy requested") +## Returns true if some supported port mapping protocol +## is available on the local network +proc isAvailable*(self: PortMapping): bool = + return (not self.upnp.isNil) or (not self.npmp.isNil) +proc start*( + self: PortMapping, internalPorts: seq[MappingPort] +): Future[?!seq[PortMappingEntry]] {.async: (raises: [CancelledError]).} = if internalPorts.len == 0: return failure("No internal ports to be mapped were supplied") - strategy = initProtocols(strategy) - if strategy == PortMappingStrategy.None: + if not self.isAvailable(): return failure("No available port mapping protocols on the network") - if mappings.len > 0: + if self.mappings.len > 0: return failure("Port mapping was already started! Stop first before re-starting.") - mappings = newSeqOfCap[PortMapping](internalPorts.len) - for port in internalPorts: - without mappedPort =? doPortMapping(port), err: + without mappedPort =? self.doPortMapping(port), err: warn "Failed to map port", port = port, msg = err.msg - mappings.add((internalPort: port, externalPort: MappingPort.none)) + self.mappings.add((internalPort: port, externalPort: MappingPort.none)) - mappings.add((internalPort: port, externalPort: mappedPort.some)) + self.mappings.add((internalPort: port, externalPort: mappedPort.some)) - startRenewalThread(strategy) + self.renewalLoop = self.renewPortMapping() + asyncSpawn(self.renewalLoop) - return success(mappings) + return success(self.mappings) -proc stopPortMapping*() = - if upnp == nil or npmp == nil: +proc stop*(self: PortMapping) {.async: (raises: [CancelledError]).} = + if self.upnp.isNil or self.npmp.isNil: debug "Port mapping is not running, nothing to stop" return - info "Stopping port mapping renewal threads" - try: - portMappingExiting.store(true) - renewalThread.joinThread() - except CatchableError as exc: - warn "Failed to stop port mapping renewal thread", exc = exc.msg + info "Stopping port mapping renewal loop" + if not self.renewalLoop.isNil: + if not self.renewalLoop.finished: + try: + await self.renewalLoop.cancelAndWait() + except CancelledError: + discard + except CatchableError as e: + error "Error during cancellation of renewal loop", msg = e.msg + + self.renewalLoop = nil - for mapping in mappings: + for mapping in self.mappings: if mapping.externalPort.isNone: continue - if upnp != nil: + if not self.upnp.isNil: let protocol = if (mapping.internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP if err =? - upnp.deletePortMapping( + self.upnp.deletePortMapping( externalPort = $((!mapping.externalPort).value), protocol = protocol ).errorOption: error "UPnP port mapping deletion error", msg = err @@ -352,12 +324,12 @@ proc stopPortMapping*() = internalPort = mapping.internalPort, protocol = protocol - if npmp != nil: + if not self.npmp.isNil: let protocol = if (mapping.internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP if err =? - npmp.deletePortMapping( + self.npmp.deletePortMapping( eport = (!mapping.externalPort).value.cushort, iport = mapping.internalPort.value.cushort, protocol = protocol, @@ -369,4 +341,4 @@ proc stopPortMapping*() = internalPort = mapping.internalPort, protocol = protocol - mappings = @[] + self.mappings = @[] diff --git a/codex/nat/reachabilitymanager.nim b/codex/nat/reachabilitymanager.nim index 864a2d7548..bbc63198c5 100644 --- a/codex/nat/reachabilitymanager.nim +++ b/codex/nat/reachabilitymanager.nim @@ -19,58 +19,74 @@ logScope: type ReachabilityManager* = ref object of RootObj + started = false + portMapping: PortMapping networkReachability*: NetworkReachability - portMappingStrategy: PortMappingStrategy getAnnounceRecords*: ?GetRecords getDiscoveryRecords*: ?GetRecords updateAnnounceRecords*: ?UpdateRecords updateDiscoveryRecords*: ?UpdateRecords - started = false - GetRecords* = proc(): seq[MultiAddress] {.raises: [].} + GetRecords* = proc(): ?seq[MultiAddress] {.raises: [].} UpdateRecords* = proc(records: seq[MultiAddress]) {.raises: [].} proc new*( T: typedesc[ReachabilityManager], portMappingStrategy: PortMappingStrategy ): T = - return T(portMappingStrategy: portMappingStrategy) + return T(portMapping: PortMapping.new(portMappingStrategy)) -proc startPortMapping(self: ReachabilityManager): bool = +proc startPortMapping( + self: ReachabilityManager +): Future[bool] {.async: (raises: [CancelledError]).} = + # This check guarantees us that the callbacks are set + # and hence we can use ! (Option.get) without fear. if not self.started: warn "ReachabilityManager is not started, yet we are trying to map ports already!" return false try: - let announceRecords = (!self.getAnnounceRecords)() - let discoveryRecords = (!self.getDiscoveryRecords)() - let portsToBeMapped = - (announceRecords & discoveryRecords).mapIt(getAddressAndPort(it)).mapIt(it.port) + {.gcsafe.}: + let announceRecords = (!self.getAnnounceRecords)() + let discoveryRecords = (!self.getDiscoveryRecords)() + + var records: seq[MultiAddress] = @[] + + if announceRecords.isSome: + records.add(!announceRecords) + if discoveryRecords.isSome: + records.add(!discoveryRecords) + + let portsToBeMapped = records.mapIt(getAddressAndPort(it)).mapIt(it.port) - without mappedPorts =? startPortMapping(self.portMappingStrategy, portsToBeMapped), - err: + without mappedPorts =? (await self.portMapping.start(portsToBeMapped)), err: warn "Could not start port mapping", msg = err.msg return false if mappedPorts.any( - proc(x: PortMapping): bool = + proc(x: PortMappingEntry): bool = isNone(x.externalPort) ): warn "Some ports were not mapped - not using port mapping then" return false - info "Started port mapping" - - let announceMappedRecords = zip( - announceRecords, mappedPorts[0 .. announceRecords.len - 1] - ) - .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) - (!self.updateAnnounceRecords)(announceMappedRecords) - - let discoveryMappedRecords = zip( - discoveryRecords, mappedPorts[announceRecords.len .. ^1] - ) - .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) - (!self.updateDiscoveryRecords)(discoveryMappedRecords) + info "Succesfully exposed ports", ports = portsToBeMapped + + if announceRecords.isSome: + let announceMappedRecords = zip( + !announceRecords, mappedPorts[0 .. (!announceRecords).len - 1] + ) + .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) + {.gcsafe.}: + (!self.updateAnnounceRecords)(announceMappedRecords) + + if discoveryRecords.isSome: + let discoveryMappedRecords = zip( + !discoveryRecords, + mappedPorts[(mappedPorts.len - (!discoveryRecords).len) .. ^1], + ) + .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) + {.gcsafe.}: + (!self.updateDiscoveryRecords)(discoveryMappedRecords) return true except ValueError as exc: @@ -80,7 +96,7 @@ proc startPortMapping(self: ReachabilityManager): bool = proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHandler = let statusAndConfidenceHandler = proc( networkReachability: NetworkReachability, confidenceOpt: Opt[float] - ): Future[void] {.async: (raises: [CancelledError]).} = + ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = if not manager.started: warn "ReachabilityManager was not started, but we are already getting reachability updates! Ignoring..." return @@ -101,8 +117,11 @@ proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHa if networkReachability == NetworkReachability.NotReachable: # Lets first start to expose port using port mapping protocols like NAT-PMP or UPnP - if manager.startPortMapping(): - return # We exposed ports so we should be good! + if manager.portMapping.isAvailable(): + debug "Port mapping available on the network" + + if await manager.startPortMapping(): + return # We exposed ports so we should be good! info "No more options to become reachable" @@ -129,8 +148,10 @@ proc start*( except CatchableError as exc: info "Failed to dial bootstrap nodes", err = exc.msg -proc stop*(): Future[void] {.async: (raises: [CancelledError]).} = - stopPortMapping() +proc stop*( + self: ReachabilityManager +): Future[void] {.async: (raises: [CancelledError]).} = + await self.portMapping.stop() self.started = false proc getAutonatService*(self: ReachabilityManager): Service = diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim index 9a328b3324..5f797468db 100644 --- a/tests/codex/helpers/nodeutils.nim +++ b/tests/codex/helpers/nodeutils.nim @@ -10,8 +10,6 @@ import pkg/codex/stores import pkg/codex/blocktype as bt import pkg/codex/blockexchange import pkg/codex/systemclock -import pkg/codex/nat -import pkg/codex/utils/natutils import pkg/codex/slots import pkg/codex/node @@ -186,10 +184,6 @@ proc generateNodes*( if config.enableBootstrap: waitFor switch.peerInfo.update() - let (announceAddrs, discoveryAddrs) = - nattedAddress(switch.peerInfo.addrs, bindPort.Port) - blockDiscovery.updateAnnounceRecord(announceAddrs) - blockDiscovery.updateDhtRecord(discoveryAddrs) if blockDiscovery.dhtRecord.isSome: bootstrapNodes.add !blockDiscovery.dhtRecord