Skip to content

Commit e1e061f

Browse files
committed
Implement server side for TCP allocations
1 parent 521e5ad commit e1e061f

24 files changed

+963
-153
lines changed

client_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/pion/logging"
1515
"github.com/pion/stun"
1616
"github.com/pion/turn/v2/internal/proto"
17+
"github.com/pion/turn/v2/utils"
1718
"github.com/stretchr/testify/assert"
1819
"github.com/stretchr/testify/require"
1920
)
@@ -227,7 +228,7 @@ func TestTCPClient(t *testing.T) {
227228
require.NoError(t, err)
228229

229230
client, err := NewClient(&ClientConfig{
230-
Conn: NewSTUNConn(conn),
231+
Conn: utils.NewSTUNConn(conn),
231232
STUNServerAddr: serverAddr,
232233
TURNServerAddr: serverAddr,
233234
Username: "foo",

examples/turn-client/tcp-alloc/main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/pion/logging"
1616
"github.com/pion/turn/v2"
17+
"github.com/pion/turn/v2/utils"
1718
)
1819

1920
func setupSignalingChannel(addrCh chan string, signaling bool, relayAddr string) {
@@ -98,7 +99,7 @@ func main() {
9899
cfg := &turn.ClientConfig{
99100
STUNServerAddr: turnServerAddr,
100101
TURNServerAddr: turnServerAddr,
101-
Conn: turn.NewSTUNConn(conn),
102+
Conn: utils.NewSTUNConn(conn),
102103
Username: cred[0],
103104
Password: cred[1],
104105
Realm: *realm,

examples/turn-client/tcp/main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/pion/logging"
1616
"github.com/pion/turn/v2"
17+
"github.com/pion/turn/v2/utils"
1718
)
1819

1920
func main() {
@@ -51,7 +52,7 @@ func main() {
5152
cfg := &turn.ClientConfig{
5253
STUNServerAddr: turnServerAddr,
5354
TURNServerAddr: turnServerAddr,
54-
Conn: turn.NewSTUNConn(conn),
55+
Conn: utils.NewSTUNConn(conn),
5556
Username: cred[0],
5657
Password: cred[1],
5758
Realm: *realm,

examples/turn-server/tls/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func main() {
7777
RelayAddress: net.ParseIP(*publicIP),
7878
Address: "0.0.0.0",
7979
},
80+
Protocol: "tls",
8081
},
8182
},
8283
})

internal/allocation/allocation.go

+163-19
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88
"net"
99
"sync"
1010
"sync/atomic"
11+
"syscall"
1112
"time"
1213

1314
"github.com/pion/logging"
1415
"github.com/pion/stun"
1516
"github.com/pion/turn/v2/internal/ipnet"
1617
"github.com/pion/turn/v2/internal/proto"
18+
"golang.org/x/sys/unix"
1719
)
1820

1921
type allocationResponse struct {
@@ -24,18 +26,23 @@ type allocationResponse struct {
2426
// Allocation is tied to a FiveTuple and relays traffic
2527
// use CreateAllocation and GetAllocation to operate
2628
type Allocation struct {
27-
RelayAddr net.Addr
28-
Protocol Protocol
29-
TurnSocket net.PacketConn
30-
RelaySocket net.PacketConn
31-
fiveTuple *FiveTuple
32-
permissionsLock sync.RWMutex
33-
permissions map[string]*Permission
34-
channelBindingsLock sync.RWMutex
35-
channelBindings []*ChannelBind
36-
lifetimeTimer *time.Timer
37-
closed chan interface{}
38-
log logging.LeveledLogger
29+
RelayAddr net.Addr
30+
Protocol Protocol
31+
RequestedTransportProtocol proto.Protocol
32+
TurnSocket net.PacketConn
33+
RelaySocket net.PacketConn
34+
RelayListener net.Listener
35+
fiveTuple *FiveTuple
36+
permissionsLock sync.RWMutex
37+
permissions map[string]*Permission
38+
channelBindingsLock sync.RWMutex
39+
channelBindings []*ChannelBind
40+
lifetimeTimer *time.Timer
41+
closed chan interface{}
42+
log logging.LeveledLogger
43+
connsLock sync.RWMutex
44+
addrToConn map[string]net.Conn
45+
cidToConn map[proto.ConnectionID]net.Conn
3946

4047
// Some clients (Firefox or others using resiprocate's nICE lib) may retry allocation
4148
// with same 5 tuple when received 413, for compatible with these clients,
@@ -45,13 +52,16 @@ type Allocation struct {
4552
}
4653

4754
// NewAllocation creates a new instance of NewAllocation.
48-
func NewAllocation(turnSocket net.PacketConn, fiveTuple *FiveTuple, log logging.LeveledLogger) *Allocation {
55+
func NewAllocation(turnSocket net.PacketConn, fiveTuple *FiveTuple, log logging.LeveledLogger, requestedTransportProtocol proto.Protocol) *Allocation {
4956
return &Allocation{
50-
TurnSocket: turnSocket,
51-
fiveTuple: fiveTuple,
52-
permissions: make(map[string]*Permission, 64),
53-
closed: make(chan interface{}),
54-
log: log,
57+
TurnSocket: turnSocket,
58+
RequestedTransportProtocol: requestedTransportProtocol,
59+
fiveTuple: fiveTuple,
60+
permissions: make(map[string]*Permission, 64),
61+
addrToConn: make(map[string]net.Conn),
62+
cidToConn: make(map[proto.ConnectionID]net.Conn),
63+
closed: make(chan interface{}),
64+
log: log,
5565
}
5666
}
5767

@@ -208,7 +218,11 @@ func (a *Allocation) Close() error {
208218
}
209219
a.channelBindingsLock.RUnlock()
210220

211-
return a.RelaySocket.Close()
221+
if a.RequestedTransportProtocol == proto.ProtoTCP {
222+
return a.RelayListener.Close()
223+
} else {
224+
return a.RelaySocket.Close()
225+
}
212226
}
213227

214228
// https://tools.ietf.org/html/rfc5766#section-10.3
@@ -284,3 +298,133 @@ func (a *Allocation) packetHandler(m *Manager) {
284298
}
285299
}
286300
}
301+
302+
func (a *Allocation) GetConnectionByAddr(peerAddr string) net.Conn {
303+
a.connsLock.RLock()
304+
defer a.connsLock.RUnlock()
305+
return a.addrToConn[peerAddr]
306+
}
307+
308+
func (a *Allocation) GetConnectionByID(cid proto.ConnectionID) net.Conn {
309+
a.connsLock.RLock()
310+
defer a.connsLock.RUnlock()
311+
return a.cidToConn[cid]
312+
}
313+
314+
func (a *Allocation) newConnection(cid proto.ConnectionID, dst string) error {
315+
a.connsLock.Lock()
316+
a.addrToConn[dst] = nil
317+
a.cidToConn[cid] = nil
318+
a.connsLock.Unlock()
319+
320+
dialer := &net.Dialer{
321+
LocalAddr: a.RelayAddr,
322+
Control: func(network, address string, c syscall.RawConn) error {
323+
var err error
324+
c.Control(func(fd uintptr) {
325+
err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEADDR|unix.SO_REUSEPORT, 1)
326+
})
327+
return err
328+
},
329+
}
330+
331+
conn, err := dialer.Dial("tcp", dst)
332+
if err != nil {
333+
return err
334+
}
335+
336+
a.connsLock.Lock()
337+
a.addrToConn[dst] = conn
338+
a.cidToConn[cid] = conn
339+
a.connsLock.Unlock()
340+
341+
return nil
342+
}
343+
344+
func (a *Allocation) removeConnection(cid proto.ConnectionID, dst string) {
345+
a.connsLock.Lock()
346+
c := a.cidToConn[cid]
347+
delete(a.addrToConn, dst)
348+
delete(a.cidToConn, cid)
349+
a.connsLock.Unlock()
350+
c.Close()
351+
}
352+
353+
func (a *Allocation) connectionHandler(m *Manager) {
354+
for {
355+
// When a server receives an incoming TCP connection on a relayed
356+
// transport address, it processes the request as follows.
357+
// The server MUST accept the connection. If it is not successful,
358+
// nothing is sent to the client over the control connection.
359+
conn, err := a.RelayListener.Accept()
360+
if err != nil {
361+
m.DeleteAllocation(a.fiveTuple)
362+
return
363+
}
364+
365+
a.log.Debugf("relay listener %s received connection from %s",
366+
a.RelayListener.Addr().String(),
367+
conn.RemoteAddr().String())
368+
369+
// If the connection is successfully accepted, it is now called a peer
370+
// data connection. The server MUST buffer any data received from the
371+
// peer. The server adjusts its advertised TCP receive window to
372+
// reflect the amount of empty buffer space.
373+
374+
// If no permission for this peer has been installed for this
375+
// allocation, the server MUST close the connection with the peer
376+
// immediately after it has been accepted.
377+
378+
if p := a.GetPermission(conn.RemoteAddr()); p == nil {
379+
a.log.Infof("No Permission or Channel exists for %v on allocation %v", conn.RemoteAddr(), a.RelayAddr.String())
380+
conn.Close()
381+
continue
382+
}
383+
384+
// Otherwise, the server sends a ConnectionAttempt indication to the
385+
// client over the control connection. The indication MUST include an
386+
// XOR-PEER-ADDRESS attribute containing the peer's transport address,
387+
// as well as a CONNECTION-ID attribute uniquely identifying the peer
388+
// data connection.
389+
cid := m.newCID(a)
390+
391+
a.connsLock.Lock()
392+
a.addrToConn[conn.RemoteAddr().String()] = conn
393+
a.cidToConn[cid] = conn
394+
a.connsLock.Unlock()
395+
396+
msg, err := stun.Build(
397+
stun.TransactionID,
398+
stun.NewType(stun.MethodConnectionAttempt, stun.ClassIndication),
399+
)
400+
if err != nil {
401+
a.log.Errorf("Failed to build MethodConnectionAttempt message %v", err)
402+
continue
403+
}
404+
405+
addr, ok := conn.RemoteAddr().(*net.TCPAddr)
406+
if !ok {
407+
a.log.Errorf("Failed to parse remote tcp address")
408+
continue
409+
}
410+
411+
peerAddr := proto.PeerAddress{}
412+
peerAddr.IP = addr.IP
413+
peerAddr.Port = addr.Port
414+
415+
if err = peerAddr.AddTo(msg); err != nil {
416+
a.log.Errorf("Failed to build MethodConnectionAttempt message %v", err)
417+
return
418+
}
419+
420+
attrCid := proto.ConnectionID(cid)
421+
attrCid.AddTo(msg)
422+
a.TurnSocket.WriteTo(msg.Raw, a.fiveTuple.SrcAddr)
423+
424+
// If no ConnectionBind request associated with this peer data
425+
// connection is received after 30 seconds, the peer data connection
426+
// MUST be closed.
427+
428+
go m.removeAfter30(cid, conn.RemoteAddr())
429+
}
430+
}

0 commit comments

Comments
 (0)