Skip to content

Commit 045c06c

Browse files
moved SSE back to a folder + added updated files to .fernignore
1 parent 8bf7866 commit 045c06c

File tree

9 files changed

+232
-225
lines changed

9 files changed

+232
-225
lines changed

.fernignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ src/cohere/aws_client.py
1616
src/cohere/sagemaker_client.py
1717
src/cohere/client_v2.py
1818
mypy.ini
19-
src/cohere/aliases.py
19+
src/cohere/aliases.py
20+
src/cohere/v2/raw_client.py # remove when SSE updates are released
21+
src/cohere/core/http_sse/* # remove when SSE updates are released

src/cohere/core/http_sse.py

Lines changed: 0 additions & 223 deletions
This file was deleted.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from ._api import EventSource, aconnect_sse, connect_sse
2+
from ._exceptions import SSEError
3+
from ._models import ServerSentEvent
4+
5+
__version__ = "0.4.1"
6+
7+
__all__ = [
8+
"__version__",
9+
"EventSource",
10+
"connect_sse",
11+
"aconnect_sse",
12+
"ServerSentEvent",
13+
"SSEError",
14+
]

src/cohere/core/http_sse/_api.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
from collections.abc import AsyncGenerator
2+
from contextlib import asynccontextmanager, contextmanager
3+
from typing import Any, AsyncIterator, Iterator, cast
4+
5+
import httpx
6+
7+
from ._decoders import SSEDecoder
8+
from ._exceptions import SSEError
9+
from ._models import ServerSentEvent
10+
11+
12+
class EventSource:
13+
def __init__(self, response: httpx.Response) -> None:
14+
self._response = response
15+
16+
def _check_content_type(self) -> None:
17+
content_type = self._response.headers.get("content-type", "").partition(";")[0]
18+
if "text/event-stream" not in content_type:
19+
raise SSEError(
20+
"Expected response header Content-Type to contain 'text/event-stream', "
21+
f"got {content_type!r}"
22+
)
23+
24+
@property
25+
def response(self) -> httpx.Response:
26+
return self._response
27+
28+
def iter_sse(self) -> Iterator[ServerSentEvent]:
29+
self._check_content_type()
30+
decoder = SSEDecoder()
31+
32+
buffer = ""
33+
for chunk in self._response.iter_bytes():
34+
# Decode chunk and add to buffer
35+
text_chunk = chunk.decode('utf-8', errors='replace')
36+
buffer += text_chunk
37+
38+
# Process complete lines
39+
while '\n' in buffer:
40+
line, buffer = buffer.split('\n', 1)
41+
line = line.rstrip('\r')
42+
sse = decoder.decode(line)
43+
if sse is not None:
44+
yield sse
45+
46+
# Process any remaining data in buffer
47+
if buffer.strip():
48+
line = buffer.rstrip('\r')
49+
sse = decoder.decode(line)
50+
if sse is not None:
51+
yield sse
52+
53+
async def aiter_sse(self) -> AsyncGenerator[ServerSentEvent, None]:
54+
self._check_content_type()
55+
decoder = SSEDecoder()
56+
lines = cast(AsyncGenerator[str, None], self._response.aiter_lines())
57+
try:
58+
async for line in lines:
59+
line = line.rstrip("\n")
60+
sse = decoder.decode(line)
61+
if sse is not None:
62+
yield sse
63+
finally:
64+
await lines.aclose()
65+
66+
67+
@contextmanager
68+
def connect_sse(
69+
client: httpx.Client, method: str, url: str, **kwargs: Any
70+
) -> Iterator[EventSource]:
71+
headers = kwargs.pop("headers", {})
72+
headers["Accept"] = "text/event-stream"
73+
headers["Cache-Control"] = "no-store"
74+
75+
with client.stream(method, url, headers=headers, **kwargs) as response:
76+
yield EventSource(response)
77+
78+
79+
@asynccontextmanager
80+
async def aconnect_sse(
81+
client: httpx.AsyncClient,
82+
method: str,
83+
url: str,
84+
**kwargs: Any,
85+
) -> AsyncIterator[EventSource]:
86+
headers = kwargs.pop("headers", {})
87+
headers["Accept"] = "text/event-stream"
88+
headers["Cache-Control"] = "no-store"
89+
90+
async with client.stream(method, url, headers=headers, **kwargs) as response:
91+
yield EventSource(response)

0 commit comments

Comments
 (0)