Skip to content

Commit

Permalink
Merge pull request #36 from libp2p/open-stream-context
Browse files Browse the repository at this point in the history
add a context to Open() and OpenStream()
  • Loading branch information
marten-seemann authored Dec 19, 2020
2 parents 7a85a06 + 8a9f35f commit 5f2a5e7
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 39 deletions.
7 changes: 4 additions & 3 deletions bench_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package yamux

import (
"context"
"io"
"testing"
)
Expand Down Expand Up @@ -37,7 +38,7 @@ func BenchmarkAccept(b *testing.B) {
}()

for i := 0; i < b.N; i++ {
stream, err := client.Open()
stream, err := client.Open(context.Background())
if err != nil {
b.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -69,7 +70,7 @@ func BenchmarkSendRecv(b *testing.B) {
}
}()

stream, err := client.Open()
stream, err := client.Open(context.Background())
if err != nil {
b.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -113,7 +114,7 @@ func BenchmarkSendRecvLarge(b *testing.B) {
}
}()

stream, err := client.Open()
stream, err := client.Open(context.Background())
if err != nil {
b.Fatalf("err: %v", err)
}
Expand Down
9 changes: 6 additions & 3 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package yamux

import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -156,16 +157,16 @@ func (s *Session) NumStreams() int {
}

// Open is used to create a new stream as a net.Conn
func (s *Session) Open() (net.Conn, error) {
conn, err := s.OpenStream()
func (s *Session) Open(ctx context.Context) (net.Conn, error) {
conn, err := s.OpenStream(ctx)
if err != nil {
return nil, err
}
return conn, nil
}

// OpenStream is used to create a new stream
func (s *Session) OpenStream() (*Stream, error) {
func (s *Session) OpenStream(ctx context.Context) (*Stream, error) {
if s.IsClosed() {
return nil, s.shutdownErr
}
Expand All @@ -176,6 +177,8 @@ func (s *Session) OpenStream() (*Stream, error) {
// Block if we have too many inflight SYNs
select {
case s.synCh <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
case <-s.shutdownCh:
return nil, s.shutdownErr
}
Expand Down
5 changes: 3 additions & 2 deletions session_norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package yamux

import (
"bytes"
"context"
"io"
"io/ioutil"
"sync"
Expand Down Expand Up @@ -93,7 +94,7 @@ func TestSendData_VeryLarge(t *testing.T) {
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
stream, err := client.Open()
stream, err := client.Open(context.Background())
if err != nil {
t.Errorf("err: %v", err)
return
Expand Down Expand Up @@ -142,7 +143,7 @@ func TestLargeWindow(t *testing.T) {
defer client.Close()
defer server.Close()

stream, err := client.Open()
stream, err := client.Open(context.Background())
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
Loading

0 comments on commit 5f2a5e7

Please sign in to comment.