Skip to content

Commit 72773df

Browse files
committed
Implement active TCP candidate type (RFC6544)
By default TCP candidate type priority is UDP one minus 27 (except relay), so that UDP+srlfx priority > TCP+host priority. That priority offset can be configured using AgentConfig. Ipv6 TCP candidates are also supported. Open issue: local active TCP candidate can be connected only with 1 remote passive candidate.
1 parent 886f123 commit 72773df

11 files changed

+356
-64
lines changed

agent.go

+50-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"fmt"
1111
"net"
12+
"strconv"
1213
"strings"
1314
"sync"
1415
"sync/atomic"
@@ -72,6 +73,8 @@ type Agent struct {
7273
prflxAcceptanceMinWait time.Duration
7374
relayAcceptanceMinWait time.Duration
7475

76+
tcpPriorityOffset uint16
77+
7578
portMin uint16
7679
portMax uint16
7780

@@ -585,6 +588,44 @@ func (a *Agent) getBestValidCandidatePair() *CandidatePair {
585588
}
586589

587590
func (a *Agent) addPair(local, remote Candidate) *CandidatePair {
591+
if local.TCPType() == TCPTypeActive && remote.TCPType() == TCPTypeActive {
592+
return nil
593+
}
594+
595+
if local.TCPType() == TCPTypeActive && remote.TCPType() == TCPTypePassive {
596+
addressToConnect := net.JoinHostPort(remote.Address(), strconv.Itoa(remote.Port()))
597+
598+
conn, err := a.net.Dial("tcp", addressToConnect)
599+
if err != nil {
600+
a.log.Errorf("Failed to dial TCP address %s: %v", addressToConnect, err)
601+
return nil
602+
}
603+
604+
packetConn := newTCPPacketConn(tcpPacketParams{
605+
ReadBuffer: tcpReadBufferSize,
606+
LocalAddr: conn.LocalAddr(),
607+
Logger: a.log,
608+
})
609+
610+
if err = packetConn.AddConn(conn, nil); err != nil {
611+
a.log.Errorf("Failed to add TCP connection: %v", err)
612+
return nil
613+
}
614+
615+
localAddress, ok := conn.LocalAddr().(*net.TCPAddr)
616+
if !ok {
617+
a.log.Errorf("Failed to cast local address to TCP address")
618+
return nil
619+
}
620+
621+
localCandidateHost, ok := local.(*CandidateHost)
622+
if !ok {
623+
a.log.Errorf("Failed to cast local candidate to CandidateHost")
624+
return nil
625+
}
626+
localCandidateHost.port = localAddress.Port // this causes a data race with candidateBase.Port()
627+
local.start(a, packetConn, a.startedCh)
628+
}
588629
p := newCandidatePair(local, remote, a.isControlling)
589630
a.checklist = append(a.checklist, p)
590631
return p
@@ -761,7 +802,9 @@ func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net
761802
}
762803
}
763804

764-
c.start(a, candidateConn, a.startedCh)
805+
if c.TCPType() != TCPTypeActive {
806+
c.start(a, candidateConn, a.startedCh)
807+
}
765808

766809
set = append(set, c)
767810
a.localCandidates[c.NetworkType()] = set
@@ -1029,13 +1072,19 @@ func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr)
10291072
return
10301073
}
10311074

1075+
remoteTCPType := TCPTypeUnspecified
1076+
if local.TCPType() == TCPTypePassive {
1077+
remoteTCPType = TCPTypeActive
1078+
}
1079+
10321080
prflxCandidateConfig := CandidatePeerReflexiveConfig{
10331081
Network: networkType.String(),
10341082
Address: ip.String(),
10351083
Port: port,
10361084
Component: local.Component(),
10371085
RelAddr: "",
10381086
RelPort: 0,
1087+
TCPType: remoteTCPType,
10391088
}
10401089

10411090
prflxCandidate, err := NewCandidatePeerReflexive(&prflxCandidateConfig)

agent_active_tcp_test.go

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
//go:build !js
5+
// +build !js
6+
7+
package ice
8+
9+
import (
10+
"net"
11+
"testing"
12+
"time"
13+
14+
"github.com/pion/logging"
15+
"github.com/pion/transport/v2/stdnet"
16+
"github.com/pion/transport/v2/test"
17+
"github.com/stretchr/testify/require"
18+
)
19+
20+
func getLocalIPAddress(t *testing.T, networkType NetworkType) net.IP {
21+
net, err := stdnet.NewNet()
22+
require.NoError(t, err)
23+
localIPs, err := localInterfaces(net, nil, nil, []NetworkType{networkType}, false)
24+
require.NoError(t, err)
25+
require.NotEmpty(t, localIPs)
26+
return localIPs[0]
27+
}
28+
29+
func ipv6Available(t *testing.T) bool {
30+
net, err := stdnet.NewNet()
31+
require.NoError(t, err)
32+
localIPs, err := localInterfaces(net, nil, nil, []NetworkType{NetworkTypeTCP6}, false)
33+
require.NoError(t, err)
34+
return len(localIPs) > 0
35+
}
36+
37+
func TestAgentActiveTCP(t *testing.T) {
38+
report := test.CheckRoutines(t)
39+
defer report()
40+
41+
lim := test.TimeOut(time.Second * 5)
42+
defer lim.Stop()
43+
44+
const listenPort = 7686
45+
type testCase struct {
46+
name string
47+
networkTypes []NetworkType
48+
listenIPAddress net.IP
49+
selectedPairNetworkType string
50+
}
51+
testCases := []testCase{
52+
{
53+
name: "TCP4 connection",
54+
networkTypes: []NetworkType{NetworkTypeTCP4},
55+
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP4),
56+
selectedPairNetworkType: tcp,
57+
},
58+
{
59+
name: "UDP is preferred over TCP4", // fails some time
60+
networkTypes: supportedNetworkTypes(),
61+
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP4),
62+
selectedPairNetworkType: udp,
63+
},
64+
}
65+
66+
if ipv6Available(t) {
67+
tcpv6Cases := []testCase{
68+
{
69+
name: "TCP6 connection",
70+
networkTypes: []NetworkType{NetworkTypeTCP6},
71+
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP6),
72+
selectedPairNetworkType: tcp,
73+
},
74+
{
75+
name: "UDP is preferred over TCP6", // fails some time
76+
networkTypes: supportedNetworkTypes(),
77+
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP6),
78+
selectedPairNetworkType: udp,
79+
},
80+
}
81+
testCases = append(testCases, tcpv6Cases...)
82+
}
83+
84+
for _, testCase := range testCases {
85+
t.Run(testCase.name, func(t *testing.T) {
86+
r := require.New(t)
87+
88+
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
89+
IP: testCase.listenIPAddress,
90+
Port: listenPort,
91+
})
92+
r.NoError(err)
93+
defer func() {
94+
_ = listener.Close()
95+
}()
96+
97+
loggerFactory := logging.NewDefaultLoggerFactory()
98+
loggerFactory.DefaultLogLevel.Set(logging.LogLevelTrace)
99+
100+
tcpMux := NewTCPMuxDefault(TCPMuxParams{
101+
Listener: listener,
102+
Logger: loggerFactory.NewLogger("passive-ice-tcp-mux"),
103+
ReadBufferSize: 20,
104+
})
105+
106+
defer func() {
107+
_ = tcpMux.Close()
108+
}()
109+
110+
r.NotNil(tcpMux.LocalAddr(), "tcpMux.LocalAddr() is nil")
111+
112+
hostAcceptanceMinWait := 100 * time.Millisecond
113+
passiveAgent, err := NewAgent(&AgentConfig{
114+
TCPMux: tcpMux,
115+
CandidateTypes: []CandidateType{CandidateTypeHost},
116+
NetworkTypes: testCase.networkTypes,
117+
LoggerFactory: loggerFactory,
118+
IncludeLoopback: true,
119+
HostAcceptanceMinWait: &hostAcceptanceMinWait,
120+
})
121+
r.NoError(err)
122+
r.NotNil(passiveAgent)
123+
124+
activeAgent, err := NewAgent(&AgentConfig{
125+
CandidateTypes: []CandidateType{CandidateTypeHost},
126+
NetworkTypes: testCase.networkTypes,
127+
LoggerFactory: loggerFactory,
128+
HostAcceptanceMinWait: &hostAcceptanceMinWait,
129+
})
130+
r.NoError(err)
131+
r.NotNil(activeAgent)
132+
133+
passiveAgentConn, activeAgenConn := connect(passiveAgent, activeAgent)
134+
r.NotNil(passiveAgentConn)
135+
r.NotNil(activeAgenConn)
136+
137+
pair := passiveAgent.getSelectedPair()
138+
r.NotNil(pair)
139+
r.Equal(testCase.selectedPairNetworkType, pair.Local.NetworkType().NetworkShort())
140+
141+
foo := []byte("foo")
142+
_, err = passiveAgentConn.Write(foo)
143+
r.NoError(err)
144+
145+
buffer := make([]byte, 1024)
146+
n, err := activeAgenConn.Read(buffer)
147+
r.NoError(err)
148+
r.Equal(foo, buffer[:n])
149+
150+
bar := []byte("bar")
151+
_, err = activeAgenConn.Write(bar)
152+
r.NoError(err)
153+
154+
n, err = passiveAgentConn.Read(buffer)
155+
r.NoError(err)
156+
r.Equal(bar, buffer[:n])
157+
158+
r.NoError(activeAgenConn.Close())
159+
r.NoError(passiveAgentConn.Close())
160+
})
161+
}
162+
}

agent_config.go

+19
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,18 @@ const (
4141
// defaultMaxBindingRequests is the maximum number of binding requests before considering a pair failed
4242
defaultMaxBindingRequests = 7
4343

44+
// TCPPriorityOffset is a number which is subtracted from the default (UDP) candidate type preference
45+
// for host, srflx and prfx candidate types.
46+
defaultTCPPriorityOffset = 27
47+
4448
// maxBufferSize is the number of bytes that can be buffered before we start to error
4549
maxBufferSize = 1000 * 1000 // 1MB
4650

4751
// maxBindingRequestTimeout is the wait time before binding requests can be deleted
4852
maxBindingRequestTimeout = 4000 * time.Millisecond
53+
54+
// tcpReadBufferSize is the size of the read buffer of tcpPacketConn used by active tcp candidate
55+
tcpReadBufferSize = 8
4956
)
5057

5158
func defaultCandidateTypes() []CandidateType {
@@ -174,6 +181,12 @@ type AgentConfig struct {
174181

175182
// Include loopback addresses in the candidate list.
176183
IncludeLoopback bool
184+
185+
// TCPPriorityOffset is a number which is subtracted from the default (UDP) candidate type preference
186+
// for host, srflx and prfx candidate types. It helps to configure relative preference of UDP candidates
187+
// against TCP ones. Relay candidates for TCP and UDP are always 0 and not affected by this setting.
188+
// When this is nil, defaultTCPPriorityOffset is used.
189+
TCPPriorityOffset *uint16
177190
}
178191

179192
// initWithDefaults populates an agent and falls back to defaults if fields are unset
@@ -208,6 +221,12 @@ func (config *AgentConfig) initWithDefaults(a *Agent) {
208221
a.relayAcceptanceMinWait = *config.RelayAcceptanceMinWait
209222
}
210223

224+
if config.TCPPriorityOffset == nil {
225+
a.tcpPriorityOffset = defaultTCPPriorityOffset
226+
} else {
227+
a.tcpPriorityOffset = *config.TCPPriorityOffset
228+
}
229+
211230
if config.DisconnectedTimeout == nil {
212231
a.disconnectedTimeout = defaultDisconnectedTimeout
213232
} else {

agent_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1637,7 +1637,7 @@ func TestAcceptAggressiveNomination(t *testing.T) {
16371637

16381638
KeepaliveInterval := time.Hour
16391639
cfg0 := &AgentConfig{
1640-
NetworkTypes: supportedNetworkTypes(),
1640+
NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
16411641
MulticastDNSMode: MulticastDNSModeDisabled,
16421642
Net: net0,
16431643

@@ -1652,7 +1652,7 @@ func TestAcceptAggressiveNomination(t *testing.T) {
16521652
require.NoError(t, aAgent.OnConnectionStateChange(aNotifier))
16531653

16541654
cfg1 := &AgentConfig{
1655-
NetworkTypes: supportedNetworkTypes(),
1655+
NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
16561656
MulticastDNSMode: MulticastDNSModeDisabled,
16571657
Net: net1,
16581658
KeepaliveInterval: &KeepaliveInterval,

candidate_base.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,13 @@ func (c *candidateBase) Priority() uint32 {
355355
// candidates for a particular component for a particular data stream
356356
// that have the same type, the local preference MUST be unique for each
357357
// one.
358-
return (1<<24)*uint32(c.Type().Preference()) +
358+
359+
var tcpPriorityOffset uint16 = defaultTCPPriorityOffset
360+
if c.agent() != nil {
361+
tcpPriorityOffset = c.agent().tcpPriorityOffset
362+
}
363+
364+
return (1<<24)*uint32(c.Type().Preference(c.networkType, tcpPriorityOffset)) +
359365
(1<<8)*uint32(c.LocalPreference()) +
360366
uint32(256-c.Component())
361367
}
@@ -533,7 +539,7 @@ func UnmarshalCandidate(raw string) (Candidate, error) {
533539
case "srflx":
534540
return NewCandidateServerReflexive(&CandidateServerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort})
535541
case "prflx":
536-
return NewCandidatePeerReflexive(&CandidatePeerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort})
542+
return NewCandidatePeerReflexive(&CandidatePeerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort, tcpType})
537543
case "relay":
538544
return NewCandidateRelay(&CandidateRelayConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort, "", nil})
539545
default:

candidate_peer_reflexive.go

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type CandidatePeerReflexiveConfig struct {
2424
Foundation string
2525
RelAddr string
2626
RelPort int
27+
TCPType TCPType
2728
}
2829

2930
// NewCandidatePeerReflexive creates a new peer reflective candidate
@@ -49,6 +50,7 @@ func NewCandidatePeerReflexive(config *CandidatePeerReflexiveConfig) (*Candidate
4950
id: candidateID,
5051
networkType: networkType,
5152
candidateType: CandidateTypePeerReflexive,
53+
tcpType: config.TCPType,
5254
address: config.Address,
5355
port: config.Port,
5456
resolvedAddr: createAddr(networkType, ip, config.Port),

0 commit comments

Comments
 (0)