Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- **TypeScript client (`rocketride`)**: `DataPipe.open()` no longer marks a pipe opened before SSE subscription succeeds; if `setEvents` fails, the client best-efforts `close()` so the server pipe is not left half-open
- **TypeScript client (`rocketride`)**: `getTaskStatus` applies a default per-request timeout of **15s**; callers may pass `options.timeout` (ms) or `{ timeout: false }` to skip the per-call override (falls back to the client request timeout behavior)
- **TypeScript client (`rocketride`)**: `connect()` treats an empty or whitespace-only `ROCKETRIDE_APIKEY` entry in the client env snapshot as **unset**, so it does not override constructor-provided auth

- **llm_gemini**: Updated model profiles to current Gemini lineup
- Added: `gemini-3.1-pro-preview`, `gemini-3.1-flash-image-preview`, `gemini-3.1-flash-lite-preview`, `gemini-3-flash-preview`, `gemini-3-pro-image-preview`
- Deprecated profiles retained for backwards compatibility:
Expand Down
10 changes: 10 additions & 0 deletions packages/client-python/scripts/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ function makeStartTestServerAction(options = {}) {
script: 'ai/eaas.py',
trace: options.trace,
basePort: 20000,
// CI for fork PRs doesn't have repo secrets; the integration tests
// expect a shared API key between server and client. Default to the
// same local-dev key used by tests when ROCKETRIDE_APIKEY is unset.
env: {
ROCKETRIDE_APIKEY: process.env.ROCKETRIDE_APIKEY || 'MYAPIKEY',
},
onOutput: (text) => {
if (taskComplete) return;
const lines = text.trim().split('\n');
Expand Down Expand Up @@ -193,6 +199,10 @@ function makeRunPytestAction(options = {}) {
const testEnv = {
...process.env,
ROCKETRIDE_URI: serverUri,
// CI can provide ROCKETRIDE_APIKEY as an empty string; pytest's
// TEST_CONFIG uses os.getenv(..., 'MYAPIKEY') which does not treat
// empty as missing. Ensure a usable default for local test server auth.
ROCKETRIDE_APIKEY: process.env.ROCKETRIDE_APIKEY || 'MYAPIKEY',
};

// Use absolute paths since cwd is dist/server
Expand Down
7 changes: 6 additions & 1 deletion packages/client-python/src/rocketride/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,18 @@ class AuthenticationException(ConnectionException):
pass


class PipeException(RocketRideException):
class PipeException(RocketRideException, RuntimeError):
"""
Exception raised for data pipe operations.

Raised when there are problems with data pipes used for sending
data to pipelines, uploading files, or streaming operations.

Note:
Also inherits from :class:`RuntimeError` for backward compatibility with
callers that previously caught ``RuntimeError`` from
``client.send()`` / ``client.pipe()`` / pipe ``open()``/``write()``/``close()``.

Common scenarios:
- Failed to open data pipe
- Error writing data to pipe
Expand Down
35 changes: 28 additions & 7 deletions packages/client-python/src/rocketride/mixins/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import mimetypes
from pathlib import Path
from typing import Dict, Any, List, Union, Tuple, Optional
from ..core import DAPClient
from ..core import DAPClient, PipeException
from ..types import PIPELINE_RESULT, UPLOAD_RESULT


Expand Down Expand Up @@ -158,7 +158,8 @@ async def open(self) -> 'DataMixin.DataPipe':
self: The opened pipe instance for method chaining

Raises:
RuntimeError: If pipe is already opened or if opening fails
RuntimeError: If the pipe is already opened.
PipeException: If the server rejects the open request.

Example:
pipe = await client.pipe(token, mimetype="text/plain")
Expand All @@ -182,7 +183,17 @@ async def open(self) -> 'DataMixin.DataPipe':
response = await self._client.request(request)

if self._client.did_fail(response):
raise RuntimeError(response.get('message') or 'Your pipeline is not currently running.')
msg = response.get('message') or 'Failed to open a data pipe.'
msg = (
f'{msg}\n\n'
'Common causes:\n'
"- Pipeline isn't running (wrong token or task terminated)\n"
'- Pipeline source is `chat` (use `client.chat()`), not `webhook`/`dropper`\n'
'- MIME type doesn\'t match the source lane (try `mimetype="text/plain"`)\n'
)
response = dict(response)
response['message'] = msg
raise PipeException(response)

self._pipe_id = response.get('body', {}).get('pipe_id')
self._opened = True
Expand All @@ -205,7 +216,8 @@ async def write(self, buffer: bytes) -> None:
buffer: Data to send (must be bytes, not string)

Raises:
RuntimeError: If pipe not opened or write fails
RuntimeError: If the pipe is not opened.
PipeException: If the server reports a write failure.
ValueError: If buffer is not bytes

Example:
Expand All @@ -232,7 +244,10 @@ async def write(self, buffer: bytes) -> None:
response = await self._client.request(request)

if self._client.did_fail(response):
raise RuntimeError(response.get('message', 'Failed to write to pipe'))
msg = response.get('message') or 'Failed to write to a data pipe.'
response = dict(response)
response['message'] = msg
raise PipeException(response)

async def close(self) -> PIPELINE_RESULT:
"""
Expand All @@ -244,6 +259,9 @@ async def close(self) -> PIPELINE_RESULT:
Returns:
Dict: Results from processing the data you sent

Raises:
PipeException: If the server reports a failure while finalizing the pipe.

Example:
pipe = await client.pipe(token, mimetype="text/csv")
await pipe.open()
Expand All @@ -267,7 +285,10 @@ async def close(self) -> PIPELINE_RESULT:
response = await self._client.request(request)

if self._client.did_fail(response):
raise RuntimeError(response.get('message', 'Failed to close pipe'))
msg = response.get('message') or 'Failed to close a data pipe.'
response = dict(response)
response['message'] = msg
raise PipeException(response)

return response.get('body', {})

Expand Down Expand Up @@ -366,7 +387,7 @@ async def send(

Raises:
ValueError: If data is not string or bytes
RuntimeError: If sending fails
PipeException: If the server rejects the underlying pipe open/write/close.

Example:
# Send text data
Expand Down
6 changes: 6 additions & 0 deletions packages/client-typescript/scripts/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ function makeStartTestServerAction(options = {}) {
script: 'ai/eaas.py',
trace: options.trace,
basePort: 30000,
// CI for fork PRs doesn't have repo secrets; the integration tests
// expect a shared API key between server and client. Default to the
// same local-dev key used by tests when ROCKETRIDE_APIKEY is unset.
env: {
ROCKETRIDE_APIKEY: process.env.ROCKETRIDE_APIKEY || 'MYAPIKEY',
},
onOutput: (text) => {
if (taskComplete) return;
const lines = text.trim().split('\n');
Expand Down
65 changes: 51 additions & 14 deletions packages/client-typescript/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { CONST_DEFAULT_WEB_CLOUD, CONST_DEFAULT_WEB_PROTOCOL, CONST_DEFAULT_WEB_
import { Question } from './schema/Question.js';
import { AccountApi } from './account.js';
import { BillingApi } from './billing.js';
import { AuthenticationException, ConnectionException } from './exceptions/index.js';
import { AuthenticationException, ConnectionException, PipeException } from './exceptions/index.js';

// Global counter for generating unique client IDs
let clientId = 0;
Expand Down Expand Up @@ -116,7 +116,8 @@ export class DataPipe {
* unique pipe ID that is used for subsequent operations.
*
* @returns This DataPipe instance (for method chaining)
* @throws Error if the pipe is already opened or if the pipeline is not running
* @throws Error if the pipe is already opened
* @throws PipeException if the server rejects the open request
*/
async open(): Promise<DataPipe> {
if (this._opened) {
Expand All @@ -136,18 +137,35 @@ export class DataPipe {
const response = await this._client.request(request);

if (this._client.didFail(response)) {
throw new Error(response.message || 'Your pipeline is not currently running.');
const base = response.message || 'Failed to open a data pipe.';
const msg = `${base}\n\n` + 'Common causes:\n' + "- Pipeline isn't running (wrong token or task terminated)\n" + "- Pipeline source is 'chat' (use client.chat()), not webhook/dropper\n" + "- MIME type doesn't match the source lane (try mimeType='text/plain')\n";
throw new PipeException({ ...response, message: msg });
}

this._pipeId = response.body?.pipe_id as number | undefined;
this._opened = true;

// If an SSE callback was provided, subscribe and register for this pipe
if (this._onSSE !== undefined && this._pipeId !== undefined) {
await this._client.setEvents(this._token, ['SSE'], this._pipeId);
try {
await this._client.setEvents(this._token, ['SSE'], this._pipeId);
Comment thread
anantteotia marked this conversation as resolved.
} catch (err) {
// Roll back: don't leave the pipe half-open on the server.
try {
await this.close();
} catch {
// Best-effort cleanup
}
const errMsg = err instanceof Error ? err.message : String(err);
const msg = `Failed to subscribe to SSE events for this data pipe.\n\n${errMsg}`;
throw new PipeException({ message: msg });
}

this._client._ssePipeCallbacks.set(this._pipeId, this._onSSE);
}

// Only mark opened after the server-side pipe is fully consistent (including SSE setup).
this._opened = true;

return this;
}

Expand All @@ -158,7 +176,8 @@ export class DataPipe {
* multiple times to stream large datasets. The pipe must be opened first.
*
* @param buffer - Data to write, must be a Uint8Array
* @throws Error if the pipe is not opened, buffer is invalid, or write fails
* @throws Error if the pipe is not opened or buffer is invalid
* @throws PipeException if the server reports a write failure
*/
async write(buffer: Uint8Array): Promise<void> {
if (!this._opened) {
Expand All @@ -181,7 +200,8 @@ export class DataPipe {
const response = await this._client.request(request);

if (this._client.didFail(response)) {
throw new Error(response.message || 'Failed to write to pipe');
const msg = response.message || 'Failed to write to a data pipe.';
throw new PipeException({ ...response, message: msg });
}
}

Expand All @@ -193,10 +213,12 @@ export class DataPipe {
* the pipe cannot be reopened or written to again.
*
* @returns The processing result from the server, or undefined if already closed
* @throws Error if closing the pipe fails
* @throws PipeException if the server reports a failure while finalizing the pipe
*/
async close(): Promise<PIPELINE_RESULT | undefined> {
if (!this._opened || this._closed) {
// Allow closing after a failed open() path where the server assigned a pipe_id
// but we never flipped _opened=true (e.g., SSE subscription failure).
if (this._closed || (this._pipeId === undefined && !this._opened)) {
return;
}

Expand All @@ -212,12 +234,14 @@ export class DataPipe {
const response = await this._client.request(request);

if (this._client.didFail(response)) {
throw new Error(response.message || 'Failed to close pipe');
const msg = response.message || 'Failed to close a data pipe.';
throw new PipeException({ ...response, message: msg });
}

return response.body as PIPELINE_RESULT;
} finally {
this._closed = true;
this._opened = false;

// Unregister SSE callback and scoped monitor subscription
if (this._onSSE !== undefined && this._pipeId !== undefined) {
Expand Down Expand Up @@ -624,7 +648,8 @@ export class RocketRideClient extends DAPClient {
* Sends the credential as the first DAP message and returns the full
* ConnectResult (user identity + organizations + teams) on success.
*
* If `credential` is omitted, falls back to the `ROCKETRIDE_APIKEY` env var.
* If `credential` is omitted, falls back to the `ROCKETRIDE_APIKEY` env var (non-empty
* string only; whitespace-only values are ignored so constructor auth is not wiped).
*
* In persist mode, enables automatic reconnection on disconnect. After the
* first successful connect the stored `userToken` is replayed automatically.
Expand All @@ -644,7 +669,9 @@ export class RocketRideClient extends DAPClient {
if (credential && typeof credential === 'object') {
resolvedCredential = 'cd_' + btoa(JSON.stringify(credential));
} else {
resolvedCredential = (credential as string | undefined) ?? this._env['ROCKETRIDE_APIKEY'] ?? this._apikey ?? '';
const envKey = this._env['ROCKETRIDE_APIKEY'];
const envCredential = typeof envKey === 'string' && envKey.trim() !== '' ? envKey : undefined;
resolvedCredential = (credential as string | undefined) ?? envCredential ?? this._apikey ?? '';
Comment thread
anantteotia marked this conversation as resolved.
}
this._setAuth(resolvedCredential);

Expand Down Expand Up @@ -1066,10 +1093,20 @@ export class RocketRideClient extends DAPClient {

/**
* Get the current status of a running pipeline.
*
* By default this call is bounded to 15s so callers/tests don't hang forever if the engine
* stops responding mid-request (especially important in CI). Pass `{ timeout: false }` to
* restore the previous behavior of using only the client-level request timeout (if any).
*/
async getTaskStatus(token: string): Promise<TASK_STATUS> {
async getTaskStatus(token: string, options?: { timeout?: number | false }): Promise<TASK_STATUS> {
try {
return await this.call<TASK_STATUS>('rrext_get_task_status', undefined, { token });
const callOptions: { token: string; timeout?: number } = { token };
if (options?.timeout === false) {
// Intentionally omit per-call timeout override.
} else {
callOptions.timeout = options?.timeout ?? 15000;
}
return await this.call<TASK_STATUS>('rrext_get_task_status', undefined, callOptions);
} catch (err) {
const errorMsg = err instanceof Error ? err.message : String(err);
this.debugMessage(`Pipeline status retrieval failed: ${errorMsg}`);
Expand Down
6 changes: 3 additions & 3 deletions packages/client-typescript/tests/RocketRideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ describe('RocketRideClient Integration Tests', () => {
});

// Retry a few times in case server is busy (tests may run in parallel)
const maxAttempts = 5;
const delayMs = 2000;
const maxAttempts = 10;
const delayMs = 1000;
let status: Awaited<ReturnType<typeof client.getTaskStatus>> | null = null;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
Expand All @@ -174,7 +174,7 @@ describe('RocketRideClient Integration Tests', () => {
expect(Object.values(TASK_STATE)).toContain(status!.state);

await client.terminate(result.token);
}, 90000);
}, TEST_CONFIG.timeout);

it(
'should terminate a pipeline',
Expand Down
Loading