Skip to content

Commit

Permalink
Merge pull request #23 from libp2p/revert/ctrlbypass
Browse files Browse the repository at this point in the history
Revert ctrlbypass
  • Loading branch information
willscott authored Mar 27, 2020
2 parents d913bbd + d3225e9 commit 42b17e9
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 82 deletions.
5 changes: 0 additions & 5 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ type Config struct {
// MaxMessageSize is the maximum size of a message that we'll send on a
// stream. This ensures that a single stream doesn't hog a connection.
MaxMessageSize uint32

// SendQueueSize is the maximum number of messages we'll keep in the local
// send queue before applying back pressure to writers.
SendQueueSize uint32
}

// DefaultConfig is used to return a default configuration
Expand All @@ -65,7 +61,6 @@ func DefaultConfig() *Config {
ReadBufSize: 4096,
MaxMessageSize: 64 * 1024, // Means 64KiB/10s = 52kbps minimum speed.
WriteCoalesceDelay: 100 * time.Microsecond,
SendQueueSize: 64,
}
}

Expand Down
97 changes: 30 additions & 67 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"sync/atomic"
"time"

pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-buffer-pool"
)

// Session is used to wrap a reliable ordered connection and to
Expand Down Expand Up @@ -67,9 +67,6 @@ type Session struct {
// sendCh is used to send messages
sendCh chan []byte

// sendCtrlCh is used to send control messages (skipping the normal send queue)
sendCtrlCh chan []byte

// recvDoneCh is closed when recv() exits to avoid a race
// between stream registration and stream shutdown
recvDoneCh chan struct{}
Expand Down Expand Up @@ -112,8 +109,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio
inflight: make(map[uint32]struct{}),
synCh: make(chan struct{}, config.AcceptBacklog),
acceptCh: make(chan *Stream, config.AcceptBacklog),
sendCh: make(chan []byte, config.SendQueueSize),
sendCtrlCh: make(chan []byte, 16),
sendCh: make(chan []byte, 64),
recvDoneCh: make(chan struct{}),
sendDoneCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
Expand Down Expand Up @@ -274,7 +270,7 @@ func (s *Session) exitErr(err error) {
// GoAway can be used to prevent accepting further
// connections. It does not close the underlying conn.
func (s *Session) GoAway() error {
return s.sendMsg(s.goAway(goAwayNormal), nil, nil, true)
return s.sendMsg(s.goAway(goAwayNormal), nil, nil)
}

// goAway is used to send a goAway message
Expand All @@ -298,7 +294,7 @@ func (s *Session) Ping() (time.Duration, error) {

// Send the ping request
hdr := encode(typePing, flagSYN, 0, id)
if err := s.sendMsg(hdr, nil, nil, true); err != nil {
if err := s.sendMsg(hdr, nil, nil); err != nil {
return 0, err
}

Expand Down Expand Up @@ -378,7 +374,7 @@ func (s *Session) extendKeepalive() {
}

// send sends the header and body.
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}, control bool) error {
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error {
select {
case <-s.shutdownCh:
return s.shutdownErr
Expand All @@ -390,17 +386,11 @@ func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}, con
copy(buf[:headerSize], hdr[:])
copy(buf[headerSize:], body)

var sendCh chan []byte
if control {
sendCh = s.sendCtrlCh
} else {
sendCh = s.sendCh
}
select {
case <-s.shutdownCh:
pool.Put(buf)
return s.shutdownErr
case sendCh <- buf:
case s.sendCh <- buf:
return nil
case <-deadline:
pool.Put(buf)
Expand Down Expand Up @@ -456,65 +446,38 @@ func (s *Session) sendLoop() error {
default:
}

var buf []byte
// Preferentially use control channel.
select {
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
default:
}

// Flushes at least once every 100 microseconds unless we're
// constantly writing.
var buf []byte
select {
case buf = <-s.sendCh:
goto SEND
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
default:
}

select {
case buf = <-s.sendCh:
goto SEND
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
case <-writeTimeoutCh:
}

if err := writer.Flush(); err != nil {
if os.IsTimeout(err) {
err = ErrConnectionWriteTimeout
}
return err
}

// Preferentially use control channel.
select {
case buf = <-s.sendCtrlCh:
case <-s.shutdownCh:
return nil
default:
select {
case buf = <-s.sendCh:
case buf = <-s.sendCtrlCh:
case <-s.shutdownCh:
return nil
case <-writeTimeoutCh:
if err := writer.Flush(); err != nil {
if os.IsTimeout(err) {
err = ErrConnectionWriteTimeout
}
return err
}

select {
case buf = <-s.sendCh:
case <-s.shutdownCh:
return nil
}

if writeTimeout != nil {
writeTimeout.Reset(s.config.WriteCoalesceDelay)
}
}
}

if writeTimeout != nil {
writeTimeout.Reset(s.config.WriteCoalesceDelay)
}

SEND:

if err := extendWriteDeadline(); err != nil {
pool.Put(buf)
return err
Expand Down Expand Up @@ -619,7 +582,7 @@ func (s *Session) handleStreamMessage(hdr header) error {
// Check if this is a window update
if hdr.MsgType() == typeWindowUpdate {
if err := stream.incrSendWindow(hdr, flags); err != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
}
return err
Expand All @@ -629,7 +592,7 @@ func (s *Session) handleStreamMessage(hdr header) error {

// Read the new data
if err := stream.readData(hdr, flags, s.reader); err != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
}
return err
Expand All @@ -647,7 +610,7 @@ func (s *Session) handlePing(hdr header) error {
if flags&flagSYN == flagSYN {
go func() {
hdr := encode(typePing, flagACK, 0, pingID)
if err := s.sendMsg(hdr, nil, nil, true); err != nil {
if err := s.sendMsg(hdr, nil, nil); err != nil {
s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err)
}
}()
Expand Down Expand Up @@ -693,7 +656,7 @@ func (s *Session) incomingStream(id uint32) error {
// Reject immediately if we are doing a go away
if atomic.LoadInt32(&s.localGoAway) == 1 {
hdr := encode(typeWindowUpdate, flagRST, id, 0)
return s.sendMsg(hdr, nil, nil, true)
return s.sendMsg(hdr, nil, nil)
}

// Allocate a new stream
Expand All @@ -705,7 +668,7 @@ func (s *Session) incomingStream(id uint32) error {
// Check if stream already exists
if _, ok := s.streams[id]; ok {
s.logger.Printf("[ERR] yamux: duplicate stream declared")
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
}
return ErrDuplicateStream
Expand All @@ -723,7 +686,7 @@ func (s *Session) incomingStream(id uint32) error {
s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
delete(s.streams, id)
hdr := encode(typeWindowUpdate, flagRST, id, 0)
return s.sendMsg(hdr, nil, nil, false)
return s.sendMsg(hdr, nil, nil)
}
}

Expand Down
2 changes: 1 addition & 1 deletion session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ func TestSession_sendMsg_Timeout(t *testing.T) {

hdr := encode(typePing, flagACK, 0, 0)
for {
err := client.sendMsg(hdr, nil, nil, true)
err := client.sendMsg(hdr, nil, nil)
if err == nil {
continue
} else if err == ErrConnectionWriteTimeout {
Expand Down
15 changes: 6 additions & 9 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync/atomic"
"time"

pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-buffer-pool"
)

type streamState int
Expand All @@ -20,7 +20,6 @@ const (
streamRemoteClose
streamClosed
streamReset
streamWriteTimeout
)

// Stream is used to represent a logical stream
Expand All @@ -42,7 +41,6 @@ type Stream struct {

recvNotifyCh chan struct{}
sendNotifyCh chan struct{}
sendCh chan []byte

readDeadline, writeDeadline pipeDeadline
}
Expand Down Expand Up @@ -166,13 +164,12 @@ START:
// Determine the flags if any
flags = s.sendFlags()

// Send up to min(message, window)
// Send up to min(message, window
max = min(window, s.session.config.MaxMessageSize-headerSize, uint32(len(b)))

// Send the header
hdr = encode(typeData, flags, s.id, max)
if err = s.session.sendMsg(hdr, b[:max], s.writeDeadline.wait(), false); err != nil {
// Indicate queued message.
if err = s.session.sendMsg(hdr, b[:max], s.writeDeadline.wait()); err != nil {
return 0, err
}

Expand Down Expand Up @@ -231,7 +228,7 @@ func (s *Stream) sendWindowUpdate() error {

// Send the header
hdr := encode(typeWindowUpdate, flags, s.id, delta)
if err := s.session.sendMsg(hdr, nil, nil, true); err != nil {
if err := s.session.sendMsg(hdr, nil, nil); err != nil {
return err
}
return nil
Expand All @@ -242,13 +239,13 @@ func (s *Stream) sendClose() error {
flags := s.sendFlags()
flags |= flagFIN
hdr := encode(typeWindowUpdate, flags, s.id, 0)
return s.session.sendMsg(hdr, nil, nil, false)
return s.session.sendMsg(hdr, nil, nil)
}

// sendReset is used to send a RST
func (s *Stream) sendReset() error {
hdr := encode(typeWindowUpdate, flagRST, s.id, 0)
return s.session.sendMsg(hdr, nil, nil, false)
return s.session.sendMsg(hdr, nil, nil)
}

// Reset resets the stream (forcibly closes the stream)
Expand Down

0 comments on commit 42b17e9

Please sign in to comment.