Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/client-python/src/rocketride/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class AuthenticationException(ConnectionException):
pass


class PipeException(RocketRideException):
class PipeException(RocketRideException, RuntimeError):
"""
Exception raised for data pipe operations.

Expand Down
35 changes: 28 additions & 7 deletions packages/client-python/src/rocketride/mixins/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import mimetypes
from pathlib import Path
from typing import Dict, Any, List, Union, Tuple, Optional
from ..core import DAPClient
from ..core import DAPClient, PipeException
from ..types import PIPELINE_RESULT, UPLOAD_RESULT


Expand Down Expand Up @@ -150,7 +150,8 @@ async def open(self) -> 'DataMixin.DataPipe':
self: The opened pipe instance for method chaining

Raises:
RuntimeError: If pipe is already opened or if opening fails
RuntimeError: If the pipe is already opened.
PipeException: If the server rejects the open request.

Example:
pipe = await client.pipe(token, mimetype="text/plain")
Expand All @@ -174,7 +175,17 @@ async def open(self) -> 'DataMixin.DataPipe':
response = await self._client.request(request)

if self._client.did_fail(response):
raise RuntimeError(response.get('message', 'Your pipeline is not currently running.'))
msg = response.get('message') or 'Failed to open a data pipe.'
msg = (
f"{msg}\n\n"
"Common causes:\n"
"- Pipeline isn't running (wrong token or task terminated)\n"
"- Pipeline source is `chat` (use `client.chat()`), not `webhook`/`dropper`\n"
"- MIME type doesn't match the source lane (try `mimetype=\"text/plain\"`)\n"
)
response = dict(response)
response['message'] = msg
raise PipeException(response)
Comment thread
stepmikhaylov marked this conversation as resolved.
Comment thread
stepmikhaylov marked this conversation as resolved.

self._pipe_id = response.get('body', {}).get('pipe_id')
self._opened = True
Expand All @@ -197,7 +208,8 @@ async def write(self, buffer: bytes) -> None:
buffer: Data to send (must be bytes, not string)

Raises:
RuntimeError: If pipe not opened or write fails
RuntimeError: If the pipe is not opened.
PipeException: If the server reports a write failure.
ValueError: If buffer is not bytes

Example:
Expand All @@ -224,7 +236,10 @@ async def write(self, buffer: bytes) -> None:
response = await self._client.request(request)

if self._client.did_fail(response):
raise RuntimeError(response.get('message', 'Failed to write to pipe'))
msg = response.get('message') or 'Failed to write to a data pipe.'
response = dict(response)
response['message'] = msg
raise PipeException(response)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

async def close(self) -> PIPELINE_RESULT:
"""
Expand All @@ -236,6 +251,9 @@ async def close(self) -> PIPELINE_RESULT:
Returns:
Dict: Results from processing the data you sent

Raises:
PipeException: If the server reports a failure while finalizing the pipe.

Example:
pipe = await client.pipe(token, mimetype="text/csv")
await pipe.open()
Expand All @@ -259,7 +277,10 @@ async def close(self) -> PIPELINE_RESULT:
response = await self._client.request(request)

if self._client.did_fail(response):
raise RuntimeError(response.get('message', 'Failed to close pipe'))
msg = response.get('message') or 'Failed to close a data pipe.'
response = dict(response)
response['message'] = msg
raise PipeException(response)

return response.get('body', {})

Expand Down Expand Up @@ -356,7 +377,7 @@ async def send(

Raises:
ValueError: If data is not string or bytes
RuntimeError: If sending fails
PipeException: If the server rejects the underlying pipe open/write/close.

Example:
# Send text data
Expand Down
Loading