Skip to content

Commit c7aff31

Browse files
committed
Implement server side for TCP allocations
1 parent 100044d commit c7aff31

23 files changed

+960
-150
lines changed

client_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/pion/logging"
1212
"github.com/pion/transport/v2/stdnet"
13+
"github.com/pion/turn/v2/utils"
1314
"github.com/stretchr/testify/assert"
1415
)
1516

@@ -216,7 +217,7 @@ func TestTCPClient(t *testing.T) {
216217
assert.NoError(t, err)
217218

218219
client, err := NewClient(&ClientConfig{
219-
Conn: NewSTUNConn(conn),
220+
Conn: utils.NewSTUNConn(conn),
220221
STUNServerAddr: "127.0.0.1:3478",
221222
TURNServerAddr: "127.0.0.1:3478",
222223
Username: "foo",

examples/turn-client/tcp/main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/pion/logging"
1414
"github.com/pion/turn/v2"
15+
"github.com/pion/turn/v2/utils"
1516
)
1617

1718
func handleSignaling(server bool, addrCh chan string, relayAddr string) {
@@ -87,7 +88,7 @@ func main() {
8788
cfg := &turn.ClientConfig{
8889
STUNServerAddr: turnServerAddr,
8990
TURNServerAddr: turnServerAddr,
90-
Conn: turn.NewSTUNConn(conn),
91+
Conn: utils.NewSTUNConn(conn),
9192
Username: cred[0],
9293
Password: cred[1],
9394
Realm: *realm,

examples/turn-server/tls/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func main() {
7474
RelayAddress: net.ParseIP(*publicIP),
7575
Address: "0.0.0.0",
7676
},
77+
Protocol: "tls",
7778
},
7879
},
7980
})

internal/allocation/allocation.go

+163-19
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import (
55
"net"
66
"sync"
77
"sync/atomic"
8+
"syscall"
89
"time"
910

1011
"github.com/pion/logging"
1112
"github.com/pion/stun"
1213
"github.com/pion/turn/v2/internal/ipnet"
1314
"github.com/pion/turn/v2/internal/proto"
15+
"golang.org/x/sys/unix"
1416
)
1517

1618
type allocationResponse struct {
@@ -21,18 +23,23 @@ type allocationResponse struct {
2123
// Allocation is tied to a FiveTuple and relays traffic
2224
// use CreateAllocation and GetAllocation to operate
2325
type Allocation struct {
24-
RelayAddr net.Addr
25-
Protocol Protocol
26-
TurnSocket net.PacketConn
27-
RelaySocket net.PacketConn
28-
fiveTuple *FiveTuple
29-
permissionsLock sync.RWMutex
30-
permissions map[string]*Permission
31-
channelBindingsLock sync.RWMutex
32-
channelBindings []*ChannelBind
33-
lifetimeTimer *time.Timer
34-
closed chan interface{}
35-
log logging.LeveledLogger
26+
RelayAddr net.Addr
27+
Protocol Protocol
28+
RequestedTransportProtocol proto.Protocol
29+
TurnSocket net.PacketConn
30+
RelaySocket net.PacketConn
31+
RelayListener net.Listener
32+
fiveTuple *FiveTuple
33+
permissionsLock sync.RWMutex
34+
permissions map[string]*Permission
35+
channelBindingsLock sync.RWMutex
36+
channelBindings []*ChannelBind
37+
lifetimeTimer *time.Timer
38+
closed chan interface{}
39+
log logging.LeveledLogger
40+
connsLock sync.RWMutex
41+
addrToConn map[string]net.Conn
42+
cidToConn map[proto.ConnectionID]net.Conn
3643

3744
// some clients (Firefox or others using resiprocate's nICE lib) may retry allocation
3845
// with same 5 tuple when received 413, for compatible with these clients,
@@ -52,13 +59,16 @@ func addr2IPFingerprint(addr net.Addr) string {
5259
}
5360

5461
// NewAllocation creates a new instance of NewAllocation.
55-
func NewAllocation(turnSocket net.PacketConn, fiveTuple *FiveTuple, log logging.LeveledLogger) *Allocation {
62+
func NewAllocation(turnSocket net.PacketConn, fiveTuple *FiveTuple, log logging.LeveledLogger, requestedTransportProtocol proto.Protocol) *Allocation {
5663
return &Allocation{
57-
TurnSocket: turnSocket,
58-
fiveTuple: fiveTuple,
59-
permissions: make(map[string]*Permission, 64),
60-
closed: make(chan interface{}),
61-
log: log,
64+
TurnSocket: turnSocket,
65+
RequestedTransportProtocol: requestedTransportProtocol,
66+
fiveTuple: fiveTuple,
67+
permissions: make(map[string]*Permission, 64),
68+
addrToConn: make(map[string]net.Conn),
69+
cidToConn: make(map[proto.ConnectionID]net.Conn),
70+
closed: make(chan interface{}),
71+
log: log,
6272
}
6373
}
6474

@@ -215,7 +225,11 @@ func (a *Allocation) Close() error {
215225
}
216226
a.channelBindingsLock.RUnlock()
217227

218-
return a.RelaySocket.Close()
228+
if a.RequestedTransportProtocol == proto.ProtoTCP {
229+
return a.RelayListener.Close()
230+
} else {
231+
return a.RelaySocket.Close()
232+
}
219233
}
220234

221235
// https://tools.ietf.org/html/rfc5766#section-10.3
@@ -291,3 +305,133 @@ func (a *Allocation) packetHandler(m *Manager) {
291305
}
292306
}
293307
}
308+
309+
func (a *Allocation) GetConnectionByAddr(peerAddr string) net.Conn {
310+
a.connsLock.RLock()
311+
defer a.connsLock.RUnlock()
312+
return a.addrToConn[peerAddr]
313+
}
314+
315+
func (a *Allocation) GetConnectionByID(cid proto.ConnectionID) net.Conn {
316+
a.connsLock.RLock()
317+
defer a.connsLock.RUnlock()
318+
return a.cidToConn[cid]
319+
}
320+
321+
func (a *Allocation) newConnection(cid proto.ConnectionID, dst string) error {
322+
a.connsLock.Lock()
323+
a.addrToConn[dst] = nil
324+
a.cidToConn[cid] = nil
325+
a.connsLock.Unlock()
326+
327+
dialer := &net.Dialer{
328+
LocalAddr: a.RelayAddr,
329+
Control: func(network, address string, c syscall.RawConn) error {
330+
var err error
331+
c.Control(func(fd uintptr) {
332+
err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEADDR|unix.SO_REUSEPORT, 1)
333+
})
334+
return err
335+
},
336+
}
337+
338+
conn, err := dialer.Dial("tcp", dst)
339+
if err != nil {
340+
return err
341+
}
342+
343+
a.connsLock.Lock()
344+
a.addrToConn[dst] = conn
345+
a.cidToConn[cid] = conn
346+
a.connsLock.Unlock()
347+
348+
return nil
349+
}
350+
351+
func (a *Allocation) removeConnection(cid proto.ConnectionID, dst string) {
352+
a.connsLock.Lock()
353+
c := a.cidToConn[cid]
354+
delete(a.addrToConn, dst)
355+
delete(a.cidToConn, cid)
356+
a.connsLock.Unlock()
357+
c.Close()
358+
}
359+
360+
func (a *Allocation) connectionHandler(m *Manager) {
361+
for {
362+
// When a server receives an incoming TCP connection on a relayed
363+
// transport address, it processes the request as follows.
364+
// The server MUST accept the connection. If it is not successful,
365+
// nothing is sent to the client over the control connection.
366+
conn, err := a.RelayListener.Accept()
367+
if err != nil {
368+
m.DeleteAllocation(a.fiveTuple)
369+
return
370+
}
371+
372+
a.log.Debugf("relay listener %s received connection from %s",
373+
a.RelayListener.Addr().String(),
374+
conn.RemoteAddr().String())
375+
376+
// If the connection is successfully accepted, it is now called a peer
377+
// data connection. The server MUST buffer any data received from the
378+
// peer. The server adjusts its advertised TCP receive window to
379+
// reflect the amount of empty buffer space.
380+
381+
// If no permission for this peer has been installed for this
382+
// allocation, the server MUST close the connection with the peer
383+
// immediately after it has been accepted.
384+
385+
if p := a.GetPermission(conn.RemoteAddr()); p == nil {
386+
a.log.Infof("No Permission or Channel exists for %v on allocation %v", conn.RemoteAddr(), a.RelayAddr.String())
387+
conn.Close()
388+
continue
389+
}
390+
391+
// Otherwise, the server sends a ConnectionAttempt indication to the
392+
// client over the control connection. The indication MUST include an
393+
// XOR-PEER-ADDRESS attribute containing the peer's transport address,
394+
// as well as a CONNECTION-ID attribute uniquely identifying the peer
395+
// data connection.
396+
cid := m.newCID(a)
397+
398+
a.connsLock.Lock()
399+
a.addrToConn[conn.RemoteAddr().String()] = conn
400+
a.cidToConn[cid] = conn
401+
a.connsLock.Unlock()
402+
403+
msg, err := stun.Build(
404+
stun.TransactionID,
405+
stun.NewType(stun.MethodConnectionAttempt, stun.ClassIndication),
406+
)
407+
if err != nil {
408+
a.log.Errorf("Failed to build MethodConnectionAttempt message %v", err)
409+
continue
410+
}
411+
412+
addr, ok := conn.RemoteAddr().(*net.TCPAddr)
413+
if !ok {
414+
a.log.Errorf("Failed to parse remote tcp address")
415+
continue
416+
}
417+
418+
peerAddr := proto.PeerAddress{}
419+
peerAddr.IP = addr.IP
420+
peerAddr.Port = addr.Port
421+
422+
if err = peerAddr.AddTo(msg); err != nil {
423+
a.log.Errorf("Failed to build MethodConnectionAttempt message %v", err)
424+
return
425+
}
426+
427+
attrCid := proto.ConnectionID(cid)
428+
attrCid.AddTo(msg)
429+
a.TurnSocket.WriteTo(msg.Raw, a.fiveTuple.SrcAddr)
430+
431+
// If no ConnectionBind request associated with this peer data
432+
// connection is received after 30 seconds, the peer data connection
433+
// MUST be closed.
434+
435+
go m.removeAfter30(cid, conn.RemoteAddr())
436+
}
437+
}

0 commit comments

Comments
 (0)