Skip to content

Commit 5608bd0

Browse files
committed
fix(antigravity): improve streaming retry logic and implement true SSE streaming
- Add isRetryableResponse() to detect SUBSCRIPTION_REQUIRED 403 errors for retry handling - Remove JSDoc comments from isRetryableError() for clarity - Add debug logging for request/response details (streaming flag, status, content-type) - Refactor transformStreamingResponse() to use TransformStream for true streaming - Replace buffering approach with incremental chunk processing - Implement createSseTransformStream() for line-by-line transformation - Reduces memory footprint and Time-To-First-Byte (TTFB) - Update SSE content-type detection to include alt=sse URL parameter - Simplify response transformation logic for non-streaming path - Add more granular debug logging for thought signature extraction 🤖 GENERATED WITH ASSISTANCE OF [OhMyOpenCode](https://github.com/code-yeongyu/oh-my-opencode)
1 parent abd90bb commit 5608bd0

File tree

2 files changed

+96
-77
lines changed

2 files changed

+96
-77
lines changed

src/auth/antigravity/fetch.ts

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,24 @@ function debugLog(message: string): void {
6363
}
6464
}
6565

66-
/**
67-
* Check if an error is a retryable network/server error
68-
*/
6966
function isRetryableError(status: number): boolean {
70-
// 4xx client errors (except 429 rate limit) are not retryable
71-
// 5xx server errors are retryable
72-
// Network errors (status 0) are retryable
73-
if (status === 0) return true // Network error
74-
if (status === 429) return true // Rate limit
75-
if (status >= 500 && status < 600) return true // Server errors
67+
if (status === 0) return true
68+
if (status === 429) return true
69+
if (status >= 500 && status < 600) return true
70+
return false
71+
}
72+
73+
async function isRetryableResponse(response: Response): Promise<boolean> {
74+
if (isRetryableError(response.status)) return true
75+
if (response.status === 403) {
76+
try {
77+
const text = await response.clone().text()
78+
if (text.includes("SUBSCRIPTION_REQUIRED") || text.includes("Gemini Code Assist license")) {
79+
debugLog(`[RETRY] 403 SUBSCRIPTION_REQUIRED detected, will retry with next endpoint`)
80+
return true
81+
}
82+
} catch {}
83+
}
7684
return false
7785
}
7886

@@ -145,14 +153,20 @@ async function attemptFetch(
145153
thoughtSignature,
146154
})
147155

156+
debugLog(`[REQ] streaming=${transformed.streaming}, url=${transformed.url}`)
157+
148158
const response = await fetch(transformed.url, {
149159
method: init.method || "POST",
150160
headers: transformed.headers,
151161
body: JSON.stringify(transformed.body),
152162
signal: init.signal,
153163
})
154164

155-
if (!response.ok && isRetryableError(response.status)) {
165+
debugLog(
166+
`[RESP] status=${response.status} content-type=${response.headers.get("content-type") ?? ""} url=${response.url}`
167+
)
168+
169+
if (!response.ok && (await isRetryableResponse(response))) {
156170
debugLog(`Endpoint failed: ${endpoint} (status: ${response.status}), trying next`)
157171
return null
158172
}
@@ -223,41 +237,36 @@ async function transformResponseWithThinking(
223237
result = await transformResponse(response)
224238
}
225239

240+
if (streaming) {
241+
return result.response
242+
}
243+
226244
try {
227245
const text = await result.response.clone().text()
228246
debugLog(`[TSIG][RESP] Response text length: ${text.length}`)
229247

230-
if (streaming) {
231-
const signature = extractSignatureFromSsePayload(text)
232-
debugLog(`[TSIG][RESP] SSE signature extracted: ${signature ? "yes" : "no"}`)
233-
if (signature) {
234-
setThoughtSignature(fetchInstanceId, signature)
235-
debugLog(`[TSIG][STORE] Stored signature for ${fetchInstanceId}: ${signature.substring(0, 30)}...`)
236-
}
248+
const parsed = JSON.parse(text) as GeminiResponseBody
249+
debugLog(`[TSIG][RESP] Parsed keys: ${Object.keys(parsed).join(", ")}`)
250+
debugLog(`[TSIG][RESP] Has candidates: ${!!parsed.candidates}, count: ${parsed.candidates?.length ?? 0}`)
251+
252+
const signature = extractSignatureFromResponse(parsed)
253+
debugLog(`[TSIG][RESP] Signature extracted: ${signature ? signature.substring(0, 30) + "..." : "NONE"}`)
254+
if (signature) {
255+
setThoughtSignature(fetchInstanceId, signature)
256+
debugLog(`[TSIG][STORE] Stored signature for ${fetchInstanceId}`)
237257
} else {
238-
const parsed = JSON.parse(text) as GeminiResponseBody
239-
debugLog(`[TSIG][RESP] Parsed keys: ${Object.keys(parsed).join(", ")}`)
240-
debugLog(`[TSIG][RESP] Has candidates: ${!!parsed.candidates}, count: ${parsed.candidates?.length ?? 0}`)
241-
242-
const signature = extractSignatureFromResponse(parsed)
243-
debugLog(`[TSIG][RESP] Signature extracted: ${signature ? signature.substring(0, 30) + "..." : "NONE"}`)
244-
if (signature) {
245-
setThoughtSignature(fetchInstanceId, signature)
246-
debugLog(`[TSIG][STORE] Stored signature for ${fetchInstanceId}`)
247-
} else {
248-
debugLog(`[TSIG][WARN] No signature found in response!`)
249-
}
258+
debugLog(`[TSIG][WARN] No signature found in response!`)
259+
}
250260

251-
if (shouldIncludeThinking(modelName)) {
252-
const thinkingResult = extractThinkingBlocks(parsed)
253-
if (thinkingResult.hasThinking) {
254-
const transformed = transformResponseThinking(parsed)
255-
return new Response(JSON.stringify(transformed), {
256-
status: result.response.status,
257-
statusText: result.response.statusText,
258-
headers: result.response.headers,
259-
})
260-
}
261+
if (shouldIncludeThinking(modelName)) {
262+
const thinkingResult = extractThinkingBlocks(parsed)
263+
if (thinkingResult.hasThinking) {
264+
const transformed = transformResponseThinking(parsed)
265+
return new Response(JSON.stringify(transformed), {
266+
status: result.response.status,
267+
statusText: result.response.statusText,
268+
headers: result.response.headers,
269+
})
261270
}
262271
}
263272
} catch {}

src/auth/antigravity/response.ts

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -339,31 +339,39 @@ export function transformStreamingPayload(payload: string): string {
339339
.join("\n")
340340
}
341341

342+
function createSseTransformStream(): TransformStream<Uint8Array, Uint8Array> {
343+
const decoder = new TextDecoder()
344+
const encoder = new TextEncoder()
345+
let buffer = ""
346+
347+
return new TransformStream({
348+
transform(chunk, controller) {
349+
buffer += decoder.decode(chunk, { stream: true })
350+
const lines = buffer.split("\n")
351+
buffer = lines.pop() || ""
352+
353+
for (const line of lines) {
354+
const transformed = transformSseLine(line)
355+
controller.enqueue(encoder.encode(transformed + "\n"))
356+
}
357+
},
358+
flush(controller) {
359+
if (buffer) {
360+
const transformed = transformSseLine(buffer)
361+
controller.enqueue(encoder.encode(transformed))
362+
}
363+
},
364+
})
365+
}
366+
342367
/**
343368
* Transforms a streaming SSE response from Antigravity to OpenAI format.
344369
*
345-
* **⚠️ CURRENT IMPLEMENTATION: BUFFERING**
346-
* This implementation reads the entire stream into memory before transforming.
347-
* While functional, it does not preserve true streaming characteristics:
348-
* - Blocks until entire response is received
349-
* - Consumes memory proportional to response size
350-
* - Increases Time-To-First-Byte (TTFB)
351-
*
352-
* **TODO: Future Enhancement**
353-
* Implement true streaming using ReadableStream transformation:
354-
* - Parse SSE chunks incrementally
355-
* - Transform and yield chunks as they arrive
356-
* - Reduce memory footprint and TTFB
357-
*
358-
* For streaming responses (current buffered approach):
359-
* - Unwraps the `response` field from each SSE event
360-
* - Returns transformed SSE text as new Response
361-
* - Extracts usage metadata from headers
362-
*
363-
* Note: Does NOT handle thinking block extraction (Task 10)
370+
* Uses TransformStream to process SSE chunks incrementally as they arrive.
371+
* Each line is transformed immediately and yielded to the client.
364372
*
365373
* @param response - The SSE response from Antigravity API
366-
* @returns TransformResult with transformed response and metadata
374+
* @returns TransformResult with transformed streaming response
367375
*/
368376
export async function transformStreamingResponse(response: Response): Promise<TransformResult> {
369377
const headers = new Headers(response.headers)
@@ -402,7 +410,8 @@ export async function transformStreamingResponse(response: Response): Promise<Tr
402410

403411
// Check content type
404412
const contentType = response.headers.get("content-type") ?? ""
405-
const isEventStream = contentType.includes("text/event-stream")
413+
const isEventStream =
414+
contentType.includes("text/event-stream") || response.url.includes("alt=sse")
406415

407416
if (!isEventStream) {
408417
// Not SSE, delegate to non-streaming transform
@@ -434,24 +443,25 @@ export async function transformStreamingResponse(response: Response): Promise<Tr
434443
}
435444
}
436445

437-
// Handle SSE stream
438-
// NOTE: Current implementation buffers entire stream - see JSDoc for details
439-
try {
440-
const text = await response.text()
441-
const transformed = transformStreamingPayload(text)
442-
443-
return {
444-
response: new Response(transformed, {
445-
status: response.status,
446-
statusText: response.statusText,
447-
headers,
448-
}),
449-
usage,
450-
}
451-
} catch {
452-
// If reading fails, return original response
446+
if (!response.body) {
453447
return { response, usage }
454448
}
449+
450+
headers.delete("content-length")
451+
headers.delete("content-encoding")
452+
headers.set("content-type", "text/event-stream; charset=utf-8")
453+
454+
const transformStream = createSseTransformStream()
455+
const transformedBody = response.body.pipeThrough(transformStream)
456+
457+
return {
458+
response: new Response(transformedBody, {
459+
status: response.status,
460+
statusText: response.statusText,
461+
headers,
462+
}),
463+
usage,
464+
}
455465
}
456466

457467
/**
@@ -462,7 +472,7 @@ export async function transformStreamingResponse(response: Response): Promise<Tr
462472
*/
463473
export function isStreamingResponse(response: Response): boolean {
464474
const contentType = response.headers.get("content-type") ?? ""
465-
return contentType.includes("text/event-stream")
475+
return contentType.includes("text/event-stream") || response.url.includes("alt=sse")
466476
}
467477

468478
/**

0 commit comments

Comments
 (0)