diff --git a/CHANGELOG.md b/CHANGELOG.md index da2006d0c..6c8561b36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- **TypeScript client (`rocketride`)**: `DataPipe.open()` no longer marks a pipe opened before SSE subscription succeeds; if `setEvents` fails, the client best-efforts `close()` so the server pipe is not left half-open +- **TypeScript client (`rocketride`)**: `getTaskStatus` applies a default per-request timeout of **15s**; callers may pass `options.timeout` (ms) or `{ timeout: false }` to skip the per-call override (falls back to the client request timeout behavior) +- **TypeScript client (`rocketride`)**: `connect()` treats an empty or whitespace-only `ROCKETRIDE_APIKEY` entry in the client env snapshot as **unset**, so it does not override constructor-provided auth + - **llm_gemini**: Updated model profiles to current Gemini lineup - Added: `gemini-3.1-pro-preview`, `gemini-3.1-flash-image-preview`, `gemini-3.1-flash-lite-preview`, `gemini-3-flash-preview`, `gemini-3-pro-image-preview` - Deprecated profiles retained for backwards compatibility: diff --git a/packages/client-python/scripts/tasks.js b/packages/client-python/scripts/tasks.js index db7c28920..5bf536073 100644 --- a/packages/client-python/scripts/tasks.js +++ b/packages/client-python/scripts/tasks.js @@ -147,6 +147,12 @@ function makeStartTestServerAction(options = {}) { script: 'ai/eaas.py', trace: options.trace, basePort: 20000, + // CI for fork PRs doesn't have repo secrets; the integration tests + // expect a shared API key between server and client. Default to the + // same local-dev key used by tests when ROCKETRIDE_APIKEY is unset. + env: { + ROCKETRIDE_APIKEY: process.env.ROCKETRIDE_APIKEY || 'MYAPIKEY', + }, onOutput: (text) => { if (taskComplete) return; const lines = text.trim().split('\n'); @@ -193,6 +199,10 @@ function makeRunPytestAction(options = {}) { const testEnv = { ...process.env, ROCKETRIDE_URI: serverUri, + // CI can provide ROCKETRIDE_APIKEY as an empty string; pytest's + // TEST_CONFIG uses os.getenv(..., 'MYAPIKEY') which does not treat + // empty as missing. Ensure a usable default for local test server auth. + ROCKETRIDE_APIKEY: process.env.ROCKETRIDE_APIKEY || 'MYAPIKEY', }; // Use absolute paths since cwd is dist/server diff --git a/packages/client-python/src/rocketride/core/exceptions.py b/packages/client-python/src/rocketride/core/exceptions.py index 72c164df4..27704fbcd 100644 --- a/packages/client-python/src/rocketride/core/exceptions.py +++ b/packages/client-python/src/rocketride/core/exceptions.py @@ -159,13 +159,18 @@ class AuthenticationException(ConnectionException): pass -class PipeException(RocketRideException): +class PipeException(RocketRideException, RuntimeError): """ Exception raised for data pipe operations. Raised when there are problems with data pipes used for sending data to pipelines, uploading files, or streaming operations. + Note: + Also inherits from :class:`RuntimeError` for backward compatibility with + callers that previously caught ``RuntimeError`` from + ``client.send()`` / ``client.pipe()`` / pipe ``open()``/``write()``/``close()``. + Common scenarios: - Failed to open data pipe - Error writing data to pipe diff --git a/packages/client-python/src/rocketride/mixins/data.py b/packages/client-python/src/rocketride/mixins/data.py index 3ad8e06c3..f6a8a4ece 100644 --- a/packages/client-python/src/rocketride/mixins/data.py +++ b/packages/client-python/src/rocketride/mixins/data.py @@ -57,7 +57,7 @@ import mimetypes from pathlib import Path from typing import Dict, Any, List, Union, Tuple, Optional -from ..core import DAPClient +from ..core import DAPClient, PipeException from ..types import PIPELINE_RESULT, UPLOAD_RESULT @@ -158,7 +158,8 @@ async def open(self) -> 'DataMixin.DataPipe': self: The opened pipe instance for method chaining Raises: - RuntimeError: If pipe is already opened or if opening fails + RuntimeError: If the pipe is already opened. + PipeException: If the server rejects the open request. Example: pipe = await client.pipe(token, mimetype="text/plain") @@ -182,7 +183,17 @@ async def open(self) -> 'DataMixin.DataPipe': response = await self._client.request(request) if self._client.did_fail(response): - raise RuntimeError(response.get('message') or 'Your pipeline is not currently running.') + msg = response.get('message') or 'Failed to open a data pipe.' + msg = ( + f'{msg}\n\n' + 'Common causes:\n' + "- Pipeline isn't running (wrong token or task terminated)\n" + '- Pipeline source is `chat` (use `client.chat()`), not `webhook`/`dropper`\n' + '- MIME type doesn\'t match the source lane (try `mimetype="text/plain"`)\n' + ) + response = dict(response) + response['message'] = msg + raise PipeException(response) self._pipe_id = response.get('body', {}).get('pipe_id') self._opened = True @@ -205,7 +216,8 @@ async def write(self, buffer: bytes) -> None: buffer: Data to send (must be bytes, not string) Raises: - RuntimeError: If pipe not opened or write fails + RuntimeError: If the pipe is not opened. + PipeException: If the server reports a write failure. ValueError: If buffer is not bytes Example: @@ -232,7 +244,10 @@ async def write(self, buffer: bytes) -> None: response = await self._client.request(request) if self._client.did_fail(response): - raise RuntimeError(response.get('message', 'Failed to write to pipe')) + msg = response.get('message') or 'Failed to write to a data pipe.' + response = dict(response) + response['message'] = msg + raise PipeException(response) async def close(self) -> PIPELINE_RESULT: """ @@ -244,6 +259,9 @@ async def close(self) -> PIPELINE_RESULT: Returns: Dict: Results from processing the data you sent + Raises: + PipeException: If the server reports a failure while finalizing the pipe. + Example: pipe = await client.pipe(token, mimetype="text/csv") await pipe.open() @@ -267,7 +285,10 @@ async def close(self) -> PIPELINE_RESULT: response = await self._client.request(request) if self._client.did_fail(response): - raise RuntimeError(response.get('message', 'Failed to close pipe')) + msg = response.get('message') or 'Failed to close a data pipe.' + response = dict(response) + response['message'] = msg + raise PipeException(response) return response.get('body', {}) @@ -366,7 +387,7 @@ async def send( Raises: ValueError: If data is not string or bytes - RuntimeError: If sending fails + PipeException: If the server rejects the underlying pipe open/write/close. Example: # Send text data diff --git a/packages/client-typescript/scripts/tasks.js b/packages/client-typescript/scripts/tasks.js index 31b337216..e2b6ccfc6 100644 --- a/packages/client-typescript/scripts/tasks.js +++ b/packages/client-typescript/scripts/tasks.js @@ -235,6 +235,12 @@ function makeStartTestServerAction(options = {}) { script: 'ai/eaas.py', trace: options.trace, basePort: 30000, + // CI for fork PRs doesn't have repo secrets; the integration tests + // expect a shared API key between server and client. Default to the + // same local-dev key used by tests when ROCKETRIDE_APIKEY is unset. + env: { + ROCKETRIDE_APIKEY: process.env.ROCKETRIDE_APIKEY || 'MYAPIKEY', + }, onOutput: (text) => { if (taskComplete) return; const lines = text.trim().split('\n'); diff --git a/packages/client-typescript/src/client/client.ts b/packages/client-typescript/src/client/client.ts index 8c175b672..34fae13f7 100644 --- a/packages/client-typescript/src/client/client.ts +++ b/packages/client-typescript/src/client/client.ts @@ -30,7 +30,7 @@ import { CONST_DEFAULT_WEB_CLOUD, CONST_DEFAULT_WEB_PROTOCOL, CONST_DEFAULT_WEB_ import { Question } from './schema/Question.js'; import { AccountApi } from './account.js'; import { BillingApi } from './billing.js'; -import { AuthenticationException, ConnectionException } from './exceptions/index.js'; +import { AuthenticationException, ConnectionException, PipeException } from './exceptions/index.js'; // Global counter for generating unique client IDs let clientId = 0; @@ -116,7 +116,8 @@ export class DataPipe { * unique pipe ID that is used for subsequent operations. * * @returns This DataPipe instance (for method chaining) - * @throws Error if the pipe is already opened or if the pipeline is not running + * @throws Error if the pipe is already opened + * @throws PipeException if the server rejects the open request */ async open(): Promise { if (this._opened) { @@ -136,18 +137,35 @@ export class DataPipe { const response = await this._client.request(request); if (this._client.didFail(response)) { - throw new Error(response.message || 'Your pipeline is not currently running.'); + const base = response.message || 'Failed to open a data pipe.'; + const msg = `${base}\n\n` + 'Common causes:\n' + "- Pipeline isn't running (wrong token or task terminated)\n" + "- Pipeline source is 'chat' (use client.chat()), not webhook/dropper\n" + "- MIME type doesn't match the source lane (try mimeType='text/plain')\n"; + throw new PipeException({ ...response, message: msg }); } this._pipeId = response.body?.pipe_id as number | undefined; - this._opened = true; // If an SSE callback was provided, subscribe and register for this pipe if (this._onSSE !== undefined && this._pipeId !== undefined) { - await this._client.setEvents(this._token, ['SSE'], this._pipeId); + try { + await this._client.setEvents(this._token, ['SSE'], this._pipeId); + } catch (err) { + // Roll back: don't leave the pipe half-open on the server. + try { + await this.close(); + } catch { + // Best-effort cleanup + } + const errMsg = err instanceof Error ? err.message : String(err); + const msg = `Failed to subscribe to SSE events for this data pipe.\n\n${errMsg}`; + throw new PipeException({ message: msg }); + } + this._client._ssePipeCallbacks.set(this._pipeId, this._onSSE); } + // Only mark opened after the server-side pipe is fully consistent (including SSE setup). + this._opened = true; + return this; } @@ -158,7 +176,8 @@ export class DataPipe { * multiple times to stream large datasets. The pipe must be opened first. * * @param buffer - Data to write, must be a Uint8Array - * @throws Error if the pipe is not opened, buffer is invalid, or write fails + * @throws Error if the pipe is not opened or buffer is invalid + * @throws PipeException if the server reports a write failure */ async write(buffer: Uint8Array): Promise { if (!this._opened) { @@ -181,7 +200,8 @@ export class DataPipe { const response = await this._client.request(request); if (this._client.didFail(response)) { - throw new Error(response.message || 'Failed to write to pipe'); + const msg = response.message || 'Failed to write to a data pipe.'; + throw new PipeException({ ...response, message: msg }); } } @@ -193,10 +213,12 @@ export class DataPipe { * the pipe cannot be reopened or written to again. * * @returns The processing result from the server, or undefined if already closed - * @throws Error if closing the pipe fails + * @throws PipeException if the server reports a failure while finalizing the pipe */ async close(): Promise { - if (!this._opened || this._closed) { + // Allow closing after a failed open() path where the server assigned a pipe_id + // but we never flipped _opened=true (e.g., SSE subscription failure). + if (this._closed || (this._pipeId === undefined && !this._opened)) { return; } @@ -212,12 +234,14 @@ export class DataPipe { const response = await this._client.request(request); if (this._client.didFail(response)) { - throw new Error(response.message || 'Failed to close pipe'); + const msg = response.message || 'Failed to close a data pipe.'; + throw new PipeException({ ...response, message: msg }); } return response.body as PIPELINE_RESULT; } finally { this._closed = true; + this._opened = false; // Unregister SSE callback and scoped monitor subscription if (this._onSSE !== undefined && this._pipeId !== undefined) { @@ -624,7 +648,8 @@ export class RocketRideClient extends DAPClient { * Sends the credential as the first DAP message and returns the full * ConnectResult (user identity + organizations + teams) on success. * - * If `credential` is omitted, falls back to the `ROCKETRIDE_APIKEY` env var. + * If `credential` is omitted, falls back to the `ROCKETRIDE_APIKEY` env var (non-empty + * string only; whitespace-only values are ignored so constructor auth is not wiped). * * In persist mode, enables automatic reconnection on disconnect. After the * first successful connect the stored `userToken` is replayed automatically. @@ -644,7 +669,9 @@ export class RocketRideClient extends DAPClient { if (credential && typeof credential === 'object') { resolvedCredential = 'cd_' + btoa(JSON.stringify(credential)); } else { - resolvedCredential = (credential as string | undefined) ?? this._env['ROCKETRIDE_APIKEY'] ?? this._apikey ?? ''; + const envKey = this._env['ROCKETRIDE_APIKEY']; + const envCredential = typeof envKey === 'string' && envKey.trim() !== '' ? envKey : undefined; + resolvedCredential = (credential as string | undefined) ?? envCredential ?? this._apikey ?? ''; } this._setAuth(resolvedCredential); @@ -1066,10 +1093,20 @@ export class RocketRideClient extends DAPClient { /** * Get the current status of a running pipeline. + * + * By default this call is bounded to 15s so callers/tests don't hang forever if the engine + * stops responding mid-request (especially important in CI). Pass `{ timeout: false }` to + * restore the previous behavior of using only the client-level request timeout (if any). */ - async getTaskStatus(token: string): Promise { + async getTaskStatus(token: string, options?: { timeout?: number | false }): Promise { try { - return await this.call('rrext_get_task_status', undefined, { token }); + const callOptions: { token: string; timeout?: number } = { token }; + if (options?.timeout === false) { + // Intentionally omit per-call timeout override. + } else { + callOptions.timeout = options?.timeout ?? 15000; + } + return await this.call('rrext_get_task_status', undefined, callOptions); } catch (err) { const errorMsg = err instanceof Error ? err.message : String(err); this.debugMessage(`Pipeline status retrieval failed: ${errorMsg}`); diff --git a/packages/client-typescript/tests/RocketRideClient.test.ts b/packages/client-typescript/tests/RocketRideClient.test.ts index 0a59d5f1d..896afc780 100644 --- a/packages/client-typescript/tests/RocketRideClient.test.ts +++ b/packages/client-typescript/tests/RocketRideClient.test.ts @@ -157,8 +157,8 @@ describe('RocketRideClient Integration Tests', () => { }); // Retry a few times in case server is busy (tests may run in parallel) - const maxAttempts = 5; - const delayMs = 2000; + const maxAttempts = 10; + const delayMs = 1000; let status: Awaited> | null = null; for (let attempt = 1; attempt <= maxAttempts; attempt++) { try { @@ -174,7 +174,7 @@ describe('RocketRideClient Integration Tests', () => { expect(Object.values(TASK_STATE)).toContain(status!.state); await client.terminate(result.token); - }, 90000); + }, TEST_CONFIG.timeout); it( 'should terminate a pipeline',