diff --git a/event.go b/event.go index 1258038..ba48eba 100644 --- a/event.go +++ b/event.go @@ -37,15 +37,19 @@ func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStream initBufferSize := minPosInt(4096, maxBufferSize) scanner.Buffer(make([]byte, initBufferSize), maxBufferSize) + // this ensures we don't keep checking data we've already scanned within one Split + newDataIndex := 0 split := func(data []byte, atEOF bool) (int, []byte, error) { if atEOF && len(data) == 0 { return 0, nil, nil } // We have a full event payload to parse. - if i, nlen := containsDoubleNewline(data); i >= 0 { + if i, nlen := containsDoubleNewline(data, newDataIndex); i >= 0 { + newDataIndex = 0 // reset for next token return i + nlen, data[0:i], nil } + newDataIndex = len(data) // we've already scanned the entire data up to this point // If we're at EOF, we have all of the data. if atEOF { return len(data), data, nil @@ -64,16 +68,38 @@ func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStream // Returns a tuple containing the index of a double newline, and the number of bytes // represented by that sequence. If no double newline is present, the first value // will be negative. -func containsDoubleNewline(data []byte) (int, int) { +func containsDoubleNewline(data []byte, newDataIndex int) (int, int) { + // Look back of the length of the longest search string to start from the end of the data that + // we looked at already minus the smallest length that could contain the search string from + // backtracking. this prevents n^2 lookups if data repeatedly does not contain the search + // string and slowly grows, especially to a large size. + lookBackStart := max(0, newDataIndex-3) // len(cr lf cr lf) - 1 + lookBack := data[lookBackStart:] + // Search for each potentially valid sequence of newline characters - crcr := bytes.Index(data, []byte("\r\r")) - lflf := bytes.Index(data, []byte("\n\n")) - crlflf := bytes.Index(data, []byte("\r\n\n")) - lfcrlf := bytes.Index(data, []byte("\n\r\n")) - crlfcrlf := bytes.Index(data, []byte("\r\n\r\n")) + crcr := bytes.Index(lookBack, []byte("\r\r")) + if crcr >= 0 { + crcr += lookBackStart + } + lflf := bytes.Index(lookBack, []byte("\n\n")) + if lflf >= 0 { + lflf += lookBackStart + } + crlflf := bytes.Index(lookBack, []byte("\r\n\n")) + if crlflf >= 0 { + crlflf += lookBackStart + } + lfcrlf := bytes.Index(lookBack, []byte("\n\r\n")) + if lfcrlf >= 0 { + lfcrlf += lookBackStart + } + crlfcrlf := bytes.Index(lookBack, []byte("\r\n\r\n")) + if crlfcrlf >= 0 { + crlfcrlf += lookBackStart + } // Find the earliest position of a double newline combination minPos := minPosInt(crcr, minPosInt(lflf, minPosInt(crlflf, minPosInt(lfcrlf, crlfcrlf)))) - // Detemine the length of the sequence + // Determine the length of the sequence nlen := 2 if minPos == crlfcrlf { nlen = 4 @@ -98,6 +124,15 @@ func minPosInt(a, b int) int { return a } +// max returns the max integer between the two inputs +// TODO remove when min supported go version is 1.21, as max is now a built in function +func max(a, b int) int { + if a > b { + return a + } + return b +} + // ReadEvent scans the EventStream for events. func (e *EventStreamReader) ReadEvent() ([]byte, error) { if e.scanner.Scan() {