Skip to content

Commit

Permalink
handle errors on socket (#516)
Browse files Browse the repository at this point in the history
Signed-off-by: ryjiang <[email protected]>
  • Loading branch information
shanghaikid authored May 28, 2024
1 parent 89b0616 commit c769e32
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 12 deletions.
13 changes: 12 additions & 1 deletion client/src/context/Data.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,23 @@ export const DataProvider = (props: { children: React.ReactNode }) => {
socket.current.emit(WS_EVENTS.REGISTER, clientId);

socket.current.on('connect', async () => {
console.log('--- ws connected ---', clientId);
// console.info('--- ws connected ---', clientId);
// fetch db
await fetchDatabases(true);
// set connected to trues
setConnected(true);
});

// handle disconnect
socket.current.on('disconnect', () => {
// Set connected to false
setConnected(false);
});

// handle error
socket.current.on('error', error => {
socket.current?.disconnect();
});
} else {
socket.current?.disconnect();
// clear collections
Expand Down
4 changes: 3 additions & 1 deletion server/src/collections/collections.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,9 @@ export class CollectionsService {
const res = await this.getAllCollections(clientId, collections);

// emit event to current client
socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, res);
if (socketClient) {
socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, res);
}
} catch (e) {
console.log('ignore queue error');
}
Expand Down
23 changes: 13 additions & 10 deletions server/src/crons/crons.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,22 @@ export class CronsService {
);
// get current socket
const socketClient = clients.get(currentJob.clientId);
// emit event to current client, loading and indexing events are indetified as collection update
socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, collections);

// if all collections are loaded, stop cron
const LoadingOrBuildingCollections = collections.filter(v => {
const isLoading = checkLoading(v);
const isBuildingIndex = checkIndexing(v);
if (!socketClient) {
// emit event to current client, loading and indexing events are indetified as collection update
socketClient.emit(WS_EVENTS.COLLECTION_UPDATE, collections);

return isLoading || isBuildingIndex;
});
// if all collections are loaded, stop cron
const LoadingOrBuildingCollections = collections.filter(v => {
const isLoading = checkLoading(v);
const isBuildingIndex = checkIndexing(v);

if (LoadingOrBuildingCollections.length === 0) {
this.schedulerRegistry.deleteCronJob(clientId, data);
return isLoading || isBuildingIndex;
});

if (LoadingOrBuildingCollections.length === 0) {
this.schedulerRegistry.deleteCronJob(clientId, data);
}
}
} catch (error) {
// When user not connect milvus, stop cron
Expand Down
11 changes: 11 additions & 0 deletions server/src/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ export function initWebSocket(server: http.Server) {
console.info(chalk.green(`ws client disconnected ${clientId}`));
clients.delete(clientId);
});

socket.on('error', (error: Error) => {
console.error(
chalk.red(`ws client error ${clientId}: ${error.message}`)
);
});
});
});

// Handle server-level errors
io.on('error', (error: Error) => {
console.error(chalk.red(`ws server error: ${error.message}`));
});
}

0 comments on commit c769e32

Please sign in to comment.