Skip to content

Commit fee3221

Browse files
committed
fix streaming
1 parent 3540632 commit fee3221

File tree

2 files changed

+103
-46
lines changed

2 files changed

+103
-46
lines changed

src/endpoints/events/eventsWs/eventsWebSocket.ts

Lines changed: 82 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
/* eslint-disable max-lines-per-function */
22
import { Request } from 'express'
3-
import WebSocket, { WebSocketServer } from 'ws'
3+
import WebSocket from 'ws'
44
import { WebsocketRequestHandler } from 'express-ws'
55
import * as k8s from '@kubernetes/client-node'
66
import { createUserKubeClient } from 'src/constants/kubeClients'
7+
import { eventSortKey } from './utils'
78

89
type TWatchPhase = 'ADDED' | 'MODIFIED' | 'DELETED' | 'BOOKMARK'
910

@@ -44,40 +45,42 @@ const getJoinedParam = (url: URL, key: string): string | undefined => {
4445
}
4546

4647
export const eventsWebSocket: WebsocketRequestHandler = async (ws: WebSocket, req: Request) => {
48+
console.log(`[${new Date().toISOString()}]: Incoming WebSocket connection for events`)
49+
4750
const headers: Record<string, string | string[] | undefined> = { ...(req.headers || {}) }
4851
delete headers['host']
4952

5053
const reqUrl = new URL(req.url || '', `http://${req.headers.host}`)
5154
const namespace = reqUrl.searchParams.get('namespace') || undefined
5255
const initialLimit = parseLimit(reqUrl.searchParams.get('limit'))
5356
const initialContinue = reqUrl.searchParams.get('_continue') || undefined
57+
console.log(`[${new Date().toISOString()}]: Query params parsed:`, { namespace, initialLimit, initialContinue })
5458

55-
// Supports multiple occurrences; they’ll be joined by commas.
56-
const fieldSelector = getJoinedParam(reqUrl, 'fieldSelector') ?? getJoinedParam(reqUrl, 'field') // optional alias if you decide to support it
57-
const labelSelector = getJoinedParam(reqUrl, 'labelSelector') ?? getJoinedParam(reqUrl, 'labels') // optional alias if you decide to support it
58-
59+
const fieldSelector = getJoinedParam(reqUrl, 'fieldSelector') ?? getJoinedParam(reqUrl, 'field')
60+
const labelSelector = getJoinedParam(reqUrl, 'labelSelector') ?? getJoinedParam(reqUrl, 'labels')
5961
const sinceRV = reqUrl.searchParams.get('sinceRV') || undefined
6062

63+
console.log(`[${new Date().toISOString()}]: Selectors:`, { fieldSelector, labelSelector, sinceRV })
64+
6165
const userKube = createUserKubeClient(headers)
66+
console.log(`[${new Date().toISOString()}]: Created Kubernetes client for user`)
67+
6268
const watch = new k8s.Watch(userKube.kubeConfig)
6369
const evApi = userKube.kubeConfig.makeApiClient(k8s.EventsV1Api)
6470

6571
let closed = false
6672
// Seed lastRV from client if provided (so we can resume)
6773
let lastRV: string | undefined = sinceRV
6874
let sentInitial = false
69-
7075
let abortCurrentWatch: (() => void) | null = null
7176
let startingWatch = false
7277

7378
const watchPath = namespace
7479
? `/apis/events.k8s.io/v1/namespaces/${namespace}/events`
7580
: `/apis/events.k8s.io/v1/events`
7681

77-
/**
78-
* One page of list. If `captureRV` is true we update lastRV from the list's metadata.
79-
* We return the page along with pagination metadata so the caller can forward it to the UI.
80-
*/
82+
console.log(`[${new Date().toISOString()}]: Using watchPath:`, watchPath)
83+
8184
const listPage = async ({
8285
limit,
8386
_continue,
@@ -87,14 +90,15 @@ export const eventsWebSocket: WebsocketRequestHandler = async (ws: WebSocket, re
8790
_continue?: string
8891
captureRV: boolean
8992
}) => {
93+
console.log(`[${new Date().toISOString()}]: Listing page of events`, { limit, _continue, captureRV, lastRV })
94+
9095
const baseOpts: k8s.EventsV1ApiListEventForAllNamespacesRequest = {
9196
fieldSelector,
9297
labelSelector,
9398
limit: typeof limit === 'number' ? limit : undefined,
94-
_continue, // may be undefined
99+
_continue,
95100
}
96101

97-
// Only add RV knobs when NOT paginating with a continue token
98102
if (!_continue && lastRV) {
99103
baseOpts.resourceVersion = lastRV
100104
baseOpts.resourceVersionMatch = 'NotOlderThan'
@@ -104,64 +108,81 @@ export const eventsWebSocket: WebsocketRequestHandler = async (ws: WebSocket, re
104108
? await evApi.listNamespacedEvent({ namespace, ...baseOpts })
105109
: await evApi.listEventForAllNamespaces(baseOpts)
106110

107-
// Record RV only for the *initial* snapshot page we are anchoring the watch to.
108-
if (captureRV) lastRV = resp.metadata?.resourceVersion
111+
const items = (resp.items ?? []) as k8s.EventsV1Event[]
112+
items.sort((a, b) => eventSortKey(b) - eventSortKey(a)) // newest first
109113

114+
console.log(`[${new Date().toISOString()}]: List page received`, {
115+
// itemCount: resp.items?.length,
116+
itemCount: items.length,
117+
continue: resp.metadata?._continue,
118+
resourceVersion: resp.metadata?.resourceVersion,
119+
})
120+
121+
if (captureRV) lastRV = resp.metadata?.resourceVersion
110122
return {
111-
items: resp.items ?? [],
123+
// items: resp.items ?? [],
124+
items,
112125
continue: resp.metadata?._continue,
113126
remainingItemCount: resp.metadata?.remainingItemCount,
114127
resourceVersion: resp.metadata?.resourceVersion,
115128
}
116129
}
117130

118131
const onEvent = (phase: string, obj: unknown) => {
132+
console.log(`[${new Date().toISOString()}]: Watch event fired:`, phase)
119133
if (closed) return
120134
const p = phase as TWatchPhase
121135

122-
// (Optional but recommended) advance RV on BOOKMARK
123136
if (p === 'BOOKMARK' && obj && typeof obj === 'object') {
124137
const md = (obj as any).metadata
125138
if (md?.resourceVersion) lastRV = md.resourceVersion
139+
console.log(`[${new Date().toISOString()}]: Bookmark event, updated RV:`, lastRV)
126140
return
127141
}
128142

129143
if ((p === 'ADDED' || p === 'MODIFIED' || p === 'DELETED') && isEventsV1Event(obj)) {
130144
const rv = obj.metadata?.resourceVersion
131145
if (rv) lastRV = rv
146+
console.log(`[${new Date().toISOString()}]: Event:`, p, 'name:', obj.metadata?.name, 'RV:', rv)
132147
try {
133148
if (ws.readyState === WebSocket.OPEN) {
134149
ws.send(JSON.stringify({ type: p, item: obj }))
135150
}
136-
} catch {
137-
// ignore send errors (socket might be racing to close)
151+
} catch (err) {
152+
console.warn(`[${new Date().toISOString()}]: Failed to send event:`, err)
138153
}
139154
}
140155
}
141156

142157
const onError = async (err: unknown) => {
158+
console.error(`[${new Date().toISOString()}]: Watch error:`, err)
143159
if (closed) return
144160
if (isGone410(err)) {
161+
console.warn(`[${new Date().toISOString()}]: 410 Gone detected, resetting list page`)
145162
try {
146163
await listPage({ limit: initialLimit, _continue: undefined, captureRV: true })
147-
} catch {
148-
/* noop */
164+
} catch (e) {
165+
console.error(`[${new Date().toISOString()}]: Failed to reset listPage after 410:`, e)
149166
}
150167
}
151168
// Restart the watch after a short delay; ensure we stop the current one first
152169
setTimeout(() => void startWatch(), 1200)
153170
}
154171

155172
const startWatch = async (): Promise<void> => {
156-
if (closed || startingWatch) return
173+
console.log(`[${new Date().toISOString()}]: Starting watch...`)
174+
if (closed || startingWatch) {
175+
console.log(`[${new Date().toISOString()}]: Skipping watch start, closed or already starting`)
176+
return
177+
}
157178
startingWatch = true
158179
try {
159-
// stop any existing watch before starting a new one
160180
if (abortCurrentWatch) {
181+
console.log(`[${new Date().toISOString()}]: Aborting existing watch before starting new one`)
161182
try {
162183
abortCurrentWatch()
163184
} catch {
164-
/* skip */
185+
console.warn(`[${new Date().toISOString()}]: Failed to abort existing watch`)
165186
}
166187
abortCurrentWatch = null
167188
}
@@ -173,47 +194,52 @@ export const eventsWebSocket: WebsocketRequestHandler = async (ws: WebSocket, re
173194
}
174195
if (lastRV) {
175196
watchOpts.resourceVersion = lastRV
176-
watchOpts.resourceVersionMatch = 'NotOlderThan'
197+
// watchOpts.resourceVersionMatch = 'NotOlderThan'
177198
}
178199

200+
console.log(`[${new Date().toISOString()}]: Watch options:`, watchOpts)
179201
const reqObj = await watch.watch(watchPath, watchOpts, onEvent, onError)
202+
console.log(`[${new Date().toISOString()}]: Watch established`)
180203
abortCurrentWatch = () => {
204+
console.log(`[${new Date().toISOString()}]: Aborting watch...`)
181205
try {
182206
;(reqObj as any)?.abort?.()
183207
} catch {
184-
/* skip */
208+
console.warn(`[${new Date().toISOString()}]: Abort failed`)
185209
}
186210
try {
187211
;(reqObj as any)?.destroy?.()
188212
} catch {
189-
/* skip */
213+
console.warn(`[${new Date().toISOString()}]: Destroy failed`)
190214
}
191215
}
192216
} catch (err) {
217+
console.error(`[${new Date().toISOString()}]: Error starting watch:`, err)
193218
if (!closed && isGone410(err)) {
219+
console.warn(`[${new Date().toISOString()}]: Re-listing after 410 on watch start`)
194220
try {
195221
await listPage({ limit: initialLimit, _continue: undefined, captureRV: true })
196-
} catch {
197-
/* noop */
222+
} catch (e) {
223+
console.error(`[${new Date().toISOString()}]: Failed re-list after 410:`, e)
198224
}
199225
}
200-
// try again a bit later
201226
setTimeout(() => void startWatch(), 2000)
202227
} finally {
203228
startingWatch = false
204229
}
205230
}
206231

207-
// Kick off: do a *single* snapshot (paged if requested) and send INITIAL once
208232
try {
233+
console.log(`[${new Date().toISOString()}]: Performing initial list...`)
209234
const page = await listPage({
210235
limit: initialLimit,
211236
_continue: initialContinue,
212-
captureRV: true, // anchor the watch to this snapshot
237+
captureRV: true,
213238
})
214239

215240
if (!sentInitial && ws.readyState === WebSocket.OPEN) {
216241
sentInitial = true
242+
console.log(`[${new Date().toISOString()}]: Sending INITIAL snapshot to client`)
217243
try {
218244
ws.send(
219245
JSON.stringify({
@@ -224,39 +250,42 @@ export const eventsWebSocket: WebsocketRequestHandler = async (ws: WebSocket, re
224250
resourceVersion: page.resourceVersion,
225251
}),
226252
)
227-
} catch {
228-
/* ignore */
253+
} catch (err) {
254+
console.error(`[${new Date().toISOString()}]: Failed to send INITIAL page:`, err)
229255
}
230256
}
231-
} catch {
232-
// snapshot failed; proceed to watch "from now"
233-
// (do NOT zero out lastRV; if sinceRV was provided we want to NotOlderThan from it)
234-
sentInitial = true // prevent accidental INITIAL later
257+
} catch (err) {
258+
console.error(`[${new Date().toISOString()}]: Initial list failed:`, err)
259+
sentInitial = true
235260
}
236261

237262
void startWatch()
238263
const rotateIv = setInterval(
239264
() => {
265+
console.log(`[${new Date().toISOString()}]: Periodic watch rotation triggered`)
240266
void startWatch()
241267
},
242268
10 * 60 * 1000,
243269
)
244270

245-
// Infinite scroll: UI requests more pages after INITIAL using the `_continue` token.
246271
ws.on('message', async data => {
272+
console.log(`[${new Date().toISOString()}]: Received WS message:`, data.toString())
247273
if (closed) return
248274
let msg: any
249275
try {
250276
msg = JSON.parse(String(data))
251277
} catch {
278+
console.warn(`[${new Date().toISOString()}]: Invalid JSON from client`)
252279
return
253280
}
254281
if (msg?.type === 'SCROLL') {
282+
console.log(`[${new Date().toISOString()}]: Client requested SCROLL:`, msg)
255283
const limit = typeof msg.limit === 'number' && msg.limit > 0 ? Math.trunc(msg.limit) : undefined
256284
const token = typeof msg.continue === 'string' ? msg.continue : undefined
257285
if (!token) return
258286
try {
259-
const page = await listPage({ limit, _continue: token, captureRV: false }) // do NOT touch lastRV
287+
const page = await listPage({ limit, _continue: token, captureRV: false })
288+
console.log(`[${new Date().toISOString()}]: Sending PAGE to client:`, { count: page.items.length })
260289
if (ws.readyState === WebSocket.OPEN) {
261290
ws.send(
262291
JSON.stringify({
@@ -268,53 +297,60 @@ export const eventsWebSocket: WebsocketRequestHandler = async (ws: WebSocket, re
268297
)
269298
}
270299
} catch (e) {
300+
console.error(`[${new Date().toISOString()}]: Page fetch failed:`, e)
271301
if (ws.readyState === WebSocket.OPEN) {
272302
ws.send(JSON.stringify({ type: 'PAGE_ERROR', error: 'Failed to load next page' }))
273303
}
274304
}
275305
}
276306
})
277307

278-
// Keep-alive pings with liveness tracking
279308
let isAlive = true
280309
;(ws as any).on?.('pong', () => {
310+
console.log(`[${new Date().toISOString()}]: Pong received from client`)
281311
isAlive = true
282312
})
283313

284314
const pingIv = setInterval(() => {
285315
try {
286316
if ((ws as any).readyState !== WebSocket.OPEN) return
287317
if (!isAlive) {
288-
// no pong since last ping — terminate to free resources
318+
console.warn(`[${new Date().toISOString()}]: No pong received — terminating socket`)
289319
;(ws as any).terminate?.()
290320
return
291321
}
292322
isAlive = false
323+
console.log(`[${new Date().toISOString()}]: Sending ping to client`)
293324
;(ws as any).ping?.()
294325
} catch (e) {
295-
console.debug('events ping error (ignored):', e)
326+
console.debug(`[${new Date().toISOString()}]: Ping error (ignored):`, e)
296327
}
297328
}, 25_000)
298329

299330
const cleanup = () => {
331+
console.log(`[${new Date().toISOString()}]: Cleaning up WebSocket and watchers`)
300332
closed = true
301333
clearInterval(pingIv)
302334
clearInterval(rotateIv)
303335
try {
304336
abortCurrentWatch?.()
305337
} catch {
306-
/* skip */
338+
console.warn(`[${new Date().toISOString()}]: Abort during cleanup failed`)
307339
}
308340
abortCurrentWatch = null
309341
}
310342

311-
;(ws as any).on?.('close', cleanup)
312-
;(ws as any).on?.('error', () => {
343+
;(ws as any).on?.('close', () => {
344+
console.log(`[${new Date().toISOString()}]: WebSocket closed`)
345+
cleanup()
346+
})
347+
;(ws as any).on?.('error', err => {
348+
console.error(`[${new Date().toISOString()}]: WebSocket error:`, err)
313349
cleanup()
314350
try {
315351
;(ws as any).close?.()
316352
} catch (e) {
317-
console.debug('events ws close error (ignored):', e)
353+
console.debug(`[${new Date().toISOString()}]: Error closing WS after error (ignored):`, e)
318354
}
319355
})
320356
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import * as k8s from '@kubernetes/client-node'
2+
3+
// Add these helpers near the top (e.g., under isGone410)
4+
const toMillis = (t?: unknown): number => {
5+
if (!t) return 0
6+
if (t instanceof Date) return t.getTime()
7+
if (typeof t === 'string') {
8+
const ms = Date.parse(t)
9+
return Number.isFinite(ms) ? ms : 0
10+
}
11+
return 0
12+
}
13+
14+
/**
 * Computes a numeric sort key (epoch milliseconds) for an events.k8s.io/v1 Event.
 *
 * The key is the most recent of `eventTime`, `series.lastObservedTime` and
 * `metadata.creationTimestamp`. Timestamps that are missing or unparseable
 * count as 0, so an event with no usable timestamp gets a key of 0.
 *
 * @param ev - The event to derive a key for.
 * @returns The latest known timestamp of the event in milliseconds, or 0.
 */
export const eventSortKey = (ev: k8s.EventsV1Event): number => {
  // toMillis accepts `unknown`, so the fields can be passed through without casts.
  return Math.max(
    toMillis(ev.eventTime),
    toMillis(ev.series?.lastObservedTime),
    toMillis(ev.metadata?.creationTimestamp),
  )
}

0 commit comments

Comments
 (0)