-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscanner_writer.go
130 lines (100 loc) · 2.56 KB
/
scanner_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package extio
import (
"bufio"
"io"
)
// ScannerWriter implements io.WriteCloser. Bytes handed to Write are
// run through a bufio.SplitFunc, and every token the split function
// produces is passed to a callback.
type ScannerWriter struct {
	// splitFunc carves the incoming byte stream into tokens.
	splitFunc bufio.SplitFunc
	// tokenFunc is invoked once for each token produced by splitFunc.
	tokenFunc func(token []byte) error
	// buf holds bytes that did not yet form a complete token; they are
	// prepended to the data of the next Write or consumed by Flush.
	buf []byte
	// maxBufSize caps how many token-less bytes Write will hold in buf.
	maxBufSize int
	// closed is set by a successful Close.
	closed bool
}
// NewScannerWriter returns a ScannerWriter ready for use.
//
// splitFunc is a bufio.SplitFunc used to carve the written byte stream
// into tokens. maxBufSize limits how many bytes may accumulate without
// splitFunc finding a token; when the limit is exceeded Write returns
// io.ErrShortBuffer. tokenFunc receives each token identified by
// splitFunc. Errors returned by splitFunc or tokenFunc are handed back
// to the caller of Write or Flush.
func NewScannerWriter(splitFunc bufio.SplitFunc, maxBufSize int, tokenFunc func([]byte) error) *ScannerWriter {
	sw := new(ScannerWriter)
	sw.maxBufSize = maxBufSize
	sw.splitFunc = splitFunc
	sw.tokenFunc = tokenFunc
	return sw
}
// Write appends data to any bytes left over from earlier writes and
// repeatedly applies splitFunc (with atEOF=false), passing every token
// found to tokenFunc. Bytes that do not yet form a token are copied
// into the internal buffer until the next Write or Flush.
//
// On success it returns len(data) and a nil error. On failure it
// returns 0 and one of:
//   - ErrClosed if the writer has been closed,
//   - the error returned by splitFunc or tokenFunc,
//   - io.ErrShortBuffer if more than maxBufSize bytes would have to be
//     buffered without a token being found,
//   - bufio.ErrNegativeAdvance or bufio.ErrAdvanceTooFar if splitFunc
//     returns an advance count outside [0, len(data)],
//   - io.ErrNoProgress if splitFunc returns a token without advancing,
//     which would otherwise deliver the same token forever.
//
// After a failed Write the previously buffered bytes are discarded.
// Tokens passed to tokenFunc may alias the caller's data slice.
func (sc *ScannerWriter) Write(data []byte) (int, error) {
	if sc.closed {
		return 0, ErrClosed
	}

	dataLen := len(data)

	// Prepend what was left over from the previous call.
	if sc.buf != nil {
		data = append(sc.buf, data...)
		sc.buf = nil
	}

	for len(data) > 0 {
		adv, token, err := sc.splitFunc(data, false)
		if err != nil {
			return 0, err
		}

		// Reject advance counts that would hang the loop or slice out
		// of range, using the same sentinels as bufio.Scanner.
		if adv < 0 {
			return 0, bufio.ErrNegativeAdvance
		}
		if adv > len(data) {
			return 0, bufio.ErrAdvanceTooFar
		}

		switch {
		case token != nil:
			if err := sc.tokenFunc(token); err != nil {
				return 0, err
			}
			if adv == 0 {
				// The next call would see identical input and yield
				// the same token again; bail out instead of spinning.
				return 0, io.ErrNoProgress
			}
		case adv == 0:
			// splitFunc needs more data: stash the remainder. sc.buf
			// is nil here, so append makes a private copy and never
			// retains the caller's slice.
			if len(data) > sc.maxBufSize {
				return 0, io.ErrShortBuffer
			}
			sc.buf = append(sc.buf, data...)
			return dataLen, nil
		}

		data = data[adv:]
	}

	return dataLen, nil
}
// Flush hands the buffered bytes to splitFunc with atEOF=true and
// passes every non-empty token it yields to tokenFunc.
//
// splitFunc is called repeatedly until the buffer is drained or it
// stops making progress (an advance count that is not in
// (0, len(remaining)) ends the loop and discards the remainder), so
// buffered bytes following the first token are not silently lost.
//
// It returns ErrClosed if the writer has been closed, otherwise the
// first error from splitFunc or tokenFunc, or nil. When an error is
// returned, bytes that were not yet consumed stay in the buffer.
func (sc *ScannerWriter) Flush() error {
	if sc.closed {
		return ErrClosed
	}

	rest := sc.buf
	for len(rest) > 0 {
		adv, token, err := sc.splitFunc(rest, true)
		if err != nil {
			sc.buf = rest
			return err
		}

		// Advance before invoking the callback so that, on a callback
		// error, the failed token is not offered again.
		if adv > 0 && adv < len(rest) {
			rest = rest[adv:]
		} else {
			// Everything consumed, or no progress possible at EOF.
			rest = nil
		}

		// Empty tokens are not delivered on flush.
		if len(token) > 0 {
			if err := sc.tokenFunc(token); err != nil {
				sc.buf = rest
				return err
			}
		}
	}

	sc.buf = nil
	return nil
}
// Close flushes any buffered data and then marks the ScannerWriter as
// closed. If the flush fails its error is returned and the writer is
// left open. Calling Close on an already closed writer returns
// ErrClosed, as do Write and Flush after a successful Close.
func (sc *ScannerWriter) Close() error {
	if sc.closed {
		return ErrClosed
	}

	err := sc.Flush()
	if err == nil {
		sc.closed = true
	}
	return err
}