diff --git a/config/config.go b/config/config.go index 2fa53c1962..23e8a3c036 100644 --- a/config/config.go +++ b/config/config.go @@ -252,6 +252,7 @@ type Config struct { BackendFlushInterval time.Duration `yaml:"backend-flush-interval"` ExperimentalUpgrade bool `yaml:"experimental-upgrade"` ExperimentalUpgradeAudit bool `yaml:"experimental-upgrade-audit"` + TCPQueueTimeoutServer time.Duration `yaml:"tcp-queue-timeout-server"` ReadTimeoutServer time.Duration `yaml:"read-timeout-server"` ReadHeaderTimeoutServer time.Duration `yaml:"read-header-timeout-server"` WriteTimeoutServer time.Duration `yaml:"write-timeout-server"` @@ -576,6 +577,7 @@ func NewConfig() *Config { flag.DurationVar(&cfg.BackendFlushInterval, "backend-flush-interval", 20*time.Millisecond, "flush interval for upgraded proxy connections") flag.BoolVar(&cfg.ExperimentalUpgrade, "experimental-upgrade", false, "enable experimental feature to handle upgrade protocol requests") flag.BoolVar(&cfg.ExperimentalUpgradeAudit, "experimental-upgrade-audit", false, "enable audit logging of the request line and the messages during the experimental web socket upgrades") + flag.DurationVar(&cfg.TCPQueueTimeoutServer, "tcp-queue-timeout-server", time.Second, "set timeout for how long TCP connections can be queued in http server connections") flag.DurationVar(&cfg.ReadTimeoutServer, "read-timeout-server", 5*time.Minute, "set ReadTimeout for http server connections") flag.DurationVar(&cfg.ReadHeaderTimeoutServer, "read-header-timeout-server", 60*time.Second, "set ReadHeaderTimeout for http server connections") flag.DurationVar(&cfg.WriteTimeoutServer, "write-timeout-server", 60*time.Second, "set WriteTimeout for http server connections") @@ -948,6 +950,7 @@ func (c *Config) ToOptions() skipper.Options { BackendFlushInterval: c.BackendFlushInterval, ExperimentalUpgrade: c.ExperimentalUpgrade, ExperimentalUpgradeAudit: c.ExperimentalUpgradeAudit, + TCPQueueTimeoutServer: c.TCPQueueTimeoutServer, ReadTimeoutServer: c.ReadTimeoutServer, ReadHeaderTimeoutServer: c.ReadHeaderTimeoutServer, WriteTimeoutServer: c.WriteTimeoutServer, diff --git a/config/config_test.go b/config/config_test.go index 0deac48612..1b83382089 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -129,6 +129,7 @@ func defaultConfig(with func(*Config)) *Config { IdleConnsPerHost: 64, CloseIdleConnsPeriod: 20 * time.Second, BackendFlushInterval: 20 * time.Millisecond, + TCPQueueTimeoutServer: time.Second, ReadTimeoutServer: 5 * time.Minute, ReadHeaderTimeoutServer: 1 * time.Minute, WriteTimeoutServer: 1 * time.Minute, diff --git a/go.mod b/go.mod index b9894519c1..40c2b6a05b 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,8 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/agnivade/levenshtein v1.2.1 // indirect + github.com/amirylm/go-options v0.0.2 // indirect + github.com/amirylm/lockfree v0.0.4 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect @@ -101,6 +103,7 @@ require ( github.com/go-ole/go-ole v1.2.6 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.0.0 // indirect github.com/google/flatbuffers v25.2.10+incompatible // indirect diff --git a/go.sum b/go.sum index 5c5564bde1..a2e8209a43 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,10 @@ github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNg github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/amirylm/go-options v0.0.2 h1:OvuFcKUg3+7jdKeY54XrRnAIxP9Dmultlg9dS7Q3TpA= +github.com/amirylm/go-options v0.0.2/go.mod h1:OmhJW65Aeyb74akzydI9SVgCjuwKlPNcTZeXk7TETPk= +github.com/amirylm/lockfree v0.0.4 h1:SAC96Droepe6HjDqymFY3E6UyJ6GR2crOGvbXFlk+kY= +github.com/amirylm/lockfree v0.0.4/go.mod h1:92tGIqOCCQdd9SR5nGLYwK4GN9PTKlQmRwXKxqfVz/U= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= @@ -169,6 +173,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 h1:zN2lZNZRflqFyxVaTIU61KNKQ9C0055u9CAfpmqUvo4= +github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3/go.mod h1:nPpo7qLxd6XL3hWJG/O60sR8ZKfMCiIoNap5GvD12KU= github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= diff --git a/metrics/metrics.go b/metrics/metrics.go index 08fa111736..300e5c0d3d 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -84,6 +84,36 @@ type Metrics interface { Close() } +type NoMetric struct{} + +func (NoMetric) MeasureSince(string, time.Time) {} +func (NoMetric) IncCounter(string) {} +func (NoMetric) IncCounterBy(string, int64) {} +func (NoMetric) IncFloatCounterBy(string, float64) {} +func (NoMetric) MeasureRouteLookup(time.Time) {} +func (NoMetric) MeasureFilterCreate(string, time.Time) {} +func (NoMetric) MeasureFilterRequest(string, time.Time) {} +func (NoMetric) MeasureAllFiltersRequest(string, time.Time) {} +func (NoMetric) MeasureBackendRequestHeader(string, int) {} +func (NoMetric) MeasureBackend(string, time.Time) {} +func (NoMetric) MeasureBackendHost(string, time.Time) {} +func (NoMetric) MeasureFilterResponse(string, time.Time) {} +func (NoMetric) MeasureAllFiltersResponse(string, time.Time) {} +func (NoMetric) MeasureResponse(int, string, string, time.Time) {} +func (NoMetric) MeasureResponseSize(string, int64) {} +func (NoMetric) MeasureProxy(time.Duration, time.Duration) {} +func (NoMetric) MeasureServe(string, string, string, int, time.Time) {} +func (NoMetric) IncRoutingFailures() {} +func (NoMetric) IncErrorsBackend(string) {} +func (NoMetric) MeasureBackend5xx(time.Time) {} +func (NoMetric) IncErrorsStreaming(string) {} +func (NoMetric) RegisterHandler(string, *http.ServeMux) {} +func (NoMetric) UpdateGauge(string, float64) {} +func (NoMetric) UpdateInvalidRoute(map[string]int) {} +func (NoMetric) Close() {} + +var _ Metrics = NoMetric{} + // Options for initializing metrics collection. type Options struct { // the metrics exposing format. diff --git a/queuelistener/listener.go b/queuelistener/listener.go index 527db456b6..aa2868be64 100644 --- a/queuelistener/listener.go +++ b/queuelistener/listener.go @@ -20,6 +20,7 @@ const ( maxCalculatedQueueSize = 50_000 acceptedConnectionsKey = "listener.accepted.connections" queuedConnectionsKey = "listener.queued.connections" + queueTimeoutKey = "listener.queued.timeouts" acceptLatencyKey = "listener.accept.latency" ) @@ -79,6 +80,8 @@ type Options struct { } type listener struct { + metrics metrics.Metrics + log logging.Logger options Options maxConcurrency int64 maxQueueSize int64 @@ -96,6 +99,7 @@ type listener struct { var ( token struct{} errListenerClosed = errors.New("listener closed") + errAcceptTimeout = errors.New("accept timeout") ) func (c *connection) Close() error { @@ -132,10 +136,7 @@ func (o Options) maxQueueSize() int64 { return int64(o.MaxQueueSize) } - maxQueueSize := 10 * o.maxConcurrency() - if maxQueueSize > maxCalculatedQueueSize { - maxQueueSize = maxCalculatedQueueSize - } + maxQueueSize := min(10*o.maxConcurrency(), maxCalculatedQueueSize) return maxQueueSize } @@ -155,6 +156,8 @@ func listenWith(nl net.Listener, o Options) (net.Listener, error) { l := &listener{ options: o, + log: o.Log, + metrics: o.Metrics, maxConcurrency: o.maxConcurrency(), maxQueueSize: o.maxQueueSize(), externalListener: nl, @@ -165,7 +168,10 @@ func listenWith(nl net.Listener, o Options) (net.Listener, error) { releaseConnection: make(chan struct{}), quit: make(chan struct{}), } - o.Log.Infof("TCP lifo listener config: %s", l) + if l.metrics == nil { + l.metrics = metrics.NoMetric{} + } + l.log.Infof("TCP lifo listener config: %s", l) go l.listenExternal() go l.listenInternal() @@ -233,7 +239,7 @@ func (l *listener) listenExternal() { //lint:ignore SA1019 Temporary is deprecated in Go 1.18, but keep it for now (https://github.com/zalando/skipper/issues/1992) if nerr, ok := err.(net.Error); ok && nerr.Temporary() { delay = bounce(delay) - l.options.Log.Errorf( + l.log.Errorf( "queue listener: accept error: %v, retrying in %v", err, delay, @@ -309,10 +315,8 @@ func (l *listener) listenInternal() { ) } - if l.options.Metrics != nil { - l.options.Metrics.UpdateGauge(acceptedConnectionsKey, float64(concurrency)) - l.options.Metrics.UpdateGauge(queuedConnectionsKey, float64(queue.size)) - } + l.metrics.UpdateGauge(acceptedConnectionsKey, float64(concurrency)) + l.metrics.UpdateGauge(queuedConnectionsKey, float64(queue.size)) select { case conn := <-l.acceptExternal: @@ -362,7 +366,7 @@ func (l *listener) listenInternal() { // Closing the real listener in a separate goroutine is based on inspecting the // stdlib. It's fair to just log the errors. if err := l.externalListener.Close(); err != nil { - l.options.Log.Errorf("Failed to close network listener: %v.", err) + l.log.Errorf("Failed to close network listener: %v.", err) } if l.closedHook != nil { @@ -377,9 +381,7 @@ func (l *listener) listenInternal() { func (l *listener) Accept() (net.Conn, error) { select { case c := <-l.acceptInternal: - if l.options.Metrics != nil { - l.options.Metrics.MeasureSince(acceptLatencyKey, c.external.accepted) - } + l.metrics.MeasureSince(acceptLatencyKey, c.external.accepted) return c, nil case err := <-l.internalError: return nil, err diff --git a/queuelistener/stack.go b/queuelistener/stack.go new file mode 100644 index 0000000000..264cd485bb --- /dev/null +++ b/queuelistener/stack.go @@ -0,0 +1,44 @@ +package queuelistener + +import ( + "sync" +) + +const stackSize int = 10000 + +type naiveStack[T any] struct { + mu sync.Mutex + top int + items [stackSize]*T +} + +func NewStack() *naiveStack[external] { + return &naiveStack[external]{ + top: -1, + } +} + +func (s *naiveStack[T]) Push(data *T) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.top == len(s.items)-1 { + return + } + + s.top++ + s.items[s.top] = data +} + +func (s *naiveStack[T]) Pop() *T { + s.mu.Lock() + defer s.mu.Unlock() + + if s.top == -1 { + return nil + } else { + defer func() { s.top-- }() + } + + return s.items[s.top] +} diff --git a/queuelistener/stack_listener.go b/queuelistener/stack_listener.go new file mode 100644 index 0000000000..d8ab4016ff --- /dev/null +++ b/queuelistener/stack_listener.go @@ -0,0 +1,161 @@ +package queuelistener + +import ( + "errors" + "fmt" + "net" + "net/http" + "sync" + "time" + + "github.com/zalando/skipper/logging" + "github.com/zalando/skipper/metrics" +) + +type stackListener struct { + log logging.Logger + metrics metrics.Metrics + maxConcurrency int64 + maxQueueSize int64 + memoryLimitBytes int64 + connectionBytes int + queueTimeout time.Duration + stack *naiveStack[external] + externalListener net.Listener + acceptInternal chan external + quit chan struct{} + once sync.Once +} + +func StackListener(o Options) (net.Listener, error) { + nl, err := net.Listen(o.Network, o.Address) + if err != nil { + return nil, fmt.Errorf("StackListener failed net.Listen: %w", err) + } + + acceptCH := make(chan external) + + if o.Log == nil { + o.Log = &logging.DefaultLog{} + } + + if o.MemoryLimitBytes <= 0 { + o.MemoryLimitBytes = defaultMemoryLimitBytes + } + + if o.ConnectionBytes <= 0 { + o.ConnectionBytes = defaultConnectionBytes + } + + m := o.Metrics + if m == nil { + m = metrics.NoMetric{} + } + l := &stackListener{ + log: o.Log, + metrics: m, + externalListener: nl, + maxConcurrency: o.maxConcurrency(), + maxQueueSize: o.maxQueueSize(), + memoryLimitBytes: o.MemoryLimitBytes, + connectionBytes: o.ConnectionBytes, + queueTimeout: o.QueueTimeout, + stack: NewStack(), + acceptInternal: acceptCH, + quit: make(chan struct{}), + once: sync.Once{}, + } + l.log.Infof("TCP lifo listener config: %s", l) + + go l.listenExternal() + go l.listenInternal() + return l, nil +} + +func (l *stackListener) String() string { + return fmt.Sprintf("stackListener concurrency: %d, queue size: %d, memory limit: %d, bytes per connection: %d, queue timeout: %s", l.maxConcurrency, l.maxQueueSize, l.memoryLimitBytes, l.connectionBytes, l.queueTimeout) +} + +func (l *stackListener) Accept() (net.Conn, error) { + select { + case <-l.quit: + return nil, errListenerClosed + case c := <-l.acceptInternal: + l.metrics.MeasureSince(acceptLatencyKey, c.accepted) + d := time.Since(c.accepted) + if d > l.queueTimeout { + l.metrics.IncCounter(queueTimeoutKey) + if c.Conn != nil { + c.Conn.Close() + } + return nil, errAcceptTimeout + } + return c, nil + } +} + +func (l *stackListener) Addr() net.Addr { + return l.externalListener.Addr() +} + +func (l *stackListener) Close() error { + l.once.Do(func() { + close(l.quit) + l.externalListener.Close() + close(l.acceptInternal) + }) + + return nil +} + +func (l *stackListener) listenExternal() { + var ( + err error + c net.Conn + ) + for { + select { + case <-l.quit: + return + default: + } + + c, err = l.externalListener.Accept() + if err != nil { + if errors.Is(err, http.ErrServerClosed) { + l.log.Infof("Server closed: %v", err) + return + } + + // client closed for example + //l.log.Infof("Failed to accept connection (%T): %v", err, err) + if c != nil { + l.log.Info("close connection") + c.Close() + } + continue + } + cc := external{c, time.Now()} + l.stack.Push(&cc) + } +} + +func (l *stackListener) listenInternal() { + for { + select { + case <-l.quit: + return + default: + } + cc := l.stack.Pop() + if cc == nil { + // reduce cpu usage caused by busywait + time.Sleep(10 * time.Microsecond) + continue + } + l.metrics.IncCounter(acceptedConnectionsKey) + l.metrics.UpdateGauge(queuedConnectionsKey, float64(l.stack.top+1)) + + l.acceptInternal <- *cc + } +} diff --git a/queuelistener/stack_listener_test.go b/queuelistener/stack_listener_test.go new file mode 100644 index 0000000000..69b9295a8a --- /dev/null +++ b/queuelistener/stack_listener_test.go @@ -0,0 +1,749 @@ +package queuelistener + +import ( + "fmt" + "io" + "net" + "net/http" + "runtime" + "testing" + "time" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + + "github.com/zalando/skipper/metrics" + "github.com/zalando/skipper/metrics/metricstest" + skpnet "github.com/zalando/skipper/net" +) + +func TestStackListener(t *testing.T) { + for _, tt := range []struct { + name string + opt Options + want error + }{ + { + name: "wrong listener options", + want: fmt.Errorf("StackListener failed net.Listen:"), + }, + { + name: "listener with metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + Metrics: metrics.Default, + }, + want: nil, + }, + { + name: "listener without metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + Metrics: nil, + }, + want: nil, + }} { + t.Run(tt.name, func(t *testing.T) { + srv := &http.Server{ + Addr: tt.opt.Address, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 0, + KeepaliveRequests: 0, + Metrics: tt.opt.Metrics, + } + cm.Configure(srv) + + l, err := StackListener(tt.opt) + if err != nil { + if tt.want == nil { + t.Fatalf("Failed to create StackListener: %v", err) + } else { + // have err and want error + return + } + } + defer l.Close() + if tt.want != nil && err == nil { + t.Fatalf("Failed to get error from StackListener, want: %v", tt.want) + } + + go func() { + t.Logf("start server") + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Errorf("Serve failed: %v", err) + } + return + }() + + dst := "http://" + l.Addr().String() + var rsp *http.Response + for range 3 { + rsp, err = http.DefaultClient.Get(dst) + if err != nil { + time.Sleep(time.Second) + continue + } + break + } + if err != nil { + t.Fatalf("Failed to do a GET request to %s", dst) + } + if rsp.StatusCode != http.StatusAccepted { + t.Fatalf("Failed to get response status code we expect, got: %d", rsp.StatusCode) + } + defer rsp.Body.Close() + io.Copy(io.Discard, rsp.Body) + http.DefaultClient.CloseIdleConnections() + }) + } +} + +func TestStackListenerRequestFlow(t *testing.T) { + opt := Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + Metrics: &metricstest.MockMetrics{}, + } + + srv := &http.Server{ + Addr: opt.Address, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 0, + KeepaliveRequests: 0, + Metrics: opt.Metrics, + } + cm.Configure(srv) + + l, err := StackListener(opt) + if err != nil { + t.Fatalf("Failed to create StackListener: %v", err) + } + defer l.Close() + + go func() { + t.Logf("start server") + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Errorf("Serve failed: %v", err) + } + return + }() + + dst := "http://" + l.Addr().String() + var rsp *http.Response + for range 3 { + rsp, err = http.DefaultClient.Get(dst) + if err != nil { + time.Sleep(time.Second) + continue + } + break + } + if err != nil { + t.Fatalf("Failed to do a GET request to %s", dst) + } + if rsp.StatusCode != http.StatusAccepted { + t.Fatalf("Failed to get response status code we expect, got: %d", rsp.StatusCode) + } + defer rsp.Body.Close() + io.Copy(io.Discard, rsp.Body) + + for range 10 { + rsp, err = http.DefaultClient.Get(dst) + if err != nil { + t.Fatalf("Failed to do a GET request to %s", dst) + } + if rsp.StatusCode != http.StatusAccepted { + t.Fatalf("Failed to get response status code we expect, got: %d", rsp.StatusCode) + } + t.Logf("Response: %d", rsp.StatusCode) + io.Copy(io.Discard, rsp.Body) + rsp.Body.Close() + } + + t.Logf("opt.Metrics: %+v", opt.Metrics) +} + +func TestStackListenerTimeout(t *testing.T) { + for _, tt := range []struct { + name string + opt Options + want error + }{ + { + name: "listener timeout with metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Microsecond, + Metrics: &metricstest.MockMetrics{}, + }, + want: nil, + }, + { + name: "listener timeout without metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Microsecond, + Metrics: nil, + }, + want: nil, + }} { + t.Run(tt.name, func(t *testing.T) { + srv := &http.Server{ + Addr: tt.opt.Address, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 0, + KeepaliveRequests: 0, + Metrics: tt.opt.Metrics, + } + cm.Configure(srv) + + l, err := StackListener(tt.opt) + if err != nil { + if tt.want == nil { + t.Fatalf("Failed to create StackListener: %v", err) + } else { + // have err and want error + return + } + } + defer l.Close() + + go func() { + t.Logf("start server") + if err := srv.Serve(l); err != http.ErrServerClosed { + t.Logf("Server closed: %v", err) + } + return + }() + + dst := "http://" + l.Addr().String() + + _, err = http.DefaultClient.Get(dst) + if err == nil { + t.Fatal("Failed to create a fail") + } + + if tt.opt.Metrics != nil { + if m, ok := tt.opt.Metrics.(*metricstest.MockMetrics); ok { + m.WithCounters(func(c map[string]int64) { + t.Logf("counters: %v", c) + + assert.Equal(t, int64(1), c["listener.queued.timeouts"]) + assert.Equal(t, int64(1), c["listener.accepted.connections"]) + }) + } + } + + http.DefaultClient.CloseIdleConnections() + + }) + } +} + +func TestStackListenerShutdown(t *testing.T) { + for _, tt := range []struct { + name string + opt Options + backendTime time.Duration + want error + }{ + { + name: "listener timeout with metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: 100 * time.Millisecond, + Metrics: &metricstest.MockMetrics{}, + }, + backendTime: time.Second, + want: nil, + }, + { + name: "listener timeout without metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: 100 * time.Millisecond, + Metrics: nil, + }, + backendTime: time.Second, + want: nil, + }} { + t.Run(tt.name, func(t *testing.T) { + srv := &http.Server{ + Addr: tt.opt.Address, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + // backend hang + time.Sleep(tt.backendTime) + + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 0, + KeepaliveRequests: 0, + Metrics: tt.opt.Metrics, + } + cm.Configure(srv) + + l, err := StackListener(tt.opt) + if err != nil { + if tt.want == nil { + t.Fatalf("Failed to create StackListener: %v", err) + } else { + // have err and want error + return + } + } + defer l.Close() + + go func() { + t.Logf("start server") + if err := srv.Serve(l); err != http.ErrServerClosed { + t.Logf("Server closed: %v", err) + } + return + }() + + dst := "http://" + l.Addr().String() + errCH := make(chan error) + go func(ch chan error) { + _, err = http.DefaultClient.Get(dst) + ch <- err + }(errCH) + + time.Sleep(500 * time.Millisecond) + // server init close + err = l.Close() + t.Logf("Close: %v", err) + + err = <-errCH + t.Logf("client err: %v", err) + // if !strings.Contains(err.Error(), "connection refused") { + // t.Errorf("Failed to get client err: %v", err) + // } + + if tt.opt.Metrics != nil { + if m, ok := tt.opt.Metrics.(*metricstest.MockMetrics); ok { + m.WithCounters(func(c map[string]int64) { + t.Logf("counters: %v", c) + + // assert.Equal(t, int64(1), c["listener.queued.timeouts"]) + // assert.Equal(t, int64(1), c["listener.accepted.connections"]) + }) + } + } + + http.DefaultClient.CloseIdleConnections() + + }) + } +} + +func TestTCPListenerStackListener(t *testing.T) { + addr := ":9090" + l, err := StackListener(Options{ + Network: "tcp", + Address: addr, + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + Metrics: metrics.Default, + //Metrics: metrics.NoMetric{}, + }) + if err != nil { + t.Fatalf("Failed to create QueueListener: %v", err) + } + defer l.Close() + + srv := &http.Server{ + Addr: addr, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 10 * time.Second, + KeepaliveRequests: 10, + Metrics: metrics.Default, + } + cm.Configure(srv) + + go func() { + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Errorf("Serve failed: %v", err) + } + }() + + // check ready + err = fmt.Errorf("an error") + var rsp *http.Response + for err != nil { + rsp, err = http.DefaultClient.Get("http://" + l.Addr().String() + "/") + time.Sleep(time.Millisecond) + } + io.Copy(io.Discard, rsp.Body) + rsp.Body.Close() + http.DefaultClient.CloseIdleConnections() + + buf := []byte("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + result := make([]byte, 1024) + + for n := 0; n < 2; n++ { + c, err := net.Dial("tcp", l.Addr().String()) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + return + } + _, err = c.Write(buf) + if err != nil { + t.Fatalf("Failed to write: %v", err) + c.Close() + return + } + + n, err := c.Read(result) + if err != nil { + t.Fatalf("Failed to write: %v", err) + c.Close() + return + } + + if n != 124 { + t.Fatalf("Read %d bytes: %q", n, string(result[0:n])) + } + t.Logf("Read %d bytes: %q", n, string(result[0:n])) + c.Close() + + } +} + +func TestTCPListenerQueueListener(t *testing.T) { + addr := ":9090" + l, err := Listen(Options{ + Network: "tcp", + Address: addr, + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + Metrics: metrics.Default, + }) + if err != nil { + t.Fatalf("Failed to create QueueListener: %v", err) + } + defer l.Close() + + srv := &http.Server{ + Addr: addr, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 10 * time.Second, + KeepaliveRequests: 10, + Metrics: metrics.Default, + } + cm.Configure(srv) + + go func() { + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Errorf("Serve failed: %v", err) + } + }() + + // check ready + err = fmt.Errorf("an error") + var rsp *http.Response + for err != nil { + rsp, err = http.DefaultClient.Get("http://" + l.Addr().String() + "/") + time.Sleep(time.Millisecond) + } + io.Copy(io.Discard, rsp.Body) + rsp.Body.Close() + http.DefaultClient.CloseIdleConnections() + + buf := []byte("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + result := make([]byte, 1024) + + for n := 0; n < 2; n++ { + c, err := net.Dial("tcp", l.Addr().String()) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + return + } + _, err = c.Write(buf) + if err != nil { + t.Fatalf("Failed to write: %v", err) + c.Close() + return + } + + n, err := c.Read(result) + if err != nil { + t.Fatalf("Failed to write: %v", err) + c.Close() + return + } + + if n != 124 { + t.Fatalf("Read %d bytes: %q", n, string(result[0:n])) + } + c.Close() + + } +} + +func BenchmarkStackListener(b *testing.B) { + maxprocs := runtime.GOMAXPROCS(-5) + stackAddr := fmt.Sprintf(":90%02d", maxprocs) + stackListener, err := StackListener(Options{ + Log: &noLog{}, + Network: "tcp", + Address: stackAddr, + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + //Metrics: metrics.Default, + Metrics: metrics.NoMetric{}, + }) + if err != nil { + b.Fatalf("Failed to create StackListener: %v", err) + } + + benchmarkListener(b, stackAddr, stackListener) + stackListener.Close() + +} + +type noLog struct{} + +func (*noLog) Error(...interface{}) {} +func (*noLog) Errorf(string, ...interface{}) {} +func (*noLog) Warn(...interface{}) {} +func (*noLog) Warnf(string, ...interface{}) {} +func (*noLog) Info(...interface{}) {} +func (*noLog) Infof(string, ...interface{}) {} +func (*noLog) Debug(...interface{}) {} +func (*noLog) Debugf(string, ...interface{}) {} + +func BenchmarkQueueListener(b *testing.B) { + queueAddr := ":9090" + queueListener, err := Listen(Options{ + Log: &noLog{}, + Network: "tcp", + Address: queueAddr, + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + //Metrics: metrics.Default, + Metrics: metrics.NoMetric{}, + }) + if err != nil { + b.Fatalf("Failed to create QueueListener: %v", err) + } + benchmarkListener(b, queueAddr, queueListener) + queueListener.Close() +} + +func benchmarkListener(b *testing.B, addr string, l net.Listener) { + srv := &http.Server{ + Addr: addr, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 10 * time.Second, + KeepaliveRequests: 10, + Metrics: metrics.Default, + } + cm.Configure(srv) + + go func() { + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Errorf("Serve failed: %v", err) + } + }() + + // check ready + err := fmt.Errorf("an error") + var rsp *http.Response + for err != nil { + rsp, err = http.DefaultClient.Get("http://" + l.Addr().String() + "/") + time.Sleep(time.Millisecond) + } + io.Copy(io.Discard, rsp.Body) + rsp.Body.Close() + http.DefaultClient.CloseIdleConnections() + + // tr := http.Transport{ + // DisableKeepAlives: true, + // } + // client := http.Client{ + // Transport: &tr, + // } + + buf := []byte("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + result := make([]byte, 1024) + b.ResetTimer() + + for n := 0; n < b.N; n++ { + c, err := net.Dial("tcp", l.Addr().String()) + if err != nil { + b.Logf("Failed to dial: %v", err) + return + } + _, err = c.Write(buf) + if err != nil { + b.Logf("Failed to write: %v", err) + c.Close() + return + } + + n, err := c.Read(result) + if err != nil { + b.Logf("Failed to write: %v", err) + c.Close() + return + } + + if n != 124 { + b.Logf("Read %d bytes: %q", n, string(result[0:n])) + } + c.Close() + } + + /* + for n := 0; n < b.N; n++ { + // TODO(sszuecs): we should create connections TCP/IP + // and not use a pooled client + + rsp, err := client.Get("http://" + l.Addr().String() + "/") + if err != nil { + b.Fatalf("Failed to send request: %v", err) + } + if rsp.StatusCode != http.StatusAccepted { + b.Fatalf("Failed to get status code: %d != %d", rsp.StatusCode, http.StatusAccepted) + } + res, err := io.ReadAll(rsp.Body) + if result := string(res); result != "OK" { + b.Logf("Failed to get result: %q", result) + } + rsp.Body.Close() + } + client.CloseIdleConnections() + */ +} diff --git a/queuelistener/stack_other.go b/queuelistener/stack_other.go new file mode 100644 index 0000000000..a1743e6bee --- /dev/null +++ b/queuelistener/stack_other.go @@ -0,0 +1,118 @@ +package queuelistener + +import ( + "net" + "sync/atomic" + "unsafe" +) + +// https://englyk.com/book2/Lock-Free_Data_Structures/ + +// Node represents a node in the stack +type Node struct { + value any + next *Node +} + +// LockFreeStack represents a lock-free stack +type LockFreeStack struct { + head unsafe.Pointer // *Node +} + +// NewLockFreeStack creates a new lock-free stack +func NewLockFreeStack() *LockFreeStack { + return &LockFreeStack{} +} + +// Push adds a new value onto the stack +func (s *LockFreeStack) Push(value interface{}) { + newNode := &Node{value: value} + for { + oldHead := atomic.LoadPointer(&s.head) + newNode.next = (*Node)(oldHead) + if atomic.CompareAndSwapPointer(&s.head, oldHead, unsafe.Pointer(newNode)) { + break + } + } +} + +// Pop removes and returns the value from the top of the stack +func (s *LockFreeStack) Pop() (value any, ok bool) { + for { + oldHead := atomic.LoadPointer(&s.head) + if oldHead == nil { + return nil, false // Stack is empty + } + newHead := (*Node)(oldHead).next + if atomic.CompareAndSwapPointer(&s.head, oldHead, unsafe.Pointer(newHead)) { + return (*Node)(oldHead).value, true + } + } +} + +// TODO: Elimination-Backoff Stack + +// TODO: Semantic Relaxation and Elastic Designs +// highly concurrent systems where minor deviations from strict LIFO order are acceptable in exchange for significant performance gains + +// TODO Semantically Relaxed Stack + +// TODO Wait-Free Stack: Goel Stack + +// TODO Wait-Free Stack: SIM Stack + +// Treiber Stack +type node[T any] struct { + val T + next *node[T] +} +type treiberStack[T any] struct { + head atomic.Pointer[node[T]] // node[T] +} + +func NewTreiberStack() *treiberStack[net.Conn] { + return &treiberStack[net.Conn]{} +} + +func (s *treiberStack[T]) Push(data T) { + newHead := node[T]{ + val: data, + next: s.head.Load(), + } + if s.head.CompareAndSwap(newHead.next, &newHead) { + return + } else { + s.Push(data) + } +} + +func (s *treiberStack[T]) Pop() *T { + oldHead := s.head.Load() // maybe panic + if oldHead == nil { + return nil + } + if s.head.CompareAndSwap(oldHead, oldHead.next) { + return &oldHead.val + } else { + return s.Pop() + } +} + +// https://en.wikipedia.org/wiki/Treiber_stack +/* + class LockFreeStack { + private class Node(val value: T, val next: Node?) + + private val head = AtomicReference?>(null) + + tailrec fun push(value: T) { + val newHead = Node(value = value, next = head.get()) + if (head.compareAndSet(newHead.next, newHead)) return else push(value) + } + + tailrec fun pop(): T { + val oldHead = head.get() ?: throw NoSuchElementException("Stack is empty") + return if (head.compareAndSet(oldHead, oldHead.next)) oldHead.value else pop() + } + } +*/ diff --git a/queuelistener/stack_other_test.go b/queuelistener/stack_other_test.go new file mode 100644 index 0000000000..ebb06c8cac --- /dev/null +++ b/queuelistener/stack_other_test.go @@ -0,0 +1,133 @@ +package queuelistener + +import ( + "net" + "testing" + + "github.com/amirylm/lockfree/core" + lstack "github.com/amirylm/lockfree/stack" + "github.com/golang-collections/collections/stack" +) + +func TestTreiberStack(t *testing.T) { + var c1, c2, c3 net.Conn + s := NewTreiberStack() + if v := s.Pop(); v != nil { + t.Fatalf("Failed to get nil from empty stack: %v", v) + } + + s.Push(c1) + s.Push(c2) + s.Push(c3) + if v := s.Pop(); *v != c3 { + t.Fatalf("Failed to get c3 from stack: %v", v) + } + if v := s.Pop(); *v != c2 { + t.Fatalf("Failed to get c2 from stack: %v", v) + } + if v := s.Pop(); *v != c1 { + t.Fatalf("Failed to get c1 from stack: %v", v) + } + + if v := s.Pop(); v != nil { + t.Fatalf("Failed to get nil from empty stack: %v", v) + } +} + +// https://pkg.go.dev/github.com/golang-collections/collections/stack + +func BenchmarkGoStack(b *testing.B) { + gostack := stack.New() + for n := 0; n < b.N; n++ { + gostack.Push(n) + k := gostack.Pop() + if k != n { + b.Fatalf("%d != %d", k, n) + } + } +} + +// https://github.com/amirylm/lockfree/blob/main/stack/stack.go + +func BenchmarkAmirylmLockFreeStack(b *testing.B) { + // func WithCapacity(c int) options.Option[Options] { + // func New[Value any](opts ...options.Option[core.Options]) core.Stack[Value] { + var c *net.Conn + llstack := lstack.New[*net.Conn](core.WithCapacity(stackSize)) + for n := 0; n < b.N; n++ { + llstack.Push(c) + k, _ := llstack.Pop() + if k != c { + b.Fatalf("%p != %p", k, c) + } + } +} + +func BenchmarkLockFreeStack(b *testing.B) { + var c *net.Conn + llstack := NewLockFreeStack() + for n := 0; n < b.N; n++ { + llstack.Push(c) + k, _ := llstack.Pop() + if k != c { + b.Fatalf("%p != %p", k, c) + } + } +} + +func BenchmarkTreiberStack(b *testing.B) { + var c net.Conn + mystack := NewTreiberStack() + for n := 0; n < b.N; n++ { + mystack.Push(c) + k := mystack.Pop() + if *k != c { + b.Fatalf("%p != %p", k, &c) + } + } +} + +func BenchmarkAmirylmLockFreeStackParallel(b *testing.B) { + // func WithCapacity(c int) options.Option[Options] { + // func New[Value any](opts ...options.Option[core.Options]) core.Stack[Value] { + var c *net.Conn + llstack := lstack.New[*net.Conn](core.WithCapacity(stackSize)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + llstack.Push(c) + k, _ := llstack.Pop() + if k != c { + b.Fatalf("%p != %p", k, c) + } + } + }) +} + +func BenchmarkLockFreeStackParallel(b *testing.B) { + var c *net.Conn + llstack := NewLockFreeStack() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + llstack.Push(c) + k, _ := llstack.Pop() + if k != c { + b.Fatalf("%p != %p", k, c) + } + } + }) +} + +func BenchmarkTreiberStackParallel(b *testing.B) { + var c net.Conn + mystack := NewTreiberStack() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + mystack.Push(c) + k := mystack.Pop() + if *k != c { + b.Fatalf("%v != %v", k, c) + } + + } + }) +} diff --git a/queuelistener/stack_test.go b/queuelistener/stack_test.go new file mode 100644 index 0000000000..85cd2be5bd --- /dev/null +++ b/queuelistener/stack_test.go @@ -0,0 +1,132 @@ +package queuelistener + +import ( + "net" + "testing" + "time" +) + +func TestNaiveStack(t *testing.T) { + var ( + c net.Conn + a [5]*external + ) + for i := range len(a) { + a[i] = &external{Conn: c} + } + for i := range len(a) { + if a[i] == nil { + t.Fatalf("a[%d] should not be nil", i) + } + } + + t.Run("pop before push should return nil until we get result", func(t *testing.T) { + s := NewStack() + + ch := make(chan struct{}) + dataCH := make(chan *external) + go func() { + <-ch + s.Push(a[0]) + }() + go func() { + v := s.Pop() + for v == nil { + time.Sleep(time.Millisecond) + v = s.Pop() + } + dataCH <- v + }() + close(ch) + if v := <-dataCH; v != a[0] { + t.Fatalf("Failed to get item from stack: %v", v) + } + }) + + t.Run("push pop push pop push pop ... should work", func(t *testing.T) { + s := NewStack() + // push pop; push pop; .. + for i := range len(a) { + s.Push(a[i]) + if v := s.Pop(); v != a[i] { + t.Fatalf("Failed to get %d from stack: %v", i, v) + } + } + }) + + t.Run("push push push .. pop pop pop... should work", func(t *testing.T) { + s := NewStack() + // push push ..; pop; pop;.. + for i := range len(a) { + s.Push(a[i]) + } + for i := range len(a) { + if v := s.Pop(); v != a[len(a)-i-1] { + t.Fatalf("Failed to get a[%d] from stack: %v", len(a)-i-1, v) + } + } + }) + + t.Run("test push max should return without change", func(t *testing.T) { + s := NewStack() + s.top = len(s.items) - 2 + + for i := range len(a) { + s.Push(a[i]) // only first will be pushed on our stack rest will be ignored + } + + if v := s.Pop(); v != a[0] { + t.Fatalf("Failed to get a[0] from stack: %v", v) + } + }) +} + +func BenchmarkNaiveStack(b *testing.B) { + var ( + c net.Conn + a [5]*external + ) + for i := range len(a) { + a[i] = &external{Conn: c} + } + + mystack := NewStack() + for n := 0; n < b.N; n++ { + for i := range len(a) { + mystack.Push(a[i]) + } + + for i := range len(a) { + k := mystack.Pop() + if k != a[len(a)-1-i] { + b.Fatalf("%p != %p", k, a[len(a)-1-i]) + } + } + } +} + +func BenchmarkNaiveStackParallel(b *testing.B) { + var ( + c net.Conn + a [5]*external + ) + for i := range 5 { + a[i] = &external{Conn: c} + } + + mystack := NewStack() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for i := range len(a) { + mystack.Push(a[i]) + } + + for range len(a) { + k := mystack.Pop() + if k == nil { + b.Fatalf("%v", k) + } + } + } + }) +} diff --git a/skipper.go b/skipper.go index 944d282138..53df2e0c22 100644 --- a/skipper.go +++ b/skipper.go @@ -362,6 +362,9 @@ type Options struct { // by the proxy are closed. CloseIdleConnsPeriod time.Duration + // TCPQueueTimeoutServer is the timeout for the accept() handling in case StackListener is used + TCPQueueTimeoutServer time.Duration + // Defines ReadTimeoutServer for server http connections. ReadTimeoutServer time.Duration @@ -1336,12 +1339,13 @@ func listen(o *Options, address string, mtr metrics.Metrics) (net.Listener, erro } } - qto := o.ReadHeaderTimeoutServer + qto := o.TCPQueueTimeoutServer if qto <= 0 { - qto = o.ReadTimeoutServer + qto = time.Second } - return queuelistener.Listen(queuelistener.Options{ + return queuelistener.StackListener(queuelistener.Options{ + //return queuelistener.Listen(queuelistener.Options{ Network: "tcp", Address: address, MaxConcurrency: o.MaxTCPListenerConcurrency, diff --git a/skipper_test.go b/skipper_test.go index 051434ab29..0d21f720cc 100644 --- a/skipper_test.go +++ b/skipper_test.go @@ -526,6 +526,7 @@ func TestDataClients(t *testing.T) { ExpectedBytesPerRequest: 1024, ReadHeaderTimeoutServer: 0, ReadTimeoutServer: 1 * time.Second, + TCPQueueTimeoutServer: 1 * time.Second, MetricsFlavours: []string{"codahale"}, EnablePrometheusMetrics: true, LoadBalancerHealthCheckInterval: 3 * time.Second,