Skip to content

Commit 8cc8b8b

Browse files
jwhitedraggi
andcommitted
device: change Peer.endpoint locking to reduce contention
Access to Peer.endpoint was previously synchronized by Peer.RWMutex. This has now moved to Peer.endpoint.Mutex. Peer.SendBuffers() is now the sole caller of Endpoint.ClearSrc(), which is signaled via a new bool, Peer.endpoint.clearSrcOnTx. Previous Callers of Endpoint.ClearSrc() now set this bool, primarily via peer.markEndpointSrcForClearing(). Peer.SetEndpointFromPacket() clears Peer.endpoint.clearSrcOnTx when an updated conn.Endpoint is stored. This maintains the same event order as before, i.e. a conn.Endpoint received after peer.endpoint.clearSrcOnTx is set, but before the next Peer.SendBuffers() call results in the latest conn.Endpoint source being used for the next packet transmission. These changes result in throughput improvements for single flow, parallel (-P n) flow, and bidirectional (--bidir) flow iperf3 TCP/UDP tests as measured on both Linux and Windows. Latency under load improves especially for high throughput Linux scenarios. These improvements are likely realized on all platforms to some degree, as the changes are not platform-specific. Co-authored-by: James Tucker <[email protected]> Signed-off-by: Jordan Whited <[email protected]>
1 parent db7604d commit 8cc8b8b

File tree

6 files changed

+86
-84
lines changed

6 files changed

+86
-84
lines changed

device/device.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -461,11 +461,7 @@ func (device *Device) BindSetMark(mark uint32) error {
461461
// clear cached source addresses
462462
device.peers.RLock()
463463
for _, peer := range device.peers.keyMap {
464-
peer.Lock()
465-
defer peer.Unlock()
466-
if peer.endpoint != nil {
467-
peer.endpoint.ClearSrc()
468-
}
464+
peer.markEndpointSrcForClearing()
469465
}
470466
device.peers.RUnlock()
471467

@@ -515,11 +511,7 @@ func (device *Device) BindUpdate() error {
515511
// clear cached source addresses
516512
device.peers.RLock()
517513
for _, peer := range device.peers.keyMap {
518-
peer.Lock()
519-
defer peer.Unlock()
520-
if peer.endpoint != nil {
521-
peer.endpoint.ClearSrc()
522-
}
514+
peer.markEndpointSrcForClearing()
523515
}
524516
device.peers.RUnlock()
525517

device/mobilequirks.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ func (device *Device) DisableSomeRoamingForBrokenMobileSemantics() {
1111
device.net.brokenRoaming = true
1212
device.peers.RLock()
1313
for _, peer := range device.peers.keyMap {
14-
peer.Lock()
15-
peer.disableRoaming = peer.endpoint != nil
16-
peer.Unlock()
14+
peer.endpoint.Lock()
15+
peer.endpoint.disableRoaming = peer.endpoint.val != nil
16+
peer.endpoint.Unlock()
1717
}
1818
device.peers.RUnlock()
1919
}

device/peer.go

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,20 @@ import (
1717

1818
type Peer struct {
1919
isRunning atomic.Bool
20-
sync.RWMutex // Mostly protects endpoint, but is generally taken whenever we modify peer
2120
keypairs Keypairs
2221
handshake Handshake
2322
device *Device
24-
endpoint conn.Endpoint
2523
stopping sync.WaitGroup // routines pending stop
2624
txBytes atomic.Uint64 // bytes send to peer (endpoint)
2725
rxBytes atomic.Uint64 // bytes received from peer
2826
lastHandshakeNano atomic.Int64 // nano seconds since epoch
2927

30-
disableRoaming bool
28+
endpoint struct {
29+
sync.Mutex
30+
val conn.Endpoint
31+
clearSrcOnTx bool // signal to val.ClearSrc() prior to next packet transmission
32+
disableRoaming bool
33+
}
3134

3235
timers struct {
3336
retransmitHandshake *Timer
@@ -74,8 +77,6 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
7477

7578
// create peer
7679
peer := new(Peer)
77-
peer.Lock()
78-
defer peer.Unlock()
7980

8081
peer.cookieGenerator.Init(pk)
8182
peer.device = device
@@ -97,7 +98,11 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
9798
handshake.mutex.Unlock()
9899

99100
// reset endpoint
100-
peer.endpoint = nil
101+
peer.endpoint.Lock()
102+
peer.endpoint.val = nil
103+
peer.endpoint.disableRoaming = false
104+
peer.endpoint.clearSrcOnTx = false
105+
peer.endpoint.Unlock()
101106

102107
// init timers
103108
peer.timersInit()
@@ -116,14 +121,19 @@ func (peer *Peer) SendBuffers(buffers [][]byte) error {
116121
return nil
117122
}
118123

119-
peer.RLock()
120-
defer peer.RUnlock()
121-
122-
if peer.endpoint == nil {
124+
peer.endpoint.Lock()
125+
endpoint := peer.endpoint.val
126+
if endpoint == nil {
127+
peer.endpoint.Unlock()
123128
return errors.New("no known endpoint for peer")
124129
}
130+
if peer.endpoint.clearSrcOnTx {
131+
endpoint.ClearSrc()
132+
peer.endpoint.clearSrcOnTx = false
133+
}
134+
peer.endpoint.Unlock()
125135

126-
err := peer.device.net.bind.Send(buffers, peer.endpoint)
136+
err := peer.device.net.bind.Send(buffers, endpoint)
127137
if err == nil {
128138
var totalLen uint64
129139
for _, b := range buffers {
@@ -267,10 +277,20 @@ func (peer *Peer) Stop() {
267277
}
268278

269279
func (peer *Peer) SetEndpointFromPacket(endpoint conn.Endpoint) {
270-
if peer.disableRoaming {
280+
peer.endpoint.Lock()
281+
defer peer.endpoint.Unlock()
282+
if peer.endpoint.disableRoaming {
283+
return
284+
}
285+
peer.endpoint.clearSrcOnTx = false
286+
peer.endpoint.val = endpoint
287+
}
288+
289+
func (peer *Peer) markEndpointSrcForClearing() {
290+
peer.endpoint.Lock()
291+
defer peer.endpoint.Unlock()
292+
if peer.endpoint.val == nil {
271293
return
272294
}
273-
peer.Lock()
274-
peer.endpoint = endpoint
275-
peer.Unlock()
295+
peer.endpoint.clearSrcOnTx = true
276296
}

device/sticky_linux.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -110,17 +110,17 @@ func (device *Device) routineRouteListener(bind conn.Bind, netlinkSock int, netl
110110
if !ok {
111111
break
112112
}
113-
pePtr.peer.Lock()
114-
if &pePtr.peer.endpoint != pePtr.endpoint {
115-
pePtr.peer.Unlock()
113+
pePtr.peer.endpoint.Lock()
114+
if &pePtr.peer.endpoint.val != pePtr.endpoint {
115+
pePtr.peer.endpoint.Unlock()
116116
break
117117
}
118-
if uint32(pePtr.peer.endpoint.(*conn.StdNetEndpoint).SrcIfidx()) == ifidx {
119-
pePtr.peer.Unlock()
118+
if uint32(pePtr.peer.endpoint.val.(*conn.StdNetEndpoint).SrcIfidx()) == ifidx {
119+
pePtr.peer.endpoint.Unlock()
120120
break
121121
}
122-
pePtr.peer.endpoint.(*conn.StdNetEndpoint).ClearSrc()
123-
pePtr.peer.Unlock()
122+
pePtr.peer.endpoint.clearSrcOnTx = true
123+
pePtr.peer.endpoint.Unlock()
124124
}
125125
attr = attr[attrhdr.Len:]
126126
}
@@ -134,18 +134,18 @@ func (device *Device) routineRouteListener(bind conn.Bind, netlinkSock int, netl
134134
device.peers.RLock()
135135
i := uint32(1)
136136
for _, peer := range device.peers.keyMap {
137-
peer.RLock()
138-
if peer.endpoint == nil {
139-
peer.RUnlock()
137+
peer.endpoint.Lock()
138+
if peer.endpoint.val == nil {
139+
peer.endpoint.Unlock()
140140
continue
141141
}
142-
nativeEP, _ := peer.endpoint.(*conn.StdNetEndpoint)
142+
nativeEP, _ := peer.endpoint.val.(*conn.StdNetEndpoint)
143143
if nativeEP == nil {
144-
peer.RUnlock()
144+
peer.endpoint.Unlock()
145145
continue
146146
}
147147
if nativeEP.DstIP().Is6() || nativeEP.SrcIfidx() == 0 {
148-
peer.RUnlock()
148+
peer.endpoint.Unlock()
149149
break
150150
}
151151
nlmsg := struct {
@@ -188,10 +188,10 @@ func (device *Device) routineRouteListener(bind conn.Bind, netlinkSock int, netl
188188
reqPeerLock.Lock()
189189
reqPeer[i] = peerEndpointPtr{
190190
peer: peer,
191-
endpoint: &peer.endpoint,
191+
endpoint: &peer.endpoint.val,
192192
}
193193
reqPeerLock.Unlock()
194-
peer.RUnlock()
194+
peer.endpoint.Unlock()
195195
i++
196196
_, err := netlinkCancel.Write((*[unsafe.Sizeof(nlmsg)]byte)(unsafe.Pointer(&nlmsg))[:])
197197
if err != nil {

device/timers.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,7 @@ func expiredRetransmitHandshake(peer *Peer) {
100100
peer.device.log.Verbosef("%s - Handshake did not complete after %d seconds, retrying (try %d)", peer, int(RekeyTimeout.Seconds()), peer.timers.handshakeAttempts.Load()+1)
101101

102102
/* We clear the endpoint address src address, in case this is the cause of trouble. */
103-
peer.Lock()
104-
if peer.endpoint != nil {
105-
peer.endpoint.ClearSrc()
106-
}
107-
peer.Unlock()
103+
peer.markEndpointSrcForClearing()
108104

109105
peer.SendHandshakeInitiation(true)
110106
}
@@ -123,11 +119,7 @@ func expiredSendKeepalive(peer *Peer) {
123119
func expiredNewHandshake(peer *Peer) {
124120
peer.device.log.Verbosef("%s - Retrying handshake because we stopped hearing back after %d seconds", peer, int((KeepaliveTimeout + RekeyTimeout).Seconds()))
125121
/* We clear the endpoint address src address, in case this is the cause of trouble. */
126-
peer.Lock()
127-
if peer.endpoint != nil {
128-
peer.endpoint.ClearSrc()
129-
}
130-
peer.Unlock()
122+
peer.markEndpointSrcForClearing()
131123
peer.SendHandshakeInitiation(false)
132124
}
133125

device/uapi.go

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -99,33 +99,31 @@ func (device *Device) IpcGetOperation(w io.Writer) error {
9999

100100
for _, peer := range device.peers.keyMap {
101101
// Serialize peer state.
102-
// Do the work in an anonymous function so that we can use defer.
103-
func() {
104-
peer.RLock()
105-
defer peer.RUnlock()
106-
107-
keyf("public_key", (*[32]byte)(&peer.handshake.remoteStatic))
108-
keyf("preshared_key", (*[32]byte)(&peer.handshake.presharedKey))
109-
sendf("protocol_version=1")
110-
if peer.endpoint != nil {
111-
sendf("endpoint=%s", peer.endpoint.DstToString())
112-
}
113-
114-
nano := peer.lastHandshakeNano.Load()
115-
secs := nano / time.Second.Nanoseconds()
116-
nano %= time.Second.Nanoseconds()
117-
118-
sendf("last_handshake_time_sec=%d", secs)
119-
sendf("last_handshake_time_nsec=%d", nano)
120-
sendf("tx_bytes=%d", peer.txBytes.Load())
121-
sendf("rx_bytes=%d", peer.rxBytes.Load())
122-
sendf("persistent_keepalive_interval=%d", peer.persistentKeepaliveInterval.Load())
123-
124-
device.allowedips.EntriesForPeer(peer, func(prefix netip.Prefix) bool {
125-
sendf("allowed_ip=%s", prefix.String())
126-
return true
127-
})
128-
}()
102+
peer.handshake.mutex.RLock()
103+
keyf("public_key", (*[32]byte)(&peer.handshake.remoteStatic))
104+
keyf("preshared_key", (*[32]byte)(&peer.handshake.presharedKey))
105+
peer.handshake.mutex.RUnlock()
106+
sendf("protocol_version=1")
107+
peer.endpoint.Lock()
108+
if peer.endpoint.val != nil {
109+
sendf("endpoint=%s", peer.endpoint.val.DstToString())
110+
}
111+
peer.endpoint.Unlock()
112+
113+
nano := peer.lastHandshakeNano.Load()
114+
secs := nano / time.Second.Nanoseconds()
115+
nano %= time.Second.Nanoseconds()
116+
117+
sendf("last_handshake_time_sec=%d", secs)
118+
sendf("last_handshake_time_nsec=%d", nano)
119+
sendf("tx_bytes=%d", peer.txBytes.Load())
120+
sendf("rx_bytes=%d", peer.rxBytes.Load())
121+
sendf("persistent_keepalive_interval=%d", peer.persistentKeepaliveInterval.Load())
122+
123+
device.allowedips.EntriesForPeer(peer, func(prefix netip.Prefix) bool {
124+
sendf("allowed_ip=%s", prefix.String())
125+
return true
126+
})
129127
}
130128
}()
131129

@@ -262,7 +260,7 @@ func (peer *ipcSetPeer) handlePostConfig() {
262260
return
263261
}
264262
if peer.created {
265-
peer.disableRoaming = peer.device.net.brokenRoaming && peer.endpoint != nil
263+
peer.endpoint.disableRoaming = peer.device.net.brokenRoaming && peer.endpoint.val != nil
266264
}
267265
if peer.device.isUp() {
268266
peer.Start()
@@ -345,9 +343,9 @@ func (device *Device) handlePeerLine(peer *ipcSetPeer, key, value string) error
345343
if err != nil {
346344
return ipcErrorf(ipc.IpcErrorInvalid, "failed to set endpoint %v: %w", value, err)
347345
}
348-
peer.Lock()
349-
defer peer.Unlock()
350-
peer.endpoint = endpoint
346+
peer.endpoint.Lock()
347+
defer peer.endpoint.Unlock()
348+
peer.endpoint.val = endpoint
351349

352350
case "persistent_keepalive_interval":
353351
device.log.Verbosef("%v - UAPI: Updating persistent keepalive interval", peer.Peer)

0 commit comments

Comments
 (0)