Skip to content

Commit

Permalink
Merge pull request #59 from libp2p/configure-initial-stream-recv-window
Browse files Browse the repository at this point in the history
make the initial stream receive window configurable
  • Loading branch information
marten-seemann authored May 21, 2021
2 parents 42482e3 + 43bcb2f commit 27a16a1
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 17 deletions.
32 changes: 20 additions & 12 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type Config struct {
// an expectation that things will move along quickly.
ConnectionWriteTimeout time.Duration

// InitialStreamWindowSize is used to control the initial
// window size that we allow for a stream.
InitialStreamWindowSize uint32

// MaxStreamWindowSize is used to control the maximum
// window size that we allow for a stream.
MaxStreamWindowSize uint32
Expand All @@ -56,16 +60,17 @@ type Config struct {
// DefaultConfig is used to return a default configuration
func DefaultConfig() *Config {
return &Config{
AcceptBacklog: 256,
PingBacklog: 32,
EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second,
MaxStreamWindowSize: maxStreamWindow,
LogOutput: os.Stderr,
ReadBufSize: 4096,
MaxMessageSize: 64 * 1024,
WriteCoalesceDelay: 100 * time.Microsecond,
AcceptBacklog: 256,
PingBacklog: 32,
EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second,
InitialStreamWindowSize: initialStreamWindow,
MaxStreamWindowSize: maxStreamWindow,
LogOutput: os.Stderr,
ReadBufSize: 4096,
MaxMessageSize: 64 * 1024,
WriteCoalesceDelay: 100 * time.Microsecond,
}
}

Expand All @@ -77,8 +82,11 @@ func VerifyConfig(config *Config) error {
if config.KeepAliveInterval == 0 {
return fmt.Errorf("keep-alive interval must be positive")
}
if config.MaxStreamWindowSize < initialStreamWindow {
return errors.New("MaxStreamWindowSize must be larger than the initialStreamWindow (256 kB)")
if config.InitialStreamWindowSize < initialStreamWindow {
return errors.New("InitialStreamWindowSize must be larger or equal 256 kB")
}
if config.MaxStreamWindowSize < config.InitialStreamWindowSize {
return errors.New("MaxStreamWindowSize must be larger than the InitialStreamWindowSize")
}
if config.MaxMessageSize < 1024 {
return fmt.Errorf("MaxMessageSize must be greater than a kilobyte")
Expand Down
49 changes: 49 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"reflect"
"runtime"
Expand Down Expand Up @@ -1683,3 +1684,51 @@ func TestReadDeadlineInterrupt(t *testing.T) {
}
}
}

// Make sure that a transfer doesn't stall, no matter what values the peers use for their InitialStreamWindow.
func TestInitialStreamWindow(t *testing.T) {
for i := 0; i < 10; i++ {
const (
maxWindow = 5 * initialStreamWindow
transferSize = 10 * maxWindow
)
rand.Seed(time.Now().UnixNano())
randomUint32 := func(min, max uint32) uint32 { return uint32(rand.Int63n(int64(max-min))) + min }

cconf := DefaultConfig()
cconf.InitialStreamWindowSize = randomUint32(initialStreamWindow, maxWindow)
sconf := DefaultConfig()
sconf.InitialStreamWindowSize = randomUint32(initialStreamWindow, maxWindow)

conn1, conn2 := testConn()
client, _ := Client(conn1, cconf)
server, _ := Server(conn2, sconf)

errChan := make(chan error, 1)
go func() {
defer close(errChan)
str, err := client.OpenStream(context.Background())
if err != nil {
errChan <- err
return
}
defer str.Close()
if _, err := str.Write(make([]byte, transferSize)); err != nil {
errChan <- err
return
}
}()

str, err := server.AcceptStream()
if err != nil {
t.Fatal(err)
}
data, err := ioutil.ReadAll(str)
if err != nil {
t.Fatal(err)
}
if uint32(len(data)) != transferSize {
t.Fatalf("expected %d bytes to be transferred, got %d", transferSize, len(data))
}
}
}
13 changes: 8 additions & 5 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,14 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
sendWindow: initialStreamWindow,
readDeadline: makePipeDeadline(),
writeDeadline: makePipeDeadline(),
recvBuf: newSegmentedBuffer(initialStreamWindow),
recvWindow: initialStreamWindow,
epochStart: time.Now(),
recvNotifyCh: make(chan struct{}, 1),
sendNotifyCh: make(chan struct{}, 1),
// Initialize the recvBuf with initialStreamWindow, not config.InitialStreamWindowSize.
// The peer isn't allowed to send more data than initialStreamWindow until we've sent
// the first window update (which will grant it up to config.InitialStreamWindowSize).
recvBuf: newSegmentedBuffer(initialStreamWindow),
recvWindow: session.config.InitialStreamWindowSize,
epochStart: time.Now(),
recvNotifyCh: make(chan struct{}, 1),
sendNotifyCh: make(chan struct{}, 1),
}
return s
}
Expand Down

0 comments on commit 27a16a1

Please sign in to comment.