Skip to content

Commit 90b8412

Browse files
authored
basichost: move observed address manager to basichost (#3332)
1 parent 78f84c4 commit 90b8412

File tree

18 files changed

+663
-1082
lines changed

18 files changed

+663
-1082
lines changed

config/config.go

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
2929
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
3030
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
31+
"github.com/libp2p/go-libp2p/p2p/host/observedaddrs"
3132
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
3233
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
3334
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
@@ -434,23 +435,23 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
434435
return fxopts, nil
435436
}
436437

437-
func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus, an *autonatv2.AutoNAT) (*bhost.BasicHost, error) {
438+
func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus, an *autonatv2.AutoNAT, o bhost.ObservedAddrsManager) (*bhost.BasicHost, error) {
438439
h, err := bhost.NewHost(swrm, &bhost.HostOpts{
439-
EventBus: eventBus,
440-
ConnManager: cfg.ConnManager,
441-
AddrsFactory: cfg.AddrsFactory,
442-
NATManager: cfg.NATManager,
443-
EnablePing: !cfg.DisablePing,
444-
UserAgent: cfg.UserAgent,
445-
ProtocolVersion: cfg.ProtocolVersion,
446-
EnableHolePunching: cfg.EnableHolePunching,
447-
HolePunchingOptions: cfg.HolePunchingOptions,
448-
EnableRelayService: cfg.EnableRelayService,
449-
RelayServiceOpts: cfg.RelayServiceOpts,
450-
EnableMetrics: !cfg.DisableMetrics,
451-
PrometheusRegisterer: cfg.PrometheusRegisterer,
452-
DisableIdentifyAddressDiscovery: cfg.DisableIdentifyAddressDiscovery,
453-
AutoNATv2: an,
440+
EventBus: eventBus,
441+
ConnManager: cfg.ConnManager,
442+
AddrsFactory: cfg.AddrsFactory,
443+
NATManager: cfg.NATManager,
444+
EnablePing: !cfg.DisablePing,
445+
UserAgent: cfg.UserAgent,
446+
ProtocolVersion: cfg.ProtocolVersion,
447+
EnableHolePunching: cfg.EnableHolePunching,
448+
HolePunchingOptions: cfg.HolePunchingOptions,
449+
EnableRelayService: cfg.EnableRelayService,
450+
RelayServiceOpts: cfg.RelayServiceOpts,
451+
EnableMetrics: !cfg.DisableMetrics,
452+
PrometheusRegisterer: cfg.PrometheusRegisterer,
453+
AutoNATv2: an,
454+
ObservedAddrsManager: o,
454455
})
455456
if err != nil {
456457
return nil, err
@@ -529,6 +530,25 @@ func (cfg *Config) NewNode() (host.Host, error) {
529530
})
530531
return sw, nil
531532
}),
533+
fx.Provide(func(eventBus event.Bus, s *swarm.Swarm, lifecycle fx.Lifecycle) (bhost.ObservedAddrsManager, error) {
534+
if cfg.DisableIdentifyAddressDiscovery {
535+
return nil, nil
536+
}
537+
o, err := observedaddrs.NewManager(eventBus, s)
538+
if err != nil {
539+
return nil, err
540+
}
541+
lifecycle.Append(fx.Hook{
542+
OnStart: func(context.Context) error {
543+
o.Start(s)
544+
return nil
545+
},
546+
OnStop: func(context.Context) error {
547+
return o.Close()
548+
},
549+
})
550+
return o, nil
551+
}),
532552
fx.Provide(func() (*autonatv2.AutoNAT, error) {
533553
if !cfg.EnableAutoNATv2 {
534554
return nil, nil

p2p/host/basic/addrs_manager.go

Lines changed: 68 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"io"
78
"net"
89
"slices"
910
"sync"
@@ -20,11 +21,15 @@ import (
2021
"github.com/prometheus/client_golang/prometheus"
2122
)
2223

23-
const maxObservedAddrsPerListenAddr = 5
24+
const maxObservedAddrsPerListenAddr = 3
2425

25-
type observedAddrsManager interface {
26-
OwnObservedAddrs() []ma.Multiaddr
27-
ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr
26+
// addrChangeTickrInterval is the interval to recompute host addrs.
27+
var addrChangeTickrInterval = 5 * time.Second
28+
29+
// ObservedAddrsManager maps our local listen addrs to externally observed addrs.
30+
type ObservedAddrsManager interface {
31+
Addrs(minObservers int) []ma.Multiaddr
32+
AddrsFor(local ma.Multiaddr) []ma.Multiaddr
2833
}
2934

3035
type hostAddrs struct {
@@ -42,7 +47,7 @@ type addrsManager struct {
4247
addrsFactory AddrsFactory
4348
listenAddrs func() []ma.Multiaddr
4449
addCertHashes func([]ma.Multiaddr) []ma.Multiaddr
45-
observedAddrsManager observedAddrsManager
50+
observedAddrsManager ObservedAddrsManager
4651
interfaceAddrs *interfaceAddrsCache
4752
addrsReachabilityTracker *addrsReachabilityTracker
4853

@@ -72,7 +77,7 @@ func newAddrsManager(
7277
addrsFactory AddrsFactory,
7378
listenAddrs func() []ma.Multiaddr,
7479
addCertHashes func([]ma.Multiaddr) []ma.Multiaddr,
75-
observedAddrsManager observedAddrsManager,
80+
observedAddrsManager ObservedAddrsManager,
7681
addrsUpdatedChan chan struct{},
7782
client autonatv2Client,
7883
enableMetrics bool,
@@ -107,7 +112,6 @@ func newAddrsManager(
107112
}
108113

109114
func (a *addrsManager) Start() error {
110-
// TODO: add Start method to NATMgr
111115
if a.addrsReachabilityTracker != nil {
112116
err := a.addrsReachabilityTracker.Start()
113117
if err != nil {
@@ -135,8 +139,6 @@ func (a *addrsManager) Close() {
135139
}
136140

137141
func (a *addrsManager) NetNotifee() network.Notifiee {
138-
// Updating addrs in sync provides the nice property that
139-
// host.Addrs() just after host.Network().Listen(x) will return x
140142
return &network.NotifyBundle{
141143
ListenF: func(network.Network, ma.Multiaddr) { a.updateAddrsSync() },
142144
ListenCloseF: func(network.Network, ma.Multiaddr) { a.updateAddrsSync() },
@@ -159,34 +161,27 @@ func (a *addrsManager) updateAddrsSync() {
159161
}
160162
}
161163

162-
func (a *addrsManager) startBackgroundWorker() error {
163-
autoRelayAddrsSub, err := a.bus.Subscribe(new(event.EvtAutoRelayAddrsUpdated), eventbus.Name("addrs-manager"))
164+
func (a *addrsManager) startBackgroundWorker() (retErr error) {
165+
autoRelayAddrsSub, err := a.bus.Subscribe(new(event.EvtAutoRelayAddrsUpdated), eventbus.Name("addrs-manager autorelay sub"))
164166
if err != nil {
165167
return fmt.Errorf("error subscribing to auto relay addrs: %s", err)
166168
}
167-
168-
autonatReachabilitySub, err := a.bus.Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("addrs-manager"))
169+
mc := multiCloser{autoRelayAddrsSub}
170+
autonatReachabilitySub, err := a.bus.Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("addrs-manager autonatv1 sub"))
169171
if err != nil {
170-
err1 := autoRelayAddrsSub.Close()
171-
if err1 != nil {
172-
err1 = fmt.Errorf("error closign autorelaysub: %w", err1)
173-
}
174-
err = fmt.Errorf("error subscribing to autonat reachability: %s", err)
175-
return errors.Join(err, err1)
172+
return errors.Join(
173+
fmt.Errorf("error subscribing to autonat reachability: %s", err),
174+
mc.Close(),
175+
)
176176
}
177+
mc = append(mc, autonatReachabilitySub)
177178

178179
emitter, err := a.bus.Emitter(new(event.EvtHostReachableAddrsChanged), eventbus.Stateful)
179180
if err != nil {
180-
err1 := autoRelayAddrsSub.Close()
181-
if err1 != nil {
182-
err1 = fmt.Errorf("error closing autorelaysub: %w", err1)
183-
}
184-
err2 := autonatReachabilitySub.Close()
185-
if err2 != nil {
186-
err2 = fmt.Errorf("error closing autonat reachability: %w", err2)
187-
}
188-
err = fmt.Errorf("error subscribing to autonat reachability: %s", err)
189-
return errors.Join(err, err1, err2)
181+
return errors.Join(
182+
fmt.Errorf("error creating reachability subscriber: %s", err),
183+
mc.Close(),
184+
)
190185
}
191186

192187
var relayAddrs []ma.Multiaddr
@@ -230,6 +225,10 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even
230225
if err != nil {
231226
log.Warnf("error closing autonat reachability sub: %s", err)
232227
}
228+
err = emitter.Close()
229+
if err != nil {
230+
log.Warnf("error closing host reachability emitter: %s", err)
231+
}
233232
}()
234233

235234
ticker := time.NewTicker(addrChangeTickrInterval)
@@ -371,7 +370,8 @@ func (a *addrsManager) HolePunchAddrs() []ma.Multiaddr {
371370
// AllAddrs may ignore observed addresses in favour of NAT mappings.
372371
// Use both for hole punching.
373372
if a.observedAddrsManager != nil {
374-
addrs = append(addrs, a.observedAddrsManager.OwnObservedAddrs()...)
373+
// For holepunching, include all the best addresses we know even ones with only 1 observer.
374+
addrs = append(addrs, a.observedAddrsManager.Addrs(1)...)
375375
}
376376
addrs = ma.Unique(addrs)
377377
return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) })
@@ -406,7 +406,12 @@ func (a *addrsManager) getLocalAddrs() []ma.Multiaddr {
406406

407407
finalAddrs := make([]ma.Multiaddr, 0, 8)
408408
finalAddrs = a.appendPrimaryInterfaceAddrs(finalAddrs, listenAddrs)
409-
finalAddrs = a.appendNATAddrs(finalAddrs, listenAddrs, a.interfaceAddrs.All())
409+
if a.natManager != nil {
410+
finalAddrs = a.appendNATAddrs(finalAddrs, listenAddrs)
411+
}
412+
if a.observedAddrsManager != nil {
413+
finalAddrs = a.appendObservedAddrs(finalAddrs, listenAddrs, a.interfaceAddrs.All())
414+
}
410415

411416
// Remove "/p2p-circuit" addresses from the list.
412417
// The p2p-circuit listener reports its address as just /p2p-circuit. This is
@@ -444,65 +449,36 @@ func (a *addrsManager) appendPrimaryInterfaceAddrs(dst []ma.Multiaddr, listenAdd
444449
// appendNATAddrs appends the NAT-ed addrs for the listenAddrs. For unspecified listen addrs it appends the
445450
// public address for all the interfaces.
446451
// Inferring WebTransport from QUIC depends on the observed address manager.
447-
//
448-
// TODO: Merge the natmgr and identify.ObservedAddrManager in to one NatMapper module.
449-
func (a *addrsManager) appendNATAddrs(dst []ma.Multiaddr, listenAddrs []ma.Multiaddr, ifaceAddrs []ma.Multiaddr) []ma.Multiaddr {
450-
var obsAddrs []ma.Multiaddr
452+
func (a *addrsManager) appendNATAddrs(dst []ma.Multiaddr, listenAddrs []ma.Multiaddr) []ma.Multiaddr {
451453
for _, listenAddr := range listenAddrs {
452-
var natAddr ma.Multiaddr
453-
if a.natManager != nil {
454-
natAddr = a.natManager.GetMapping(listenAddr)
455-
}
456-
457-
// The order of the cases below is important.
458-
switch {
459-
case natAddr == nil: // no nat mapping
460-
dst = a.appendObservedAddrs(dst, listenAddr, ifaceAddrs)
461-
case manet.IsIPUnspecified(natAddr):
462-
log.Infof("NAT device reported an unspecified IP as it's external address: %s", natAddr)
463-
_, natRest := ma.SplitFirst(natAddr)
464-
obsAddrs = a.appendObservedAddrs(obsAddrs[:0], listenAddr, ifaceAddrs)
465-
for _, addr := range obsAddrs {
466-
obsIP, _ := ma.SplitFirst(addr)
467-
if obsIP != nil && manet.IsPublicAddr(obsIP.Multiaddr()) {
468-
dst = append(dst, obsIP.Encapsulate(natRest))
469-
}
470-
}
471-
// This is !Public as opposed to IsPrivate intentionally.
472-
// Public is a more restrictive classification in some cases, like IPv6 addresses which only
473-
// consider unicast IPv6 addresses allocated so far as public(2000::/3).
474-
case !manet.IsPublicAddr(natAddr): // nat reported non public addr(maybe CGNAT?)
475-
// use both NAT and observed addr
476-
dst = append(dst, natAddr)
477-
dst = a.appendObservedAddrs(dst, listenAddr, ifaceAddrs)
478-
default: // public addr
454+
natAddr := a.natManager.GetMapping(listenAddr)
455+
if natAddr != nil {
479456
dst = append(dst, natAddr)
480457
}
481458
}
482459
return dst
483460
}
484461

485-
func (a *addrsManager) appendObservedAddrs(dst []ma.Multiaddr, listenAddr ma.Multiaddr, ifaceAddrs []ma.Multiaddr) []ma.Multiaddr {
486-
if a.observedAddrsManager == nil {
487-
return dst
488-
}
489-
// Add it for the listenAddr first.
462+
func (a *addrsManager) appendObservedAddrs(dst []ma.Multiaddr, listenAddrs, ifaceAddrs []ma.Multiaddr) []ma.Multiaddr {
463+
// Add it for all the listenAddr first.
490464
// listenAddr maybe unspecified. That's okay as connections on UDP transports
491465
// will have the unspecified address as the local address.
492-
obsAddrs := a.observedAddrsManager.ObservedAddrsFor(listenAddr)
493-
if len(obsAddrs) > maxObservedAddrsPerListenAddr {
494-
obsAddrs = obsAddrs[:maxObservedAddrsPerListenAddr]
466+
for _, la := range listenAddrs {
467+
obsAddrs := a.observedAddrsManager.AddrsFor(la)
468+
if len(obsAddrs) > maxObservedAddrsPerListenAddr {
469+
obsAddrs = obsAddrs[:maxObservedAddrsPerListenAddr]
470+
}
471+
dst = append(dst, obsAddrs...)
495472
}
496-
dst = append(dst, obsAddrs...)
497473

498474
// if it can be resolved into more addresses, add them too
499-
resolved, err := manet.ResolveUnspecifiedAddress(listenAddr, ifaceAddrs)
475+
resolved, err := manet.ResolveUnspecifiedAddresses(listenAddrs, ifaceAddrs)
500476
if err != nil {
501-
log.Warnf("failed to resolve listen addr %s, %s: %s", listenAddr, ifaceAddrs, err)
477+
log.Warnf("failed to resolve listen addr %s, %s: %s", listenAddrs, ifaceAddrs, err)
502478
return dst
503479
}
504480
for _, addr := range resolved {
505-
obsAddrs = a.observedAddrsManager.ObservedAddrsFor(addr)
481+
obsAddrs := a.observedAddrsManager.AddrsFor(addr)
506482
if len(obsAddrs) > maxObservedAddrsPerListenAddr {
507483
obsAddrs = obsAddrs[:maxObservedAddrsPerListenAddr]
508484
}
@@ -678,3 +654,21 @@ func removeNotInSource(addrs, source []ma.Multiaddr) []ma.Multiaddr {
678654
}
679655
return addrs[:i]
680656
}
657+
658+
type multiCloser []io.Closer
659+
660+
func (mc *multiCloser) Close() error {
661+
var errs []error
662+
for _, closer := range *mc {
663+
if err := closer.Close(); err != nil {
664+
var closerName string
665+
if named, ok := closer.(interface{ Name() string }); ok {
666+
closerName = named.Name()
667+
} else {
668+
closerName = fmt.Sprintf("%T", closer)
669+
}
670+
errs = append(errs, fmt.Errorf("error closing %s: %w", closerName, err))
671+
}
672+
}
673+
return errors.Join(errs...)
674+
}

0 commit comments

Comments
 (0)