-
Notifications
You must be signed in to change notification settings - Fork 24
/
mux.go
141 lines (121 loc) · 4.35 KB
/
mux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package yamux
import (
"errors"
"fmt"
"io"
"net"
"os"
"time"
)
// Config is used to tune the Yamux session
type Config struct {
// AcceptBacklog is used to limit how many streams may be
// waiting an accept.
AcceptBacklog int
// PingBacklog is used to limit how many ping acks we can queue.
PingBacklog int
// EnableKeepalive is used to do a period keep alive
// messages using a ping.
EnableKeepAlive bool
// KeepAliveInterval is how often to perform the keep alive
KeepAliveInterval time.Duration
// MeasureRTTInterval is how often to re-measure the round trip time
MeasureRTTInterval time.Duration
// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
// we which will suspect a problem with the underlying connection and
// close it. This is only applied to writes, where's there's generally
// an expectation that things will move along quickly.
ConnectionWriteTimeout time.Duration
// MaxIncomingStreams is maximum number of concurrent incoming streams
// that we accept. If the peer tries to open more streams, those will be
// reset immediately.
MaxIncomingStreams uint32
// InitialStreamWindowSize is used to control the initial
// window size that we allow for a stream.
InitialStreamWindowSize uint32
// MaxStreamWindowSize is used to control the maximum
// window size that we allow for a stream.
MaxStreamWindowSize uint32
// LogOutput is used to control the log destination
LogOutput io.Writer
// ReadBufSize controls the size of the read buffer.
//
// Set to 0 to disable it.
ReadBufSize int
// WriteCoalesceDelay is the maximum amount of time we'll delay
// coalescing a packet before sending it. This should be on the order of
// micro-milliseconds.
WriteCoalesceDelay time.Duration
// MaxMessageSize is the maximum size of a message that we'll send on a
// stream. This ensures that a single stream doesn't hog a connection.
MaxMessageSize uint32
}
// DefaultConfig is used to return a default configuration
func DefaultConfig() *Config {
return &Config{
AcceptBacklog: 256,
PingBacklog: 32,
EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second,
MeasureRTTInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second,
MaxIncomingStreams: 1000,
InitialStreamWindowSize: initialStreamWindow,
MaxStreamWindowSize: maxStreamWindow,
LogOutput: os.Stderr,
ReadBufSize: 4096,
MaxMessageSize: 64 * 1024,
WriteCoalesceDelay: 100 * time.Microsecond,
}
}
// VerifyConfig is used to verify the sanity of configuration
func VerifyConfig(config *Config) error {
if config.AcceptBacklog <= 0 {
return fmt.Errorf("backlog must be positive")
}
if config.EnableKeepAlive && config.KeepAliveInterval == 0 {
return fmt.Errorf("keep-alive interval must be positive")
}
if config.MeasureRTTInterval == 0 {
return fmt.Errorf("measure-rtt interval must be positive")
}
if config.InitialStreamWindowSize < initialStreamWindow {
return errors.New("InitialStreamWindowSize must be larger or equal 256 kB")
}
if config.MaxStreamWindowSize < config.InitialStreamWindowSize {
return errors.New("MaxStreamWindowSize must be larger than the InitialStreamWindowSize")
}
if config.MaxMessageSize < 1024 {
return fmt.Errorf("MaxMessageSize must be greater than a kilobyte")
}
if config.WriteCoalesceDelay < 0 {
return fmt.Errorf("WriteCoalesceDelay must be >= 0")
}
if config.PingBacklog < 1 {
return fmt.Errorf("PingBacklog must be > 0")
}
return nil
}
// Server is used to initialize a new server-side connection.
// There must be at most one server-side connection. If a nil config is
// provided, the DefaultConfiguration will be used.
func Server(conn net.Conn, config *Config, mm func() (MemoryManager, error)) (*Session, error) {
if config == nil {
config = DefaultConfig()
}
if err := VerifyConfig(config); err != nil {
return nil, err
}
return newSession(config, conn, false, config.ReadBufSize, mm), nil
}
// Client is used to initialize a new client-side connection.
// There must be at most one client-side connection.
func Client(conn net.Conn, config *Config, mm func() (MemoryManager, error)) (*Session, error) {
if config == nil {
config = DefaultConfig()
}
if err := VerifyConfig(config); err != nil {
return nil, err
}
return newSession(config, conn, true, config.ReadBufSize, mm), nil
}