Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/client-python/scripts/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ function makeStartTestServerAction(options = {}) {
script: 'ai/eaas.py',
trace: options.trace,
basePort: 20000,
// CI for fork PRs doesn't have repo secrets; the integration tests
// expect a shared API key between server and client. Default to the
// same local-dev key used by tests when ROCKETRIDE_APIKEY is unset.
env: {
ROCKETRIDE_APIKEY: process.env.ROCKETRIDE_APIKEY || 'MYAPIKEY',
},
onOutput: (text) => {
if (taskComplete) return;
const lines = text.trim().split('\n');
Expand Down Expand Up @@ -193,6 +199,10 @@ function makeRunPytestAction(options = {}) {
const testEnv = {
...process.env,
ROCKETRIDE_URI: serverUri,
// CI can provide ROCKETRIDE_APIKEY as an empty string; pytest's
// TEST_CONFIG uses os.getenv(..., 'MYAPIKEY') which does not treat
// empty as missing. Ensure a usable default for local test server auth.
ROCKETRIDE_APIKEY: process.env.ROCKETRIDE_APIKEY || 'MYAPIKEY',
};

// Use absolute paths since cwd is dist/server
Expand Down
7 changes: 6 additions & 1 deletion packages/client-python/src/rocketride/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,18 @@ class AuthenticationException(ConnectionException):
pass


class PipeException(RocketRideException):
class PipeException(RocketRideException, RuntimeError):
"""
Exception raised for data pipe operations.

Raised when there are problems with data pipes used for sending
data to pipelines, uploading files, or streaming operations.

Note:
Also inherits from :class:`RuntimeError` for backward compatibility with
callers that previously caught ``RuntimeError`` from
``client.send()`` / ``client.pipe()`` / pipe ``open()``/``write()``/``close()``.

Common scenarios:
- Failed to open data pipe
- Error writing data to pipe
Expand Down
35 changes: 28 additions & 7 deletions packages/client-python/src/rocketride/mixins/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import mimetypes
from pathlib import Path
from typing import Dict, Any, List, Union, Tuple, Optional
from ..core import DAPClient
from ..core import DAPClient, PipeException
from ..types import PIPELINE_RESULT, UPLOAD_RESULT


Expand Down Expand Up @@ -150,7 +150,8 @@ async def open(self) -> 'DataMixin.DataPipe':
self: The opened pipe instance for method chaining

Raises:
RuntimeError: If pipe is already opened or if opening fails
RuntimeError: If the pipe is already opened.
PipeException: If the server rejects the open request.

Example:
pipe = await client.pipe(token, mimetype="text/plain")
Expand All @@ -174,7 +175,17 @@ async def open(self) -> 'DataMixin.DataPipe':
response = await self._client.request(request)

if self._client.did_fail(response):
raise RuntimeError(response.get('message') or 'Your pipeline is not currently running.')
msg = response.get('message') or 'Failed to open a data pipe.'
msg = (
f"{msg}\n\n"
"Common causes:\n"
"- Pipeline isn't running (wrong token or task terminated)\n"
"- Pipeline source is `chat` (use `client.chat()`), not `webhook`/`dropper`\n"
"- MIME type doesn't match the source lane (try `mimetype=\"text/plain\"`)\n"
)
response = dict(response)
response['message'] = msg
raise PipeException(response)
Comment thread
stepmikhaylov marked this conversation as resolved.
Comment thread
stepmikhaylov marked this conversation as resolved.

self._pipe_id = response.get('body', {}).get('pipe_id')
self._opened = True
Expand All @@ -197,7 +208,8 @@ async def write(self, buffer: bytes) -> None:
buffer: Data to send (must be bytes, not string)

Raises:
RuntimeError: If pipe not opened or write fails
RuntimeError: If the pipe is not opened.
PipeException: If the server reports a write failure.
ValueError: If buffer is not bytes

Example:
Expand All @@ -224,7 +236,10 @@ async def write(self, buffer: bytes) -> None:
response = await self._client.request(request)

if self._client.did_fail(response):
raise RuntimeError(response.get('message', 'Failed to write to pipe'))
msg = response.get('message') or 'Failed to write to a data pipe.'
response = dict(response)
response['message'] = msg
raise PipeException(response)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

async def close(self) -> PIPELINE_RESULT:
"""
Expand All @@ -236,6 +251,9 @@ async def close(self) -> PIPELINE_RESULT:
Returns:
Dict: Results from processing the data you sent

Raises:
PipeException: If the server reports a failure while finalizing the pipe.

Example:
pipe = await client.pipe(token, mimetype="text/csv")
await pipe.open()
Expand All @@ -259,7 +277,10 @@ async def close(self) -> PIPELINE_RESULT:
response = await self._client.request(request)

if self._client.did_fail(response):
raise RuntimeError(response.get('message', 'Failed to close pipe'))
msg = response.get('message') or 'Failed to close a data pipe.'
response = dict(response)
response['message'] = msg
raise PipeException(response)

return response.get('body', {})

Expand Down Expand Up @@ -356,7 +377,7 @@ async def send(

Raises:
ValueError: If data is not string or bytes
RuntimeError: If sending fails
PipeException: If the server rejects the underlying pipe open/write/close.

Example:
# Send text data
Expand Down
6 changes: 6 additions & 0 deletions packages/client-typescript/scripts/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ function makeStartTestServerAction(options = {}) {
script: 'ai/eaas.py',
trace: options.trace,
basePort: 30000,
// CI for fork PRs doesn't have repo secrets; the integration tests
// expect a shared API key between server and client. Default to the
// same local-dev key used by tests when ROCKETRIDE_APIKEY is unset.
env: {
ROCKETRIDE_APIKEY: process.env.ROCKETRIDE_APIKEY || 'MYAPIKEY',
},
onOutput: (text) => {
if (taskComplete) return;
const lines = text.trim().split('\n');
Expand Down
42 changes: 32 additions & 10 deletions packages/client-typescript/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { CONST_DEFAULT_WEB_CLOUD, CONST_DEFAULT_WEB_PROTOCOL, CONST_DEFAULT_WEB_
import { Question } from './schema/Question.js';
import { AccountApi } from './account.js';
import { BillingApi } from './billing.js';
import { AuthenticationException, ConnectionException } from './exceptions/index.js';
import { AuthenticationException, ConnectionException, PipeException } from './exceptions/index.js';

// Global counter for generating unique client IDs
let clientId = 0;
Expand Down Expand Up @@ -116,7 +116,8 @@ export class DataPipe {
* unique pipe ID that is used for subsequent operations.
*
* @returns This DataPipe instance (for method chaining)
* @throws Error if the pipe is already opened or if the pipeline is not running
* @throws Error if the pipe is already opened
* @throws PipeException if the server rejects the open request
*/
async open(): Promise<DataPipe> {
if (this._opened) {
Expand All @@ -136,15 +137,29 @@ export class DataPipe {
const response = await this._client.request(request);

if (this._client.didFail(response)) {
throw new Error(response.message || 'Your pipeline is not currently running.');
const base = response.message || 'Failed to open a data pipe.';
const msg =
`${base}\n\n` +
'Common causes:\n' +
"- Pipeline isn't running (wrong token or task terminated)\n" +
"- Pipeline source is 'chat' (use client.chat()), not webhook/dropper\n" +
"- MIME type doesn't match the source lane (try mimeType='text/plain')\n";
throw new PipeException({ ...response, message: msg });
}

this._pipeId = response.body?.pipe_id as number | undefined;
this._opened = true;

// If an SSE callback was provided, subscribe and register for this pipe
if (this._onSSE !== undefined && this._pipeId !== undefined) {
await this._client.setEvents(this._token, ['SSE'], this._pipeId);
try {
await this._client.setEvents(this._token, ['SSE'], this._pipeId);
Comment thread
anantteotia marked this conversation as resolved.
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
const msg = `Failed to subscribe to SSE events for this data pipe.\n\n${errMsg}`;
throw new PipeException({ message: msg });
}

this._client._ssePipeCallbacks.set(this._pipeId, this._onSSE);
}

Expand All @@ -158,7 +173,8 @@ export class DataPipe {
* multiple times to stream large datasets. The pipe must be opened first.
*
* @param buffer - Data to write, must be a Uint8Array
* @throws Error if the pipe is not opened, buffer is invalid, or write fails
* @throws Error if the pipe is not opened or buffer is invalid
* @throws PipeException if the server reports a write failure
*/
async write(buffer: Uint8Array): Promise<void> {
if (!this._opened) {
Expand All @@ -181,7 +197,8 @@ export class DataPipe {
const response = await this._client.request(request);

if (this._client.didFail(response)) {
throw new Error(response.message || 'Failed to write to pipe');
const msg = response.message || 'Failed to write to a data pipe.';
throw new PipeException({ ...response, message: msg });
}
}

Expand All @@ -193,7 +210,7 @@ export class DataPipe {
* the pipe cannot be reopened or written to again.
*
* @returns The processing result from the server, or undefined if already closed
* @throws Error if closing the pipe fails
* @throws PipeException if the server reports a failure while finalizing the pipe
*/
async close(): Promise<PIPELINE_RESULT | undefined> {
if (!this._opened || this._closed) {
Expand All @@ -212,7 +229,8 @@ export class DataPipe {
const response = await this._client.request(request);

if (this._client.didFail(response)) {
throw new Error(response.message || 'Failed to close pipe');
const msg = response.message || 'Failed to close a data pipe.';
throw new PipeException({ ...response, message: msg });
}

return response.body as PIPELINE_RESULT;
Expand Down Expand Up @@ -593,7 +611,9 @@ export class RocketRideClient extends DAPClient {
if (credential && typeof credential === 'object') {
resolvedCredential = 'cd_' + btoa(JSON.stringify(credential));
} else {
resolvedCredential = (credential as string | undefined) ?? this._env['ROCKETRIDE_APIKEY'] ?? this._apikey ?? '';
const envKey = this._env['ROCKETRIDE_APIKEY'];
const envCredential = typeof envKey === 'string' && envKey.trim() !== '' ? envKey : undefined;
resolvedCredential = (credential as string | undefined) ?? envCredential ?? this._apikey ?? '';
Comment thread
anantteotia marked this conversation as resolved.
}
this._setAuth(resolvedCredential);

Expand Down Expand Up @@ -1020,7 +1040,9 @@ export class RocketRideClient extends DAPClient {
*/
async getTaskStatus(token: string): Promise<TASK_STATUS> {
try {
return await this.call<TASK_STATUS>('rrext_get_task_status', undefined, { token });
// Task status should return quickly; bound the wait so callers/tests can't hang forever
// if the engine stops responding mid-request (especially important in CI).
return await this.call<TASK_STATUS>('rrext_get_task_status', undefined, { token, timeout: 15000 });
Comment thread
anantteotia marked this conversation as resolved.
Outdated
} catch (err) {
const errorMsg = err instanceof Error ? err.message : String(err);
this.debugMessage(`Pipeline status retrieval failed: ${errorMsg}`);
Expand Down
6 changes: 3 additions & 3 deletions packages/client-typescript/tests/RocketRideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ describe('RocketRideClient Integration Tests', () => {
});

// Retry a few times in case server is busy (tests may run in parallel)
const maxAttempts = 5;
const delayMs = 2000;
const maxAttempts = 10;
const delayMs = 1000;
let status: Awaited<ReturnType<typeof client.getTaskStatus>> | null = null;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
Expand All @@ -174,7 +174,7 @@ describe('RocketRideClient Integration Tests', () => {
expect(Object.values(TASK_STATE)).toContain(status!.state);

await client.terminate(result.token);
}, 90000);
}, TEST_CONFIG.timeout);

it(
'should terminate a pipeline',
Expand Down
Loading