diff --git a/packages/ssestream/ssestream.go b/packages/ssestream/ssestream.go index 777e2197..a27862ed 100644 --- a/packages/ssestream/ssestream.go +++ b/packages/ssestream/ssestream.go @@ -150,10 +150,31 @@ func (s *Stream[T]) Next() bool { } for s.decoder.Next() { - switch s.decoder.Event().Type { + eventType := s.decoder.Event().Type + eventData := s.decoder.Event().Data + + // Check for [DONE] message which indicates end of stream + if string(eventData) == "[DONE]\n" || string(eventData) == "[DONE]" { + // [DONE] indicates the end of the stream, so we return false + return false + } + + // If event type is empty, try to infer from data or default to processing + if eventType == "" { + // For Anthropic's SSE format, data events without explicit type should still be processed + var nxt T + s.err = json.Unmarshal(eventData, &nxt) + if s.err != nil { + return false + } + s.cur = nxt + return true + } + + switch eventType { case "completion": var nxt T - s.err = json.Unmarshal(s.decoder.Event().Data, &nxt) + s.err = json.Unmarshal(eventData, &nxt) if s.err != nil { return false } @@ -161,7 +182,7 @@ func (s *Stream[T]) Next() bool { return true case "message_start", "message_delta", "message_stop", "content_block_start", "content_block_delta", "content_block_stop": var nxt T - s.err = json.Unmarshal(s.decoder.Event().Data, &nxt) + s.err = json.Unmarshal(eventData, &nxt) if s.err != nil { return false } @@ -170,7 +191,7 @@ func (s *Stream[T]) Next() bool { case "ping": continue case "error": - s.err = fmt.Errorf("received error while streaming: %s", string(s.decoder.Event().Data)) + s.err = fmt.Errorf("received error while streaming: %s", string(eventData)) return false } }