diff --git a/agent.go b/agent.go index 5350330f..a6439092 100644 --- a/agent.go +++ b/agent.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "net" + "strconv" "strings" "sync" "sync/atomic" @@ -72,6 +73,8 @@ type Agent struct { prflxAcceptanceMinWait time.Duration relayAcceptanceMinWait time.Duration + tcpPriorityOffset uint16 + portMin uint16 portMax uint16 @@ -585,6 +588,44 @@ func (a *Agent) getBestValidCandidatePair() *CandidatePair { } func (a *Agent) addPair(local, remote Candidate) *CandidatePair { + if local.TCPType() == TCPTypeActive && remote.TCPType() == TCPTypeActive { + return nil + } + + if local.TCPType() == TCPTypeActive && remote.TCPType() == TCPTypePassive { + addressToConnect := net.JoinHostPort(remote.Address(), strconv.Itoa(remote.Port())) + + conn, err := a.net.Dial("tcp", addressToConnect) + if err != nil { + a.log.Errorf("Failed to dial TCP address %s: %v", addressToConnect, err) + return nil + } + + packetConn := newTCPPacketConn(tcpPacketParams{ + ReadBuffer: tcpReadBufferSize, + LocalAddr: conn.LocalAddr(), + Logger: a.log, + }) + + if err = packetConn.AddConn(conn, nil); err != nil { + a.log.Errorf("Failed to add TCP connection: %v", err) + return nil + } + + localAddress, ok := conn.LocalAddr().(*net.TCPAddr) + if !ok { + a.log.Errorf("Failed to cast local address to TCP address") + return nil + } + + localCandidateHost, ok := local.(*CandidateHost) + if !ok { + a.log.Errorf("Failed to cast local candidate to CandidateHost") + return nil + } + localCandidateHost.port = localAddress.Port // this causes a data race with candidateBase.Port() + local.start(a, packetConn, a.startedCh) + } p := newCandidatePair(local, remote, a.isControlling) a.checklist = append(a.checklist, p) return p @@ -761,7 +802,9 @@ func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net } } - c.start(a, candidateConn, a.startedCh) + if c.TCPType() != TCPTypeActive { + c.start(a, candidateConn, a.startedCh) + } set = append(set, c) a.localCandidates[c.NetworkType()] = set @@ -1029,6 +1072,11 @@ func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr) return } + remoteTCPType := TCPTypeUnspecified + if local.TCPType() == TCPTypePassive { + remoteTCPType = TCPTypeActive + } + prflxCandidateConfig := CandidatePeerReflexiveConfig{ Network: networkType.String(), Address: ip.String(), @@ -1036,6 +1084,7 @@ func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr) Component: local.Component(), RelAddr: "", RelPort: 0, + TCPType: remoteTCPType, } prflxCandidate, err := NewCandidatePeerReflexive(&prflxCandidateConfig) diff --git a/agent_active_tcp_test.go b/agent_active_tcp_test.go new file mode 100644 index 00000000..0b11cf2e --- /dev/null +++ b/agent_active_tcp_test.go @@ -0,0 +1,162 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +//go:build !js +// +build !js + +package ice + +import ( + "net" + "testing" + "time" + + "github.com/pion/logging" + "github.com/pion/transport/v2/stdnet" + "github.com/pion/transport/v2/test" + "github.com/stretchr/testify/require" +) + +func getLocalIPAddress(t *testing.T, networkType NetworkType) net.IP { + net, err := stdnet.NewNet() + require.NoError(t, err) + localIPs, err := localInterfaces(net, nil, nil, []NetworkType{networkType}, false) + require.NoError(t, err) + require.NotEmpty(t, localIPs) + return localIPs[0] +} + +func ipv6Available(t *testing.T) bool { + net, err := stdnet.NewNet() + require.NoError(t, err) + localIPs, err := localInterfaces(net, nil, nil, []NetworkType{NetworkTypeTCP6}, false) + require.NoError(t, err) + return len(localIPs) > 0 +} + +func TestAgentActiveTCP(t *testing.T) { + report := test.CheckRoutines(t) + defer report() + + lim := test.TimeOut(time.Second * 5) + defer lim.Stop() + + const listenPort = 7686 + type testCase struct { + name string + networkTypes []NetworkType + listenIPAddress net.IP + selectedPairNetworkType string + } + testCases := []testCase{ + { + name: "TCP4 connection", + networkTypes: []NetworkType{NetworkTypeTCP4}, + listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP4), + selectedPairNetworkType: tcp, + }, + { + name: "UDP is preferred over TCP4", // fails some time + networkTypes: supportedNetworkTypes(), + listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP4), + selectedPairNetworkType: udp, + }, + } + + if ipv6Available(t) { + tcpv6Cases := []testCase{ + { + name: "TCP6 connection", + networkTypes: []NetworkType{NetworkTypeTCP6}, + listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP6), + selectedPairNetworkType: tcp, + }, + { + name: "UDP is preferred over TCP6", // fails some time + networkTypes: supportedNetworkTypes(), + listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP6), + selectedPairNetworkType: udp, + }, + } + testCases = append(testCases, tcpv6Cases...) + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + r := require.New(t) + + listener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: testCase.listenIPAddress, + Port: listenPort, + }) + r.NoError(err) + defer func() { + _ = listener.Close() + }() + + loggerFactory := logging.NewDefaultLoggerFactory() + loggerFactory.DefaultLogLevel.Set(logging.LogLevelTrace) + + tcpMux := NewTCPMuxDefault(TCPMuxParams{ + Listener: listener, + Logger: loggerFactory.NewLogger("passive-ice-tcp-mux"), + ReadBufferSize: 20, + }) + + defer func() { + _ = tcpMux.Close() + }() + + r.NotNil(tcpMux.LocalAddr(), "tcpMux.LocalAddr() is nil") + + hostAcceptanceMinWait := 100 * time.Millisecond + passiveAgent, err := NewAgent(&AgentConfig{ + TCPMux: tcpMux, + CandidateTypes: []CandidateType{CandidateTypeHost}, + NetworkTypes: testCase.networkTypes, + LoggerFactory: loggerFactory, + IncludeLoopback: true, + HostAcceptanceMinWait: &hostAcceptanceMinWait, + }) + r.NoError(err) + r.NotNil(passiveAgent) + + activeAgent, err := NewAgent(&AgentConfig{ + CandidateTypes: []CandidateType{CandidateTypeHost}, + NetworkTypes: testCase.networkTypes, + LoggerFactory: loggerFactory, + HostAcceptanceMinWait: &hostAcceptanceMinWait, + }) + r.NoError(err) + r.NotNil(activeAgent) + + passiveAgentConn, activeAgenConn := connect(passiveAgent, activeAgent) + r.NotNil(passiveAgentConn) + r.NotNil(activeAgenConn) + + pair := passiveAgent.getSelectedPair() + r.NotNil(pair) + r.Equal(testCase.selectedPairNetworkType, pair.Local.NetworkType().NetworkShort()) + + foo := []byte("foo") + _, err = passiveAgentConn.Write(foo) + r.NoError(err) + + buffer := make([]byte, 1024) + n, err := activeAgenConn.Read(buffer) + r.NoError(err) + r.Equal(foo, buffer[:n]) + + bar := []byte("bar") + _, err = activeAgenConn.Write(bar) + r.NoError(err) + + n, err = passiveAgentConn.Read(buffer) + r.NoError(err) + r.Equal(bar, buffer[:n]) + + r.NoError(activeAgenConn.Close()) + r.NoError(passiveAgentConn.Close()) + }) + } +} diff --git a/agent_config.go b/agent_config.go index f5897cd4..35feabdc 100644 --- a/agent_config.go +++ b/agent_config.go @@ -41,11 +41,18 @@ const ( // defaultMaxBindingRequests is the maximum number of binding requests before considering a pair failed defaultMaxBindingRequests = 7 + // TCPPriorityOffset is a number which is subtracted from the default (UDP) candidate type preference + // for host, srflx and prfx candidate types. + defaultTCPPriorityOffset = 27 + // maxBufferSize is the number of bytes that can be buffered before we start to error maxBufferSize = 1000 * 1000 // 1MB // maxBindingRequestTimeout is the wait time before binding requests can be deleted maxBindingRequestTimeout = 4000 * time.Millisecond + + // tcpReadBufferSize is the size of the read buffer of tcpPacketConn used by active tcp candidate + tcpReadBufferSize = 8 ) func defaultCandidateTypes() []CandidateType { @@ -174,6 +181,12 @@ type AgentConfig struct { // Include loopback addresses in the candidate list. IncludeLoopback bool + + // TCPPriorityOffset is a number which is subtracted from the default (UDP) candidate type preference + // for host, srflx and prfx candidate types. It helps to configure relative preference of UDP candidates + // against TCP ones. Relay candidates for TCP and UDP are always 0 and not affected by this setting. + // When this is nil, defaultTCPPriorityOffset is used. + TCPPriorityOffset *uint16 } // initWithDefaults populates an agent and falls back to defaults if fields are unset @@ -208,6 +221,12 @@ func (config *AgentConfig) initWithDefaults(a *Agent) { a.relayAcceptanceMinWait = *config.RelayAcceptanceMinWait } + if config.TCPPriorityOffset == nil { + a.tcpPriorityOffset = defaultTCPPriorityOffset + } else { + a.tcpPriorityOffset = *config.TCPPriorityOffset + } + if config.DisconnectedTimeout == nil { a.disconnectedTimeout = defaultDisconnectedTimeout } else { diff --git a/agent_test.go b/agent_test.go index ba24f533..99b01147 100644 --- a/agent_test.go +++ b/agent_test.go @@ -1637,7 +1637,7 @@ func TestAcceptAggressiveNomination(t *testing.T) { KeepaliveInterval := time.Hour cfg0 := &AgentConfig{ - NetworkTypes: supportedNetworkTypes(), + NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}, MulticastDNSMode: MulticastDNSModeDisabled, Net: net0, @@ -1652,7 +1652,7 @@ func TestAcceptAggressiveNomination(t *testing.T) { require.NoError(t, aAgent.OnConnectionStateChange(aNotifier)) cfg1 := &AgentConfig{ - NetworkTypes: supportedNetworkTypes(), + NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}, MulticastDNSMode: MulticastDNSModeDisabled, Net: net1, KeepaliveInterval: &KeepaliveInterval, diff --git a/candidate_base.go b/candidate_base.go index 8e551d8f..4c9e75b8 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -263,7 +263,7 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) { copy(m.Raw, buf) if err := m.Decode(); err != nil { - a.log.Warnf("Failed to handle decode ICE from %s to %s: %v", c.addr(), srcAddr, err) + a.log.Warnf("Failed to handle decode ICE from %s to %s: %v", srcAddr, c.addr(), err) return } @@ -279,7 +279,7 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) { if !c.validateSTUNTrafficCache(srcAddr) { remoteCandidate, valid := a.validateNonSTUNTraffic(c, srcAddr) //nolint:contextcheck if !valid { - a.log.Warnf("Discarded message from %s, not a valid remote candidate", c.addr()) + a.log.Warnf("Discarded message to %s, not a valid remote candidate", c.addr()) return } c.addRemoteCandidateCache(remoteCandidate, srcAddr) @@ -355,7 +355,13 @@ func (c *candidateBase) Priority() uint32 { // candidates for a particular component for a particular data stream // that have the same type, the local preference MUST be unique for each // one. - return (1<<24)*uint32(c.Type().Preference()) + + + var tcpPriorityOffset uint16 = defaultTCPPriorityOffset + if c.agent() != nil { + tcpPriorityOffset = c.agent().tcpPriorityOffset + } + + return (1<<24)*uint32(c.Type().Preference(c.networkType, tcpPriorityOffset)) + (1<<8)*uint32(c.LocalPreference()) + uint32(256-c.Component()) } @@ -533,7 +539,7 @@ func UnmarshalCandidate(raw string) (Candidate, error) { case "srflx": return NewCandidateServerReflexive(&CandidateServerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort}) case "prflx": - return NewCandidatePeerReflexive(&CandidatePeerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort}) + return NewCandidatePeerReflexive(&CandidatePeerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort, tcpType}) case "relay": return NewCandidateRelay(&CandidateRelayConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort, "", nil}) default: diff --git a/candidate_peer_reflexive.go b/candidate_peer_reflexive.go index f019ec69..28439279 100644 --- a/candidate_peer_reflexive.go +++ b/candidate_peer_reflexive.go @@ -24,6 +24,7 @@ type CandidatePeerReflexiveConfig struct { Foundation string RelAddr string RelPort int + TCPType TCPType } // NewCandidatePeerReflexive creates a new peer reflective candidate @@ -49,6 +50,7 @@ func NewCandidatePeerReflexive(config *CandidatePeerReflexiveConfig) (*Candidate id: candidateID, networkType: networkType, candidateType: CandidateTypePeerReflexive, + tcpType: config.TCPType, address: config.Address, port: config.Port, resolvedAddr: createAddr(networkType, ip, config.Port), diff --git a/candidate_test.go b/candidate_test.go index d668936c..3d2b696b 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -36,7 +36,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypeActive, }, }, - WantPriority: 2128609279, + WantPriority: 1675624447, }, { Candidate: &CandidateHost{ @@ -47,7 +47,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypePassive, }, }, - WantPriority: 2124414975, + WantPriority: 1671430143, }, { Candidate: &CandidateHost{ @@ -58,7 +58,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypeSimultaneousOpen, }, }, - WantPriority: 2120220671, + WantPriority: 1667235839, }, { Candidate: &CandidatePeerReflexive{ @@ -78,7 +78,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypeSimultaneousOpen, }, }, - WantPriority: 1860173823, + WantPriority: 1407188991, }, { Candidate: &CandidatePeerReflexive{ @@ -89,7 +89,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypeActive, }, }, - WantPriority: 1855979519, + WantPriority: 1402994687, }, { Candidate: &CandidatePeerReflexive{ @@ -100,7 +100,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypePassive, }, }, - WantPriority: 1851785215, + WantPriority: 1398800383, }, { Candidate: &CandidateServerReflexive{ @@ -285,7 +285,7 @@ func TestCandidateMarshal(t *testing.T) { }, "", }, - "1052353102 1 tcp 2128609279 192.168.0.196 0 typ host tcptype active", + "1052353102 1 tcp 1675624447 192.168.0.196 0 typ host tcptype active", false, }, { diff --git a/candidatetype.go b/candidatetype.go index 3972934c..2a231bad 100644 --- a/candidatetype.go +++ b/candidatetype.go @@ -38,18 +38,24 @@ func (c CandidateType) String() string { // The RECOMMENDED values are 126 for host candidates, 100 // for server reflexive candidates, 110 for peer reflexive candidates, // and 0 for relayed candidates. -func (c CandidateType) Preference() uint16 { +func (c CandidateType) Preference(networkType NetworkType, tcpOffset uint16) uint16 { + var result uint16 switch c { case CandidateTypeHost: - return 126 + result = 126 case CandidateTypePeerReflexive: - return 110 + result = 110 case CandidateTypeServerReflexive: - return 100 + result = 100 case CandidateTypeRelay, CandidateTypeUnspecified: return 0 + default: + return 0 + } + if networkType.IsTCP() { + return result - tcpOffset } - return 0 + return result } func containsCandidateType(candidateType CandidateType, candidateTypeList []CandidateType) bool { diff --git a/candidatetype_test.go b/candidatetype_test.go new file mode 100644 index 00000000..1bc89df6 --- /dev/null +++ b/candidatetype_test.go @@ -0,0 +1,42 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package ice + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCandidateTypePreference(t *testing.T) { + r := require.New(t) + + hostDefaultPreference := uint16(126) + prflxDefaultPreference := uint16(110) + srflxDefaultPreference := uint16(100) + relayDefaultPreference := uint16(0) + unspecifiedDefaultPreference := uint16(0) + + tcpOffsets := []uint16{0, 10} + + for _, tcpOffset := range tcpOffsets { + for _, networkType := range supportedNetworkTypes() { + if networkType.IsTCP() { + r.Equal(hostDefaultPreference-tcpOffset, CandidateTypeHost.Preference(networkType, tcpOffset)) + r.Equal(prflxDefaultPreference-tcpOffset, CandidateTypePeerReflexive.Preference(networkType, tcpOffset)) + r.Equal(srflxDefaultPreference-tcpOffset, CandidateTypeServerReflexive.Preference(networkType, tcpOffset)) + } else { + r.Equal(hostDefaultPreference, CandidateTypeHost.Preference(networkType, tcpOffset)) + r.Equal(prflxDefaultPreference, CandidateTypePeerReflexive.Preference(networkType, tcpOffset)) + r.Equal(srflxDefaultPreference, CandidateTypeServerReflexive.Preference(networkType, tcpOffset)) + } + } + } + for _, tcpOffset := range tcpOffsets { + for _, networkType := range supportedNetworkTypes() { + r.Equal(relayDefaultPreference, CandidateTypeRelay.Preference(networkType, tcpOffset)) + r.Equal(unspecifiedDefaultPreference, CandidateTypeUnspecified.Preference(networkType, tcpOffset)) + } + } +} diff --git a/gather.go b/gather.go index 15e2da14..05f1874e 100644 --- a/gather.go +++ b/gather.go @@ -25,6 +25,12 @@ const ( stunGatherTimeout = time.Second * 5 ) +type connAndPort struct { + conn net.PacketConn + port int + tcpType TCPType +} + // Close a net.Conn and log if we have a failure func closeConnAndLog(c io.Closer, log logging.LeveledLogger, msg string, args ...interface{}) { if c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) { @@ -155,53 +161,21 @@ func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []Networ } for network := range networks { - type connAndPort struct { - conn net.PacketConn - port int - } - var ( - conns []connAndPort - tcpType TCPType - ) + var connectionConfigurations []connAndPort switch network { case tcp: - if a.tcpMux == nil { - continue - } + // Handle ICE TCP active mode + connectionConfigurations = append(connectionConfigurations, connAndPort{nil, 0, TCPTypeActive}) // Handle ICE TCP passive mode - var muxConns []net.PacketConn - if multi, ok := a.tcpMux.(AllConnsGetter); ok { - a.log.Debugf("GetAllConns by ufrag: %s", a.localUfrag) - muxConns, err = multi.GetAllConns(a.localUfrag, mappedIP.To4() == nil, ip) - if err != nil { - a.log.Warnf("Failed to get all TCP connections by ufrag: %s %s %s", network, ip, a.localUfrag) - continue - } - } else { - a.log.Debugf("GetConn by ufrag: %s", a.localUfrag) - conn, err := a.tcpMux.GetConnByUfrag(a.localUfrag, mappedIP.To4() == nil, ip) - if err != nil { - a.log.Warnf("Failed to get TCP connections by ufrag: %s %s %s", network, ip, a.localUfrag) - continue - } - muxConns = []net.PacketConn{conn} + if a.tcpMux != nil { + connectionConfigurations = a.getTCPMuxConnections(mappedIP, ip, network, connectionConfigurations) } - - // Extract the port for each PacketConn we got. - for _, conn := range muxConns { - if tcpConn, ok := conn.LocalAddr().(*net.TCPAddr); ok { - conns = append(conns, connAndPort{conn, tcpConn.Port}) - } else { - a.log.Warnf("Failed to get port of connection from TCPMux: %s %s %s", network, ip, a.localUfrag) - } - } - if len(conns) == 0 { + if len(connectionConfigurations) == 0 { // Didn't succeed with any, try the next network. continue } - tcpType = TCPTypePassive // Is there a way to verify that the listen address is even // accessible from the current interface. case udp: @@ -212,36 +186,36 @@ func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []Networ } if udpConn, ok := conn.LocalAddr().(*net.UDPAddr); ok { - conns = append(conns, connAndPort{conn, udpConn.Port}) + connectionConfigurations = append(connectionConfigurations, connAndPort{conn, udpConn.Port, TCPTypeUnspecified}) } else { a.log.Warnf("Failed to get port of UDPAddr from ListenUDPInPortRange: %s %s %s", network, ip, a.localUfrag) continue } } - for _, connAndPort := range conns { + for _, connectionConfiguration := range connectionConfigurations { hostConfig := CandidateHostConfig{ Network: network, Address: address, - Port: connAndPort.port, + Port: connectionConfiguration.port, Component: ComponentRTP, - TCPType: tcpType, + TCPType: connectionConfiguration.tcpType, } c, err := NewCandidateHost(&hostConfig) if err != nil { - closeConnAndLog(connAndPort.conn, a.log, "failed to create host candidate: %s %s %d: %v", network, mappedIP, connAndPort.port, err) + closeConnAndLog(connectionConfiguration.conn, a.log, "failed to create host candidate: %s %s %d: %v", network, mappedIP, connectionConfiguration.port, err) continue } if a.mDNSMode == MulticastDNSModeQueryAndGather { if err = c.setIP(ip); err != nil { - closeConnAndLog(connAndPort.conn, a.log, "failed to create host candidate: %s %s %d: %v", network, mappedIP, connAndPort.port, err) + closeConnAndLog(connectionConfiguration.conn, a.log, "failed to create host candidate: %s %s %d: %v", network, mappedIP, connectionConfiguration.port, err) continue } } - if err := a.addCandidate(ctx, c, connAndPort.conn); err != nil { + if err := a.addCandidate(ctx, c, connectionConfiguration.conn); err != nil { if closeErr := c.close(); closeErr != nil { a.log.Warnf("Failed to close candidate: %v", closeErr) } @@ -252,6 +226,37 @@ func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []Networ } } +func (a *Agent) getTCPMuxConnections(mappedIP net.IP, ip net.IP, network string, conns []connAndPort) []connAndPort { + var muxConns []net.PacketConn + if multi, ok := a.tcpMux.(AllConnsGetter); ok { + a.log.Debugf("GetAllConns by ufrag: %s", a.localUfrag) + var err error + muxConns, err = multi.GetAllConns(a.localUfrag, mappedIP.To4() == nil, ip) + if err != nil { + a.log.Warnf("Failed to get all TCP connections by ufrag: %s %s %s", network, ip, a.localUfrag) + return conns + } + } else { + a.log.Debugf("GetConn by ufrag: %s", a.localUfrag) + conn, err := a.tcpMux.GetConnByUfrag(a.localUfrag, mappedIP.To4() == nil, ip) + if err != nil { + a.log.Warnf("Failed to get TCP connections by ufrag: %s %s %s", network, ip, a.localUfrag) + return conns + } + muxConns = []net.PacketConn{conn} + } + + // Extract the port for each PacketConn we got. + for _, conn := range muxConns { + if tcpConn, ok := conn.LocalAddr().(*net.TCPAddr); ok { + conns = append(conns, connAndPort{conn, tcpConn.Port, TCPTypePassive}) + } else { + a.log.Warnf("Failed to get port of connection from TCPMux: %s %s %s", network, ip, a.localUfrag) + } + } + return conns +} + func (a *Agent) gatherCandidatesLocalUDPMux(ctx context.Context) error { //nolint:gocognit if a.udpMux == nil { return errUDPMuxDisabled diff --git a/gather_test.go b/gather_test.go index 9baf11b7..ecafd509 100644 --- a/gather_test.go +++ b/gather_test.go @@ -675,7 +675,7 @@ func TestMultiUDPMuxUsage(t *testing.T) { } a, err := NewAgent(&AgentConfig{ - NetworkTypes: supportedNetworkTypes(), + NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}, CandidateTypes: []CandidateType{CandidateTypeHost}, UDPMux: NewMultiUDPMuxDefault(udpMuxInstances...), }) @@ -751,7 +751,8 @@ func TestMultiTCPMuxUsage(t *testing.T) { portFound := make(map[int]bool) for c := range candidateCh { - if c.NetworkType().IsTCP() { + activeCandidate := c.Port() == 0 + if c.NetworkType().IsTCP() && !activeCandidate { portFound[c.Port()] = true } }