Skip to content

Commit d1a1bd6

Browse files
authored
Merge pull request #156 from PRO-Robotech/feature/dev
safe websocket
2 parents 175487e + 8a34772 commit d1a1bd6

File tree

3 files changed

+169
-61
lines changed

3 files changed

+169
-61
lines changed

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/components/organisms/Events/Events.tsx

Lines changed: 148 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import React, { FC, useCallback, useEffect, useReducer, useRef, useState } from
1313
import { theme as antdtheme, Flex, Tooltip } from 'antd'
1414
import { ResumeCircleIcon, PauseCircleIcon, LockedIcon, UnlockedIcon } from '@prorobotech/openapi-k8s-toolkit'
1515
import { TScrollMsg, TServerFrame } from './types'
16-
import { eventKey } from './utils'
16+
import { eventKey, compareRV, getRV, getMaxRV } from './utils'
1717
import { reducer } from './reducer'
1818
import { EventRow } from './molecules'
1919
import { Styled } from './styled'
@@ -44,6 +44,9 @@ export const Events: FC<TEventsProps> = ({ wsUrl, pageSize = 50, height }) => {
4444
removeIgnoredRef.current = isRemoveIgnored
4545
}, [isRemoveIgnored])
4646

47+
// track latest resourceVersion we have processed
48+
const latestRVRef = useRef<string | undefined>(undefined)
49+
4750
// Reducer-backed store of events
4851
const [state, dispatch] = useReducer(reducer, { order: [], byKey: {} })
4952

@@ -64,6 +67,14 @@ export const Events: FC<TEventsProps> = ({ wsUrl, pageSize = 50, height }) => {
6467
const backoffRef = useRef(750) // ms; increases on failures up to a cap
6568
const urlRef = useRef(wsUrl) // latest wsUrl (stable inside callbacks)
6669

70+
// Guards for unmount & reconnect timer
71+
const mountedRef = useRef(true)
72+
const reconnectTimerRef = useRef<number | null>(null)
73+
const onMessageRef = useRef<(ev: MessageEvent) => void>(() => {})
74+
const startedRef = useRef(false)
75+
const connectingRef = useRef(false)
76+
const haveAnchorRef = useRef(false)
77+
6778
// Keep urlRef in sync so connect() uses the latest wsUrl
6879
useEffect(() => {
6980
urlRef.current = wsUrl
@@ -90,88 +101,148 @@ export const Events: FC<TEventsProps> = ({ wsUrl, pageSize = 50, height }) => {
90101
wsRef.current.send(JSON.stringify(msg))
91102
}, [contToken, pageSize])
92103

93-
// Handle all incoming frames from the server
94-
const onMessage = useCallback((ev: MessageEvent) => {
95-
let frame: TServerFrame | undefined
96-
try {
97-
frame = JSON.parse(String(ev.data))
98-
} catch {
99-
// Ignore malformed frames; you could surface these in UI if desired
100-
return
101-
}
102-
if (!frame) return
103-
104-
if (frame.type === 'INITIAL') {
105-
// Replace current list with newest set; set pagination token
106-
dispatch({ type: 'RESET', items: frame.items })
107-
setContToken(frame.continue)
108-
setHasMore(Boolean(frame.continue))
109-
setLastError(undefined)
110-
return
111-
}
104+
const maybeAutoScroll = useCallback(() => {
105+
if (wantMoreRef.current && hasMore) sendScroll()
106+
}, [hasMore, sendScroll])
112107

113-
if (frame.type === 'PAGE') {
114-
// Append older items to the end; clear fetching guard
115-
dispatch({ type: 'APPEND_PAGE', items: frame.items })
116-
setContToken(frame.continue)
117-
setHasMore(Boolean(frame.continue))
118-
fetchingRef.current = false
119-
return
120-
}
108+
// Handle all incoming frames from the server
109+
useEffect(() => {
110+
onMessageRef.current = (ev: MessageEvent) => {
111+
let frame: TServerFrame | undefined
112+
try {
113+
frame = JSON.parse(String(ev.data)) as TServerFrame
114+
} catch {
115+
return
116+
}
117+
if (!frame) return
118+
119+
if (frame.type === 'INITIAL') {
120+
dispatch({ type: 'RESET', items: frame.items })
121+
setContToken(frame.continue)
122+
setHasMore(Boolean(frame.continue))
123+
setLastError(undefined)
124+
fetchingRef.current = false
125+
126+
const snapshotRV = frame.resourceVersion || getMaxRV(frame.items)
127+
if (snapshotRV) {
128+
latestRVRef.current = snapshotRV
129+
haveAnchorRef.current = true // NEW: we now have a safe anchor
130+
}
131+
return
132+
}
121133

122-
if (frame.type === 'PAGE_ERROR') {
123-
// Keep live stream but surface pagination error
124-
setLastError(frame.error || 'Failed to load next page')
125-
fetchingRef.current = false
126-
return
127-
}
134+
if (frame.type === 'PAGE') {
135+
dispatch({ type: 'APPEND_PAGE', items: frame.items })
136+
setContToken(frame.continue)
137+
setHasMore(Boolean(frame.continue))
138+
fetchingRef.current = false
139+
140+
const batchRV = getMaxRV(frame.items)
141+
if (batchRV && (!latestRVRef.current || compareRV(batchRV, latestRVRef.current) > 0)) {
142+
latestRVRef.current = batchRV
143+
}
144+
maybeAutoScroll()
145+
return
146+
}
128147

129-
if (!pausedRef.current) {
130-
if (frame.type === 'ADDED' || frame.type === 'MODIFIED') {
131-
// Live update: insert or replace
132-
dispatch({ type: 'UPSERT', item: frame.item })
148+
if (frame.type === 'PAGE_ERROR') {
149+
setLastError(frame.error || 'Failed to load next page')
150+
fetchingRef.current = false
133151
return
134152
}
135153

136-
if (!removeIgnoredRef.current && frame.type === 'DELETED') {
137-
// Live delete
138-
dispatch({ type: 'REMOVE', key: eventKey(frame.item) })
154+
if (frame.type === 'ADDED' || frame.type === 'MODIFIED' || frame.type === 'DELETED') {
155+
const rv = getRV(frame.item)
156+
if (rv && (!latestRVRef.current || compareRV(rv, latestRVRef.current) > 0)) {
157+
latestRVRef.current = rv
158+
}
159+
}
160+
161+
if (!pausedRef.current) {
162+
if (frame.type === 'ADDED' || frame.type === 'MODIFIED') {
163+
dispatch({ type: 'UPSERT', item: frame.item })
164+
return
165+
}
166+
167+
if (!removeIgnoredRef.current && frame.type === 'DELETED') {
168+
dispatch({ type: 'REMOVE', key: eventKey(frame.item) })
169+
}
139170
}
140171
}
172+
}, [maybeAutoScroll])
173+
174+
const buildWsUrl = useCallback((raw: string) => {
175+
try {
176+
const hasScheme = /^[a-z]+:/i.test(raw)
177+
const base = window.location.origin
178+
let u = hasScheme ? new URL(raw) : new URL(raw.startsWith('/') ? raw : `/${raw}`, base)
179+
if (u.protocol === 'http:') u.protocol = 'ws:'
180+
if (u.protocol === 'https:') u.protocol = 'wss:'
181+
if (u.protocol !== 'ws:' && u.protocol !== 'wss:') {
182+
u = new URL(u.pathname + u.search + u.hash, base)
183+
u.protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
184+
}
185+
if (haveAnchorRef.current && latestRVRef.current) {
186+
u.searchParams.set('sinceRV', latestRVRef.current)
187+
} else {
188+
u.searchParams.delete('sinceRV')
189+
}
190+
return u.toString()
191+
} catch {
192+
const origin = window.location.origin.replace(/^http/, 'ws')
193+
const prefix = raw.startsWith('/') ? '' : '/'
194+
const rv = haveAnchorRef.current ? latestRVRef.current : undefined
195+
const sep = raw.includes('?') ? '&' : '?'
196+
return `${origin}${prefix}${raw}${rv ? `${sep}sinceRV=${encodeURIComponent(rv)}` : ''}`
197+
}
141198
}, [])
142199

143200
// Establish and maintain the WebSocket connection with bounded backoff
144201
const connect = useCallback(() => {
202+
if (!mountedRef.current) return
203+
// Prevent duplicate opens
204+
if (connectingRef.current) return
205+
if (
206+
wsRef.current &&
207+
(wsRef.current.readyState === WebSocket.OPEN || wsRef.current.readyState === WebSocket.CONNECTING)
208+
) {
209+
return
210+
}
211+
connectingRef.current = true
212+
145213
setConnStatus('connecting')
146214
setLastError(undefined)
147215

148-
// Accept absolute ws(s) URLs; otherwise resolve relative to current origin
149-
const buildWsUrl = (raw: string) => {
150-
if (/^wss?:/i.test(raw)) return raw // already absolute ws(s)
151-
const origin = window.location.origin.replace(/^http/i, 'ws')
152-
if (raw.startsWith('/')) return `${origin}${raw}`
153-
return `${origin}/${raw}`
154-
}
155-
156-
const ws = new WebSocket(buildWsUrl(urlRef.current))
216+
const url = buildWsUrl(urlRef.current)
217+
const ws = new WebSocket(url)
157218
wsRef.current = ws
158219

159220
ws.addEventListener('open', () => {
221+
if (!mountedRef.current) return
222+
backoffRef.current = 750
223+
fetchingRef.current = false
160224
setConnStatus('open')
161-
backoffRef.current = 750 // reset backoff on success
225+
connectingRef.current = false
162226
})
163227

164-
ws.addEventListener('message', onMessage)
228+
ws.addEventListener('message', ev => onMessageRef.current(ev))
165229

166230
const scheduleReconnect = () => {
167-
// Only clear if we're still looking at this instance
168231
if (wsRef.current === ws) wsRef.current = null
169232
setConnStatus('closed')
170-
const wait = Math.min(backoffRef.current, 8000)
171-
const next = Math.min(wait * 2, 12000)
233+
connectingRef.current = false
234+
// Bounded exponential backoff with jitter to avoid herding
235+
const base = Math.min(backoffRef.current, 8000)
236+
const jitter = Math.random() * 0.4 + 0.8 // 0.8x–1.2x
237+
const wait = Math.floor(base * jitter)
238+
const next = Math.min(base * 2, 12000)
172239
backoffRef.current = next
173-
// Reconnect after a short delay; preserves component mount semantics
174-
setTimeout(() => {
240+
if (reconnectTimerRef.current) {
241+
window.clearTimeout(reconnectTimerRef.current)
242+
reconnectTimerRef.current = null
243+
}
244+
reconnectTimerRef.current = window.setTimeout(() => {
245+
if (!mountedRef.current) return
175246
connect()
176247
}, wait)
177248
}
@@ -181,13 +252,30 @@ export const Events: FC<TEventsProps> = ({ wsUrl, pageSize = 50, height }) => {
181252
setLastError('WebSocket error')
182253
scheduleReconnect()
183254
})
184-
}, [onMessage])
255+
}, [buildWsUrl])
185256

186257
// Kick off initial connection on mount; clean up on unmount
187258
useEffect(() => {
259+
if (startedRef.current) return undefined // StrictMode double-invoke guard
260+
startedRef.current = true
261+
262+
mountedRef.current = true
188263
connect()
189-
return () => closeWS()
190-
}, [connect, closeWS])
264+
265+
return () => {
266+
mountedRef.current = false
267+
startedRef.current = false
268+
if (reconnectTimerRef.current) {
269+
window.clearTimeout(reconnectTimerRef.current)
270+
reconnectTimerRef.current = null
271+
}
272+
closeWS()
273+
wsRef.current = null
274+
connectingRef.current = false
275+
}
276+
// INTENTIONALLY EMPTY DEPS – do not reopen on state changes
277+
// eslint-disable-next-line react-hooks/exhaustive-deps
278+
}, [])
191279

192280
// IntersectionObserver to trigger SCROLL when sentinel becomes visible
193281
useEffect(() => {

src/components/organisms/Events/utils.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,23 @@ export const eventKey = (e: TEventsV1Event) => {
66
const ns = e.metadata?.namespace ?? ''
77
return `${ns}/${n}`
88
}
9+
10+
// Compare resourceVersions safely (string-based)
11+
export const compareRV = (a: string, b: string): number => {
12+
if (a.length !== b.length) return a.length > b.length ? 1 : -1
13+
// eslint-disable-next-line no-nested-ternary
14+
return a > b ? 1 : a < b ? -1 : 0
15+
}
16+
17+
type WithRV = { metadata?: { resourceVersion?: string } }
18+
19+
export const getRV = (item: WithRV): string | undefined => item?.metadata?.resourceVersion
20+
21+
// ✅ Pure functional + no restricted syntax
22+
export const getMaxRV = <T extends WithRV>(items: ReadonlyArray<T>): string | undefined => {
23+
const rvs = items
24+
.map(getRV)
25+
.filter((v): v is string => Boolean(v))
26+
.sort(compareRV)
27+
return rvs.length ? rvs[rvs.length - 1] : undefined
28+
}

0 commit comments

Comments
 (0)