-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_reader.go
105 lines (97 loc) · 2.1 KB
/
async_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package extio
import (
"io"
"sync"
)
type (
// An AsyncReader takes an io.Reader and buffers it in a goroutine
// subsequent Read([]byte) calls are populated from buffers sent over
// an internal channel.
AsyncReader struct {
r io.Reader
c chan segment
abort chan struct{}
bufs sync.Pool
buf []byte
BufferSize int
ChannelSize int
}
segment struct {
b []byte
err error
}
)
// NewAsyncReader creates a new AsyncReader from the supplied io.Reader
// and populates it with defaults
func NewAsyncReader(r io.Reader) *AsyncReader {
return &AsyncReader{
r: r,
abort: make(chan struct{}),
BufferSize: 2 << 20,
ChannelSize: 32,
}
}
// Start initializes the goroutine that buffers data from the io.Reader
func (ar *AsyncReader) Start() {
ar.c = make(chan segment, ar.ChannelSize)
ar.bufs = sync.Pool{New: func() interface{} { return make([]byte, ar.BufferSize) }}
go func() {
defer close(ar.c)
for {
buf := ar.bufs.Get().([]byte)
n, err := io.ReadFull(ar.r, buf)
select {
case <-ar.abort:
return
case ar.c <- segment{b: buf[:n], err: err}:
}
if err != nil {
// includes io.EOF
return
}
}
}()
}
// Read takes a byte slice and copies bytes into it
// and returns number of bytes read and any error encountered.
// Will emit io.EOF at completion.
func (ar *AsyncReader) Read(b []byte) (int, error) {
var (
s segment
open bool
)
LOOP:
for len(ar.buf) < len(b) {
select {
case <-ar.abort:
return 0, nil
case s, open = <-ar.c:
if !open {
break LOOP
}
if s.err != nil && s.err != io.EOF && s.err != io.ErrUnexpectedEOF {
return 0, s.err
}
ar.buf = append(ar.buf, s.b...)
ar.bufs.Put(s.b)
}
}
if len(ar.buf) > len(b) {
n := copy(b, ar.buf[:len(b)])
l := copy(ar.buf[0:], ar.buf[n:])
ar.buf = ar.buf[:l]
return n, nil
}
if len(ar.buf) > 0 {
n := copy(b, ar.buf)
ar.buf = ar.buf[:0]
return n, nil
}
return 0, io.EOF
}
// Close aborts the buffering goroutine and
// emits no more data on subsequent Read([]byte) calls
func (ar *AsyncReader) Close() error {
close(ar.abort)
return nil
}