-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection.go
177 lines (146 loc) · 3.57 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package server
import (
"bufio"
"crypto/tls"
"fmt"
"io"
"net"
"runtime/debug"
"sync"
"time"
"github.com/remerge/cue"
)
// Connection is a server-side network connection together with the buffered
// reader/writer used to talk to it. Instances are recycled through
// connectionPool, so a Connection must not be used after Close has returned.
type Connection struct {
	// Conn is the underlying network connection; its methods are promoted.
	net.Conn
	// Server is the server this connection was created for by NewConnection.
	Server *Server
	// LimitReader wraps reads from Conn. NewConnection sets N to NoLimit.
	LimitReader io.LimitedReader
	// Buffer reads through LimitReader and writes directly to Conn.
	Buffer bufio.ReadWriter
	// closeMutex is held by Close while closeInternal runs.
	closeMutex sync.Mutex
}
// NoLimit is the largest int64 value. It is used as an effectively infinite
// upper bound for the N field of an io.LimitedReader.
const NoLimit int64 = (1 << 63) - 1
// connectionPool recycles *Connection values: putConnection returns them to
// the pool and newConnection takes them out again.
var connectionPool sync.Pool
// NewConnection binds conn to a (possibly recycled) Connection owned by
// server. Reads are routed through the connection's LimitReader, which starts
// out with NoLimit; reader and writer buffers are requested with
// server.BufferSize. The connection is counted in numConns and registered in
// the server's connection set before it is returned.
func (server *Server) NewConnection(conn net.Conn) *Connection {
	c := newConnection()
	c.Conn, c.Server = conn, server

	// The buffered reader sits on top of the limited reader, the buffered
	// writer goes straight to the socket.
	c.LimitReader.R, c.LimitReader.N = conn, NoLimit
	c.Buffer.Reader = newBufioReader(&c.LimitReader, server.BufferSize)
	c.Buffer.Writer = newBufioWriter(conn, server.BufferSize)

	server.numConns.Inc(1)

	server.connectionsMutex.Lock()
	defer server.connectionsMutex.Unlock()
	server.connections[c] = struct{}{}

	return c
}
// newConnection returns a Connection taken from connectionPool, or a freshly
// allocated zero Connection when the pool has nothing to offer.
func newConnection() *Connection {
	pooled := connectionPool.Get()
	if pooled == nil {
		return &Connection{}
	}
	return pooled.(*Connection)
}
// putConnection unregisters c from its server, clears every per-connection
// field, hands the bufio reader/writer back to their pools and finally returns
// c itself to connectionPool. c.Server must be non-nil when this is called.
func putConnection(c *Connection) {
	srv := c.Server
	srv.numConns.Dec(1)

	srv.connectionsMutex.Lock()
	delete(srv.connections, c)
	srv.connectionsMutex.Unlock()

	// Drop all references so the pooled object does not keep the socket or
	// the server alive.
	c.Conn, c.Server = nil, nil
	c.LimitReader.R, c.LimitReader.N = nil, 0

	if r := c.Buffer.Reader; r != nil {
		putBufioReader(r)
		c.Buffer.Reader = nil
	}
	if w := c.Buffer.Writer; w != nil {
		putBufioWriter(w)
		c.Buffer.Writer = nil
	}

	connectionPool.Put(c)
}
// Pools of bufio readers and writers shared by all connections.
var (
	// bufioReaderPool holds *bufio.Reader values returned by putBufioReader.
	bufioReaderPool sync.Pool
	// bufioWriterPool holds *bufio.Writer values returned by putBufioWriter.
	bufioWriterPool sync.Pool
)
// newBufioReader returns a buffered reader on top of r. A pooled reader is
// reused when one is available; in that case it keeps the buffer it already
// has and size is ignored. Only a newly allocated reader is created with size.
func newBufioReader(r io.Reader, size int) *bufio.Reader {
	pooled := bufioReaderPool.Get()
	if pooled == nil {
		return bufio.NewReaderSize(r, size)
	}
	reader := pooled.(*bufio.Reader)
	reader.Reset(r)
	return reader
}
// putBufioReader detaches br from its source, so the pooled reader holds no
// reference to the old connection, and stores it in bufioReaderPool.
func putBufioReader(br *bufio.Reader) {
	var detached io.Reader // nil: no underlying reader while pooled
	br.Reset(detached)
	bufioReaderPool.Put(br)
}
// newBufioWriter returns a buffered writer targeting w. A pooled writer is
// reused when one is available; in that case it keeps the buffer it already
// has and size is ignored. Only a newly allocated writer is created with size.
func newBufioWriter(w io.Writer, size int) *bufio.Writer {
	pooled := bufioWriterPool.Get()
	if pooled == nil {
		return bufio.NewWriterSize(w, size)
	}
	writer := pooled.(*bufio.Writer)
	writer.Reset(w)
	return writer
}
// putBufioWriter detaches bw from its destination, discarding anything still
// buffered, and stores it in bufioWriterPool.
func putBufioWriter(bw *bufio.Writer) {
	var detached io.Writer // nil: no underlying writer while pooled
	bw.Reset(detached)
	bufioWriterPool.Put(bw)
}
// Serve runs the connection's lifecycle: it completes the TLS handshake when
// the underlying connection is a *tls.Conn, clears any deadline and then hands
// the connection to the server's Handler. When Serve returns, the connection
// has been closed and returned to the pool.
//
// A panic raised while serving is recovered, printed together with a stack
// trace and reported through the server's logger.
func (c *Connection) Serve() {
	// Close is registered as its own defer (and first, so it runs last).
	// That way the connection is still closed and unregistered even if the
	// recovery handler below panics itself, e.g. because the logger re-raises.
	defer c.Close()

	defer func() {
		if err := recover(); err != nil {
			fmt.Printf("unhandled panic: %v\n", err)
			debug.PrintStack()
			c.Server.Log.WithFields(cue.Fields{
				"person_id": c.Conn.RemoteAddr().String(),
			}).Panic(err, "unhandled server connection error")
		}
	}()

	if tlsConn, ok := c.Conn.(*tls.Conn); ok {
		err := tlsConn.Handshake()
		// The handshake is over either way, so the in-flight gauge drops
		// regardless of the outcome.
		c.Server.numHandshakes.Dec(1)
		if err != nil {
			c.Server.tlsErrors.Inc(1)
			return
		}
	}

	// reset deadline before handle
	if err := c.Conn.SetDeadline(time.Time{}); err != nil {
		return
	}

	c.Server.Handler.Handle(c)
}
// closeInternal flushes pending output and closes the underlying connection.
// It does not return the Connection to the pool and does not clear c.Conn.
// Close calls it while holding closeMutex.
//
// When a server is attached, the close is counted and the final flush is
// bounded by the server's Timeout. Without a server no timeout is known, so
// the flush is attempted without a deadline instead of dereferencing a nil
// Server.
func (c *Connection) closeInternal() {
	// prevent double close
	if c.Conn == nil {
		return
	}

	if c.Server != nil {
		c.Server.closes.Inc(1)

		// Bound the flush below; if the deadline cannot be set, give up on
		// flushing and just close the socket.
		if err := c.Conn.SetDeadline(time.Now().Add(c.Server.Timeout)); err != nil {
			_ = c.Conn.Close()
			return
		}
	}

	// flush write buffer before close (best effort, errors are ignored
	// because the connection is going away anyway)
	if c.Buffer.Writer != nil {
		_ = c.Buffer.Writer.Flush()
	}

	_ = c.Conn.Close()
}
// Close closes the underlying connection and puts the Connection back into
// the pool.
//
// Calling Close again on a Connection that has already been recycled and not
// yet handed out again is a no-op.
//
// IMPORTANT: Close must still be called only once per use and is not safe for
// concurrent use. After Close the Connection sits in the pool and may be taken
// and reinitialized by another goroutine; a stale second Close would then act
// on a connection that is already in use elsewhere.
func (c *Connection) Close() {
	c.closeMutex.Lock()
	if c.Conn == nil {
		// Already closed and recycled: putConnection cleared Conn and
		// Server, so running it again would dereference a nil Server and
		// put the same object into the pool twice.
		c.closeMutex.Unlock()
		return
	}
	c.closeInternal()
	c.closeMutex.Unlock()

	// put connection back into pool; done outside closeMutex because
	// putConnection takes the server's connectionsMutex
	putConnection(c)
}