Skip to content

Support active TCP candidates (RFC6544) #565

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -72,6 +73,8 @@ type Agent struct {
prflxAcceptanceMinWait time.Duration
relayAcceptanceMinWait time.Duration

tcpPriorityOffset uint16

portMin uint16
portMax uint16

Expand Down Expand Up @@ -585,6 +588,44 @@ func (a *Agent) getBestValidCandidatePair() *CandidatePair {
}

func (a *Agent) addPair(local, remote Candidate) *CandidatePair {
if local.TCPType() == TCPTypeActive && remote.TCPType() == TCPTypeActive {
return nil
}

if local.TCPType() == TCPTypeActive && remote.TCPType() == TCPTypePassive {
addressToConnect := net.JoinHostPort(remote.Address(), strconv.Itoa(remote.Port()))

conn, err := a.net.Dial("tcp", addressToConnect)
if err != nil {
a.log.Errorf("Failed to dial TCP address %s: %v", addressToConnect, err)
return nil
}

packetConn := newTCPPacketConn(tcpPacketParams{
ReadBuffer: tcpReadBufferSize,
LocalAddr: conn.LocalAddr(),
Logger: a.log,
})

if err = packetConn.AddConn(conn, nil); err != nil {
a.log.Errorf("Failed to add TCP connection: %v", err)
return nil
}

localAddress, ok := conn.LocalAddr().(*net.TCPAddr)
if !ok {
a.log.Errorf("Failed to cast local address to TCP address")
return nil
}

localCandidateHost, ok := local.(*CandidateHost)
if !ok {
a.log.Errorf("Failed to cast local candidate to CandidateHost")
return nil
}
localCandidateHost.port = localAddress.Port // this causes a data race with candidateBase.Port()
local.start(a, packetConn, a.startedCh)
}
p := newCandidatePair(local, remote, a.isControlling)
a.checklist = append(a.checklist, p)
return p
Expand Down Expand Up @@ -761,7 +802,9 @@ func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net
}
}

c.start(a, candidateConn, a.startedCh)
if c.TCPType() != TCPTypeActive {
c.start(a, candidateConn, a.startedCh)
}

set = append(set, c)
a.localCandidates[c.NetworkType()] = set
Expand Down Expand Up @@ -1029,13 +1072,19 @@ func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr)
return
}

remoteTCPType := TCPTypeUnspecified
if local.TCPType() == TCPTypePassive {
remoteTCPType = TCPTypeActive
}

prflxCandidateConfig := CandidatePeerReflexiveConfig{
Network: networkType.String(),
Address: ip.String(),
Port: port,
Component: local.Component(),
RelAddr: "",
RelPort: 0,
TCPType: remoteTCPType,
}

prflxCandidate, err := NewCandidatePeerReflexive(&prflxCandidateConfig)
Expand Down
162 changes: 162 additions & 0 deletions agent_active_tcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

//go:build !js
// +build !js

package ice

import (
"net"
"testing"
"time"

"github.com/pion/logging"
"github.com/pion/transport/v2/stdnet"
"github.com/pion/transport/v2/test"
"github.com/stretchr/testify/require"
)

func getLocalIPAddress(t *testing.T, networkType NetworkType) net.IP {
net, err := stdnet.NewNet()
require.NoError(t, err)
localIPs, err := localInterfaces(net, nil, nil, []NetworkType{networkType}, false)
require.NoError(t, err)
require.NotEmpty(t, localIPs)
return localIPs[0]
}

func ipv6Available(t *testing.T) bool {
net, err := stdnet.NewNet()
require.NoError(t, err)
localIPs, err := localInterfaces(net, nil, nil, []NetworkType{NetworkTypeTCP6}, false)
require.NoError(t, err)
return len(localIPs) > 0
}

func TestAgentActiveTCP(t *testing.T) {
report := test.CheckRoutines(t)
defer report()

lim := test.TimeOut(time.Second * 5)
defer lim.Stop()

const listenPort = 7686
type testCase struct {
name string
networkTypes []NetworkType
listenIPAddress net.IP
selectedPairNetworkType string
}
testCases := []testCase{
{
name: "TCP4 connection",
networkTypes: []NetworkType{NetworkTypeTCP4},
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP4),
selectedPairNetworkType: tcp,
},
{
name: "UDP is preferred over TCP4", // fails some time
networkTypes: supportedNetworkTypes(),
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP4),
selectedPairNetworkType: udp,
},
}

if ipv6Available(t) {
tcpv6Cases := []testCase{
{
name: "TCP6 connection",
networkTypes: []NetworkType{NetworkTypeTCP6},
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP6),
selectedPairNetworkType: tcp,
},
{
name: "UDP is preferred over TCP6", // fails some time
networkTypes: supportedNetworkTypes(),
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP6),
selectedPairNetworkType: udp,
},
}
testCases = append(testCases, tcpv6Cases...)
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
r := require.New(t)

listener, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: testCase.listenIPAddress,
Port: listenPort,
})
r.NoError(err)
defer func() {
_ = listener.Close()
}()

loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel.Set(logging.LogLevelTrace)

tcpMux := NewTCPMuxDefault(TCPMuxParams{
Listener: listener,
Logger: loggerFactory.NewLogger("passive-ice-tcp-mux"),
ReadBufferSize: 20,
})

defer func() {
_ = tcpMux.Close()
}()

r.NotNil(tcpMux.LocalAddr(), "tcpMux.LocalAddr() is nil")

hostAcceptanceMinWait := 100 * time.Millisecond
passiveAgent, err := NewAgent(&AgentConfig{
TCPMux: tcpMux,
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: testCase.networkTypes,
LoggerFactory: loggerFactory,
IncludeLoopback: true,
HostAcceptanceMinWait: &hostAcceptanceMinWait,
})
r.NoError(err)
r.NotNil(passiveAgent)

activeAgent, err := NewAgent(&AgentConfig{
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: testCase.networkTypes,
LoggerFactory: loggerFactory,
HostAcceptanceMinWait: &hostAcceptanceMinWait,
})
r.NoError(err)
r.NotNil(activeAgent)

passiveAgentConn, activeAgenConn := connect(passiveAgent, activeAgent)
r.NotNil(passiveAgentConn)
r.NotNil(activeAgenConn)

pair := passiveAgent.getSelectedPair()
r.NotNil(pair)
r.Equal(testCase.selectedPairNetworkType, pair.Local.NetworkType().NetworkShort())

foo := []byte("foo")
_, err = passiveAgentConn.Write(foo)
r.NoError(err)

buffer := make([]byte, 1024)
n, err := activeAgenConn.Read(buffer)
r.NoError(err)
r.Equal(foo, buffer[:n])

bar := []byte("bar")
_, err = activeAgenConn.Write(bar)
r.NoError(err)

n, err = passiveAgentConn.Read(buffer)
r.NoError(err)
r.Equal(bar, buffer[:n])

r.NoError(activeAgenConn.Close())
r.NoError(passiveAgentConn.Close())
})
}
}
19 changes: 19 additions & 0 deletions agent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,18 @@ const (
// defaultMaxBindingRequests is the maximum number of binding requests before considering a pair failed
defaultMaxBindingRequests = 7

// TCPPriorityOffset is a number which is subtracted from the default (UDP) candidate type preference
// for host, srflx and prfx candidate types.
defaultTCPPriorityOffset = 27

// maxBufferSize is the number of bytes that can be buffered before we start to error
maxBufferSize = 1000 * 1000 // 1MB

// maxBindingRequestTimeout is the wait time before binding requests can be deleted
maxBindingRequestTimeout = 4000 * time.Millisecond

// tcpReadBufferSize is the size of the read buffer of tcpPacketConn used by active tcp candidate
tcpReadBufferSize = 8
)

func defaultCandidateTypes() []CandidateType {
Expand Down Expand Up @@ -174,6 +181,12 @@ type AgentConfig struct {

// Include loopback addresses in the candidate list.
IncludeLoopback bool

// TCPPriorityOffset is a number which is subtracted from the default (UDP) candidate type preference
// for host, srflx and prfx candidate types. It helps to configure relative preference of UDP candidates
// against TCP ones. Relay candidates for TCP and UDP are always 0 and not affected by this setting.
// When this is nil, defaultTCPPriorityOffset is used.
TCPPriorityOffset *uint16
}

// initWithDefaults populates an agent and falls back to defaults if fields are unset
Expand Down Expand Up @@ -208,6 +221,12 @@ func (config *AgentConfig) initWithDefaults(a *Agent) {
a.relayAcceptanceMinWait = *config.RelayAcceptanceMinWait
}

if config.TCPPriorityOffset == nil {
a.tcpPriorityOffset = defaultTCPPriorityOffset
} else {
a.tcpPriorityOffset = *config.TCPPriorityOffset
}

if config.DisconnectedTimeout == nil {
a.disconnectedTimeout = defaultDisconnectedTimeout
} else {
Expand Down
4 changes: 2 additions & 2 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,7 @@ func TestAcceptAggressiveNomination(t *testing.T) {

KeepaliveInterval := time.Hour
cfg0 := &AgentConfig{
NetworkTypes: supportedNetworkTypes(),
NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
MulticastDNSMode: MulticastDNSModeDisabled,
Net: net0,

Expand All @@ -1652,7 +1652,7 @@ func TestAcceptAggressiveNomination(t *testing.T) {
require.NoError(t, aAgent.OnConnectionStateChange(aNotifier))

cfg1 := &AgentConfig{
NetworkTypes: supportedNetworkTypes(),
NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
MulticastDNSMode: MulticastDNSModeDisabled,
Net: net1,
KeepaliveInterval: &KeepaliveInterval,
Expand Down
14 changes: 10 additions & 4 deletions candidate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) {
copy(m.Raw, buf)

if err := m.Decode(); err != nil {
a.log.Warnf("Failed to handle decode ICE from %s to %s: %v", c.addr(), srcAddr, err)
a.log.Warnf("Failed to handle decode ICE from %s to %s: %v", srcAddr, c.addr(), err)
return
}

Expand All @@ -279,7 +279,7 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) {
if !c.validateSTUNTrafficCache(srcAddr) {
remoteCandidate, valid := a.validateNonSTUNTraffic(c, srcAddr) //nolint:contextcheck
if !valid {
a.log.Warnf("Discarded message from %s, not a valid remote candidate", c.addr())
a.log.Warnf("Discarded message to %s, not a valid remote candidate", c.addr())
return
}
c.addRemoteCandidateCache(remoteCandidate, srcAddr)
Expand Down Expand Up @@ -355,7 +355,13 @@ func (c *candidateBase) Priority() uint32 {
// candidates for a particular component for a particular data stream
// that have the same type, the local preference MUST be unique for each
// one.
return (1<<24)*uint32(c.Type().Preference()) +

var tcpPriorityOffset uint16 = defaultTCPPriorityOffset
if c.agent() != nil {
tcpPriorityOffset = c.agent().tcpPriorityOffset
}

return (1<<24)*uint32(c.Type().Preference(c.networkType, tcpPriorityOffset)) +
(1<<8)*uint32(c.LocalPreference()) +
uint32(256-c.Component())
}
Expand Down Expand Up @@ -533,7 +539,7 @@ func UnmarshalCandidate(raw string) (Candidate, error) {
case "srflx":
return NewCandidateServerReflexive(&CandidateServerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort})
case "prflx":
return NewCandidatePeerReflexive(&CandidatePeerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort})
return NewCandidatePeerReflexive(&CandidatePeerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort, tcpType})
case "relay":
return NewCandidateRelay(&CandidateRelayConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort, "", nil})
default:
Expand Down
2 changes: 2 additions & 0 deletions candidate_peer_reflexive.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type CandidatePeerReflexiveConfig struct {
Foundation string
RelAddr string
RelPort int
TCPType TCPType
}

// NewCandidatePeerReflexive creates a new peer reflective candidate
Expand All @@ -49,6 +50,7 @@ func NewCandidatePeerReflexive(config *CandidatePeerReflexiveConfig) (*Candidate
id: candidateID,
networkType: networkType,
candidateType: CandidateTypePeerReflexive,
tcpType: config.TCPType,
address: config.Address,
port: config.Port,
resolvedAddr: createAddr(networkType, ip, config.Port),
Expand Down
Loading