This repository has been archived by the owner on May 29, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathclient.go
457 lines (400 loc) · 9.64 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
package synapse
import (
"crypto/tls"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/philhofer/fwd"
"github.com/tinylib/msgp/msgp"
"github.com/tinylib/synapse/sema"
)
// waiterHWM is the waiter "high water mark": the buffer
// capacity of each Client's outgoing write queue.
// TODO(maybe): make this adjustable.
const waiterHWM = 32
// Client connection states, stored in Client.state and
// read/written with sync/atomic.
const (
	// clientClosed is the zero value, so a Client that was
	// never set up by NewClient reports itself as closed.
	clientClosed = 0
	// clientOpen marks a client that accepts new requests.
	clientOpen = 1
)
// ErrClosed is returned when a call is attempted
// on a closed client.
var ErrClosed = errors.New("synapse: client is closed")

// ErrTimeout is returned when a server doesn't respond
// to a request before the client's timeout scavenger
// frees the waiting goroutine.
var ErrTimeout = errors.New("synapse: the server didn't respond in time")

// ErrTooLarge is returned when the message
// size is larger than 65,535 bytes.
var ErrTooLarge = errors.New("synapse: message body too large")
// Dial connects to raddr over the given network using net.Dial
// and wraps the resulting connection in a new Client.
// timeout is a time.Duration (not a bare millisecond count) and
// is handed to NewClient as the request timeout.
// A dial failure is returned unchanged.
func Dial(network string, raddr string, timeout time.Duration) (*Client, error) {
	nc, dialErr := net.Dial(network, raddr)
	if dialErr != nil {
		return nil, dialErr
	}
	client, err := NewClient(nc, timeout)
	return client, err
}
// DialTLS behaves like Dial, but establishes the connection
// with tls.Dial using the supplied *tls.Config before handing
// it to NewClient. A dial failure is returned unchanged.
func DialTLS(network, raddr string, timeout time.Duration, config *tls.Config) (*Client, error) {
	secure, dialErr := tls.Dial(network, raddr, config)
	if dialErr != nil {
		return nil, dialErr
	}
	client, err := NewClient(secure, timeout)
	return client, err
}
// NewClient builds a client on top of an existing net.Conn.
//
// It starts the client's read, write and timeout goroutines and
// then pings the server. timeout is a time.Duration (not a bare
// millisecond count); it is the interval at which the timeout
// loop scans for requests that have gone unanswered.
//
// If the ping fails, the client is closed and an error wrapping
// the ping failure is returned.
func NewClient(c net.Conn, timeout time.Duration) (*Client, error) {
	cl := new(Client)
	cl.conn = c
	cl.writing = make(chan *waiter, waiterHWM)
	cl.done = make(chan struct{})
	cl.state = clientOpen

	// background workers must be running before
	// the ping, since the ping goes through them
	go cl.readLoop()
	go cl.writeLoop()
	go cl.timeoutLoop(timeout)

	// sanity check: make sure the peer answers
	if pingErr := cl.ping(); pingErr != nil {
		cl.Close()
		return nil, fmt.Errorf("synapse: ping failed: %s", pingErr)
	}
	return cl, nil
}
// Client is a client to
// a single synapse server.
// Instances are created with Dial, DialTLS or NewClient,
// which start the read, write and timeout goroutines.
// NOTE: keep the field order as-is; csn is updated with
// 64-bit atomic operations.
type Client struct {
conn net.Conn // connection; read by readLoop, written by writeLoop
wlock sync.Mutex // write lock; NOTE(review): not referenced in the visible part of this file
csn uint64 // last issued sequence number; incremented with atomic.AddUint64
writing chan *waiter // queue to write to conn; buffered with capacity waiterHWM
done chan struct{} // closed during (*Client).Close / closeError to shut down timeoutLoop
wg sync.WaitGroup // outstanding client procs; Close and closeError wait on it
state uint32 // clientOpen or clientClosed; accessed atomically
pending wMap // map seq number to waiting handler; used via its zero value
}
// waiter is used to transfer control
// flow to blocking goroutines: a caller
// writes its request through a waiter and
// then blocks on 'done' until readLoop (or
// an error/timeout path) wakes it up.
type waiter struct {
next *waiter // next in linked list, or nil
parent *Client // parent *client
seq uint64 // sequence number of the in-flight request (set in writebody)
done sema.Point // for notifying response; used with sema.Wait / sema.Wake
err error // response error on wakeup, if applicable
in []byte // holds the outgoing frame while writing, then the response body
reap bool // can reap for timeout; reset to false in writebody
static bool // is part of the statically allocated arena
}
// Close shuts down the client's connection to the server.
//
// Only the first call (racing against closeError) performs
// the shutdown; any later call returns ErrClosed without
// touching the connection.
//
// Close blocks until every outstanding request has finished
// (c.wg reaches zero); requests already in progress are not
// interrupted. It then stops the timeout and write loops and
// returns the result of closing the underlying connection.
func (c *Client) Close() error {
	// exactly one caller may win the open -> closed transition
	won := atomic.CompareAndSwapUint32(&c.state, clientOpen, clientClosed)
	if !won {
		return ErrClosed
	}

	c.wg.Wait()
	close(c.done)    // stops timeoutLoop
	close(c.writing) // stops writeLoop
	connErr := c.conn.Close()
	return connErr
}
// closeError closes the client because of a fatal error.
// Every waiting goroutine that can be reached through
// c.pending is woken with a "fatal error" wrapping err.
// If the client is already closed this is a no-op.
func (c *Client) closeError(err error) {
	won := atomic.CompareAndSwapUint32(&c.state, clientOpen, clientClosed)
	if !won {
		return
	}

	fatal := fmt.Errorf("synapse: fatal error: %s", err)

	// we can't actually guarantee that we will preempt
	// every goroutine, but we can try: flush at least
	// once, and keep flushing until nothing is pending.
	for {
		c.pending.flush(fatal)
		if c.pending.length() == 0 {
			break
		}
	}

	c.wg.Wait()
	close(c.done)
	close(c.writing)
	c.conn.Close()
}
// do wraps the (n, err) result of a read or write call,
// e.g.
//   if !c.do(w.Write(data)) { goto fail }
// It reports whether the call succeeded. On failure the
// client is shut down through closeError; the byte count
// is ignored.
func (c *Client) do(_ int, err error) bool {
	if err == nil {
		return true
	}
	c.closeError(err)
	return false
}
// readLoop continuously reads server frames from the
// connection. Each fCMD or fRES frame is routed to the
// waiter registered under the frame's sequence number:
// the frame body is read into the waiter's input buffer
// and the waiter is woken up. Frames of any other type,
// and frames nobody is waiting for, are skipped.
//
// readLoop returns on the first read error, after shutting
// the client down through closeError.
func (c *Client) readLoop() {
	var lead [leadSize]byte
	bwr := fwd.NewReaderSize(c.conn, 4096)

	for {
		if !c.do(bwr.ReadFull(lead[:])) {
			return
		}
		seq, frame, sz := readFrame(lead)

		// only accept fCMD and fRES frames;
		// they are routed to waiters
		// precisely the same way
		var w *waiter
		if frame == fCMD || frame == fRES {
			w = c.pending.remove(seq)
		}
		if w == nil {
			// unknown frame type or no waiter: ignore the body
			if !c.do(bwr.Skip(sz)) {
				return
			}
			continue
		}

		// fill the waiter's input buffer,
		// reusing its storage when possible
		if cap(w.in) >= sz {
			w.in = w.in[:sz]
		} else {
			w.in = make([]byte, sz)
		}

		if _, err := bwr.ReadFull(w.in); err != nil {
			// w has already been taken out of c.pending, so the
			// flush in closeError cannot reach it. Wake it here
			// with the error *before* closing: closeError blocks
			// in c.wg.Wait(), and w's goroutine only releases the
			// wait group after it has been woken.
			w.err = fmt.Errorf("synapse: fatal error: %s", err)
			sema.Wake(&w.done)
			c.closeError(err)
			return
		}

		// successful read: wake the waiter without an error
		w.err = nil
		sema.Wake(&w.done)
	}
}
// timeoutLoop calls c.pending.reap() once every interval d
// until c.done is closed. Per the original design note, a
// reap pass removes every pending item with reap=true and
// sets all others to reap=true.
//
// NOTE(pmh): this doesn't actually guarantee
// de-queueing after the given duration;
// rather, it limits max wait time to 2*d.
//
// A non-positive d disables reaping: the loop then only
// waits for shutdown.
func (c *Client) timeoutLoop(d time.Duration) {
	// time.NewTicker panics on a non-positive interval, while
	// the time.Tick call used previously returned a nil channel
	// (i.e. never ticked). Preserve that behavior explicitly.
	if d <= 0 {
		<-c.done
		return
	}

	// Unlike time.Tick, the ticker is stopped when the loop exits
	// so its resources are released once the client is closed.
	ticker := time.NewTicker(d)
	defer ticker.Stop()

	for {
		select {
		case <-c.done:
			return
		case <-ticker.C:
			c.pending.reap()
		}
	}
}
// writeLoop drains c.writing and copies each queued
// waiter's buffer to the connection through a buffered
// writer. After the first blocking receive it keeps
// writing for as long as more waiters are immediately
// available, and flushes only once the queue is
// momentarily empty, so bursts share a single flush.
//
// The loop ends when c.writing is closed, or when a
// write/flush fails (in which case the client is shut
// down through c.do).
func (c *Client) writeLoop() {
	out := fwd.NewWriterSize(c.conn, 4096)

	// ranging over the channel ends when it is closed;
	// that is the "normal" exit point for this goroutine.
	for first := range c.writing {
		if !c.do(out.Write(first.in)) {
			return
		}

		// opportunistically batch whatever else is queued
		for drained := false; !drained; {
			select {
			case next, open := <-c.writing:
				if !open {
					// closed mid-batch: best-effort flush, then exit
					out.Flush()
					return
				}
				if !c.do(out.Write(next.in)) {
					return
				}
			default:
				// nothing more queued right now
				if !c.do(0, out.Flush()) {
					return
				}
				drained = true
			}
		}
	}
}
// writeCommand frames a command (fCMD) whose body is the
// command byte followed by msg, and queues it for writing.
//
// It always increments the parent's wait group, even when
// it returns an error; the caller is responsible for the
// matching Done. It returns ErrClosed if the client is
// closed and ErrTooLarge if the body exceeds maxMessageSize.
func (w *waiter) writeCommand(cmd command, msg []byte) error {
	p := w.parent
	p.wg.Add(1)
	if atomic.LoadUint32(&p.state) == clientClosed {
		return ErrClosed
	}
	seq := atomic.AddUint64(&p.csn, 1)

	// body = 1 command byte + message
	bodyLen := len(msg) + 1
	if bodyLen > maxMessageSize {
		return ErrTooLarge
	}

	// size the buffer for frame lead + body,
	// reusing existing storage when it is big enough
	total := leadSize + bodyLen
	if cap(w.in) < total {
		w.in = make([]byte, total)
	} else {
		w.in = w.in[:total]
	}

	putFrame(w.in, seq, fCMD, bodyLen)
	body := w.in[leadSize:]
	body[0] = byte(cmd)
	copy(body[1:], msg)

	w.writebody(seq)
	return nil
}
// writebody registers w as pending under seq and hands it
// to the parent's write queue. The reap flag is cleared so
// the waiter starts a fresh timeout cycle. The send may
// block while the write queue is full.
func (w *waiter) writebody(seq uint64) {
	w.seq, w.reap = seq, false
	client := w.parent
	// register before queueing, so a response
	// can always find its waiter
	client.pending.insert(w)
	client.writing <- w
}
// write encodes a request (fREQ) for method with body 'in'
// into w.in and queues it for writing. A nil 'in' is sent
// as an empty map.
//
// It always increments the parent's wait group, even when
// it returns an error; the caller is responsible for the
// matching Done. It returns ErrClosed if the client is
// closed, the marshaling error if 'in' fails to encode, and
// ErrTooLarge if the encoded body exceeds maxMessageSize.
func (w *waiter) write(method Method, in msgp.Marshaler) error {
	client := w.parent
	client.wg.Add(1)
	if atomic.LoadUint32(&client.state) == clientClosed {
		return ErrClosed
	}
	seq := atomic.AddUint64(&client.csn, 1)

	// reserve room for the frame lead up front
	if cap(w.in) >= leadSize {
		w.in = w.in[:leadSize]
	} else {
		w.in = make([]byte, leadSize, 256)
	}

	// body: method number followed by the payload
	w.in = msgp.AppendUint32(w.in, uint32(method))
	if in == nil {
		// nil body is encoded as an empty map
		w.in = msgp.AppendMapHeader(w.in, 0)
	} else {
		buf, encErr := in.MarshalMsg(w.in)
		w.in = buf
		if encErr != nil {
			return encErr
		}
	}

	// size of the raw request body, excluding the lead
	bodyLen := len(w.in) - leadSize
	if bodyLen > maxMessageSize {
		return ErrTooLarge
	}

	putFrame(w.in, seq, fREQ, bodyLen)
	w.writebody(seq)
	return nil
}
// read decodes the response held in w.in. The response
// starts with an integer status code. For StatusOK the
// remainder is unmarshaled into out (skipped when out is
// nil). For any other status the remainder is decoded as
// an explanation string and a *ResponseError is returned;
// if the explanation cannot be decoded, "<?>" is used.
func (w *waiter) read(out msgp.Unmarshaler) error {
	code, body, err := msgp.ReadIntBytes(w.in)
	if err != nil {
		return err
	}

	if status := Status(code); status != StatusOK {
		// The explanation follows the status code, so it must be
		// decoded from 'body' (the bytes left after the code), not
		// from w.in, which still begins with the integer code.
		expl, _, strErr := msgp.ReadStringBytes(body)
		if strErr != nil {
			expl = "<?>"
		}
		return &ResponseError{Code: status, Expl: expl}
	}

	if out != nil {
		_, err = out.UnmarshalMsg(body)
	}
	return err
}
// call performs one request/response round trip on w:
// it writes the request, blocks until the waiter is woken,
// and then decodes the response into out. The wait-group
// slot taken by write is released exactly once on every path.
func (w *waiter) call(method Method, in msgp.Marshaler, out msgp.Unmarshaler) error {
	owner := w.parent

	if writeErr := w.write(method, in); writeErr != nil {
		owner.wg.Done()
		return writeErr
	}

	// block until a response, error or timeout wakes us
	sema.Wait(&w.done)
	owner.wg.Done()

	if wakeErr := w.err; wakeErr != nil {
		return wakeErr
	}
	return w.read(out)
}
// Call sends a request for 'method' to the server with 'in'
// as the body and decodes the response into 'out'. A waiter
// is taken from the shared pool for the duration of the call
// and returned afterwards. Call is safe to call from multiple
// goroutines simultaneously.
func (c *Client) Call(method Method, in msgp.Marshaler, out msgp.Unmarshaler) error {
	wt := waiters.pop(c)
	callErr := wt.call(method, in, out)
	waiters.push(wt)
	return callErr
}
// Errors specific to commands.

// errNoCmd: the command response had an empty body.
var errNoCmd = errors.New("synapse: no response CMD code")

// errInvalidCmd: the response command code was out of range.
var errInvalidCmd = errors.New("synapse: invalid command")

// errUnknownCmd: no handler is registered for the response command.
var errUnknownCmd = errors.New("synapse: unknown command")
// sendCommand executes a command from the client: it sends
// cmd with body msg, waits for the reply, validates the
// returned command code and passes the rest of the reply
// to that command's client-side handler.
//
// It returns the write or wake-up error if there is one,
// errNoCmd for an empty reply, errInvalidCmd for an
// out-of-range command code, and errUnknownCmd when no
// handler is registered for the code.
func (c *Client) sendCommand(cmd command, msg []byte) error {
	w := waiters.pop(c)
	// Return the waiter to the pool on every exit path,
	// including the write-error and wake-up-error paths
	// (matching what Call does).
	defer waiters.push(w)

	if err := w.writeCommand(cmd, msg); err != nil {
		// writeCommand took a wait-group slot even on failure
		c.wg.Done()
		return err
	}

	// wait for the reply (or an error/timeout wake-up)
	sema.Wait(&w.done)
	c.wg.Done()
	if w.err != nil {
		return w.err
	}

	// bad response: no command byte at all
	if len(w.in) == 0 {
		return errNoCmd
	}

	ret := command(w.in[0])
	if ret == cmdInvalid || ret >= _maxcommand {
		return errInvalidCmd
	}

	act := cmdDirectory[ret]
	if act == nil {
		return errUnknownCmd
	}

	// hand the reply body (after the command byte) to the handler
	act.Client(c, w.in[1:])
	return nil
}
// ping sends the ping command (cmdPing, with no body) and
// reports an error if the server didn't respond
// appropriately.
func (c *Client) ping() error {
	pingErr := c.sendCommand(cmdPing, nil)
	return pingErr
}