Skip to content

Commit

Permalink
Remeasure round trip time (#70)
Browse files Browse the repository at this point in the history
* feat(session): remeasure round trip time

periodically re-measure round trip time

* feat(session): measure rtt during keep-alive

* refactor(session): repeat measure rtt via timer

* fix(session): fix flaky test

* simplify RTT timer by using a time.Ticker

* calculate a smoothed RTT

Co-authored-by: Marten Seemann <[email protected]>
  • Loading branch information
hannahhoward and marten-seemann authored Sep 7, 2022
1 parent e0dd63f commit 48aa3a7
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
8 changes: 8 additions & 0 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Config struct {
// KeepAliveInterval is how often to perform the keep alive
KeepAliveInterval time.Duration

// MeasureRTTInterval is how often to re-measure the round trip time
MeasureRTTInterval time.Duration

// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
// we which will suspect a problem with the underlying connection and
// close it. This is only applied to writes, where's there's generally
Expand Down Expand Up @@ -69,6 +72,7 @@ func DefaultConfig() *Config {
PingBacklog: 32,
EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second,
MeasureRTTInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second,
MaxIncomingStreams: 1000,
InitialStreamWindowSize: initialStreamWindow,
Expand All @@ -88,6 +92,10 @@ func VerifyConfig(config *Config) error {
if config.KeepAliveInterval == 0 {
return fmt.Errorf("keep-alive interval must be positive")
}
if config.MeasureRTTInterval == 0 {
return fmt.Errorf("measure-rtt interval must be positive")
}

if config.InitialStreamWindowSize < initialStreamWindow {
return errors.New("InitialStreamWindowSize must be larger or equal 256 kB")
}
Expand Down
22 changes: 20 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int, newMemo
}
go s.recv()
go s.send()
go s.measureRTT()
go s.startMeasureRTT()
return s
}

Expand Down Expand Up @@ -341,7 +341,25 @@ func (s *Session) measureRTT() {
if err != nil {
return
}
atomic.StoreInt64(&s.rtt, rtt.Nanoseconds())
if !atomic.CompareAndSwapInt64(&s.rtt, 0, rtt.Nanoseconds()) {
prev := atomic.LoadInt64(&s.rtt)
smoothedRTT := prev/2 + rtt.Nanoseconds()/2
atomic.StoreInt64(&s.rtt, smoothedRTT)
}
}

func (s *Session) startMeasureRTT() {
s.measureRTT()
t := time.NewTicker(s.config.MeasureRTTInterval)
defer t.Stop()
for {
select {
case <-s.CloseChan():
return
case <-t.C:
s.measureRTT()
}
}
}

// 0 if we don't yet have a measurement
Expand Down
2 changes: 2 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ func TestPing(t *testing.T) {
defer server.Close()

clientConn := client.conn.(*pipeConn)
time.Sleep(time.Millisecond)

clientConn.BlockWrites()
go func() {
time.Sleep(10 * time.Millisecond)
Expand Down

0 comments on commit 48aa3a7

Please sign in to comment.