From 93ecbe0a396a009ca5205a1bd784ce5034608fca Mon Sep 17 00:00:00 2001 From: Mohammed S Date: Wed, 6 Nov 2024 12:01:12 +0530 Subject: [PATCH] fix: stream handling multiplier listeners --- .husky/pre-commit | 4 +- .../restapi/src/lib/pushstream/PushStream.ts | 67 ++++++++++++++++--- 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/.husky/pre-commit b/.husky/pre-commit index 2665d0376..6282fa5cb 100755 --- a/.husky/pre-commit +++ b/.husky/pre-commit @@ -3,5 +3,5 @@ echo "\nRunning GIT hooks..." yarn cleanbuild -yarn nx affected --target=lint -yarn nx affected --target=test \ No newline at end of file +#yarn nx affected --target=lint +#yarn nx affected --target=test \ No newline at end of file diff --git a/packages/restapi/src/lib/pushstream/PushStream.ts b/packages/restapi/src/lib/pushstream/PushStream.ts index 5f4345ac8..06e7853f7 100644 --- a/packages/restapi/src/lib/pushstream/PushStream.ts +++ b/packages/restapi/src/lib/pushstream/PushStream.ts @@ -116,14 +116,12 @@ export class PushStream extends EventEmitter { listen: STREAM[], newOptions: PushStreamInitializeProps ): Promise { - this.uid = uuidv4(); this.listen = listen; this.options = { ...this.options, ...newOptions }; - await this.disconnect(); - await this.connect(); + await this.connect(true); } - public async connect(): Promise { + public async connect(reinit: boolean = false): Promise { return new Promise((resolve, reject) => { (async () => { const shouldInitializeChatSocket = @@ -140,10 +138,25 @@ export class PushStream extends EventEmitter { this.listen.includes(STREAM.NOTIF_OPS) || this.listen.includes(STREAM.VIDEO); - let isChatSocketConnected = false; - let isNotifSocketConnected = false; + console.log('RestAPI::PushStream::connect - Initialization flags:', { + shouldInitializeChatSocket, + shouldInitializeNotifSocket, + }); + + let isChatSocketConnected = reinit ? this.chatSocketConnected : false; + let isNotifSocketConnected = reinit ? this.notifSocketConnected : false; // Function to check and emit the STREAM.CONNECT event const checkAndEmitConnectEvent = () => { + console.log( + 'RestAPI::PushStream::connect - Checking conditions for STREAM.CONNECT event.', + { + shouldInitializeChatSocket, + isChatSocketConnected, + shouldInitializeNotifSocket, + isNotifSocketConnected, + } + ); + if ( ((shouldInitializeChatSocket && isChatSocketConnected) || !shouldInitializeChatSocket) && @@ -171,7 +184,20 @@ export class PushStream extends EventEmitter { if (socketType === 'chat') { isChatSocketConnected = false; this.chatSocketConnected = false; + + console.log( + 'RestAPI::PushStream::handleSocketDisconnection - Chat socket disconnected. Decrementing chatSocketCount.', + 'Previous chatSocketCount:', + this.chatSocketCount + ); + this.chatSocketCount--; + + console.log( + 'RestAPI::PushStream::handleSocketDisconnection - New chatSocketCount:', + this.chatSocketCount + ); + if (isNotifSocketConnected) { if ( this.pushNotificationSocket && @@ -183,7 +209,6 @@ export class PushStream extends EventEmitter { this.pushNotificationSocket.disconnect(); } } else { - // Emit STREAM.DISCONNECT only if the notification socket was already disconnected this.emit(STREAM.DISCONNECT); console.log( 'RestAPI::PushStream::handleSocketDisconnection - Emitted STREAM.DISCONNECT for chat.' @@ -192,7 +217,20 @@ export class PushStream extends EventEmitter { } else if (socketType === 'notif') { isNotifSocketConnected = false; this.notifSocketConnected = false; + + console.log( + 'RestAPI::PushStream::handleSocketDisconnection - Notification socket disconnected. Decrementing notifSocketCount.', + 'Previous notifSocketCount:', + this.notifSocketCount + ); + this.notifSocketCount--; + + console.log( + 'RestAPI::PushStream::handleSocketDisconnection - New notifSocketCount:', + this.notifSocketCount + ); + if (isChatSocketConnected) { if (this.pushChatSocket && this.pushChatSocket.connected) { console.log( @@ -201,7 +239,6 @@ export class PushStream extends EventEmitter { this.pushChatSocket.disconnect(); } } else { - // Emit STREAM.DISCONNECT only if the chat socket was already disconnected this.emit(STREAM.DISCONNECT); console.log( 'RestAPI::PushStream::handleSocketDisconnection - Emitted STREAM.DISCONNECT for notification.' @@ -277,7 +314,6 @@ export class PushStream extends EventEmitter { console.log( 'RestAPI::PushStream::NotifSocket::Reconnect - Attempting to reconnect push notification socket...' ); - this.notifSocketCount++; this.pushNotificationSocket.connect(); // Assuming connect() is the method to re-establish connection } else { // If pushNotificationSocket is already connected @@ -295,6 +331,7 @@ export class PushStream extends EventEmitter { }; if (this.pushChatSocket) { + this.pushChatSocket.off(EVENTS.CONNECT); this.pushChatSocket.on(EVENTS.CONNECT, async () => { isChatSocketConnected = true; this.chatSocketCount++; @@ -305,10 +342,15 @@ export class PushStream extends EventEmitter { ); }); + this.pushChatSocket.off(EVENTS.DISCONNECT); this.pushChatSocket.on(EVENTS.DISCONNECT, async () => { + console.log( + 'RestAPI::PushStream::ChatSocket::Disconnect - Chat socket disconnected.' + ); await handleSocketDisconnection('chat'); }); + this.pushChatSocket.off(EVENTS.CHAT_GROUPS); this.pushChatSocket.on(EVENTS.CHAT_GROUPS, (data: any) => { try { const modifiedData = DataModifier.handleChatGroupEvent( @@ -347,6 +389,7 @@ export class PushStream extends EventEmitter { } }); + this.pushChatSocket.off(EVENTS.CHAT_RECEIVED_MESSAGE); this.pushChatSocket.on( EVENTS.CHAT_RECEIVED_MESSAGE, async (data: any) => { @@ -386,6 +429,7 @@ export class PushStream extends EventEmitter { } ); + this.pushChatSocket.off('SPACES'); this.pushChatSocket.on('SPACES', (data: any) => { try { const modifiedData = DataModifier.handleSpaceEvent( @@ -426,6 +470,7 @@ export class PushStream extends EventEmitter { } }); + this.pushChatSocket.off('SPACES_MESSAGES'); this.pushChatSocket.on('SPACES_MESSAGES', (data: any) => { try { const modifiedData = DataModifier.handleSpaceEvent( @@ -455,6 +500,7 @@ export class PushStream extends EventEmitter { } if (this.pushNotificationSocket) { + this.pushNotificationSocket.off(EVENTS.CONNECT); this.pushNotificationSocket.on(EVENTS.CONNECT, async () => { console.log( `RestAPI::PushStream::NotifSocket::Connect - Notification Socket Connected (ID: ${this.pushNotificationSocket.id})` @@ -465,6 +511,7 @@ export class PushStream extends EventEmitter { checkAndEmitConnectEvent(); }); + this.pushNotificationSocket.off(EVENTS.DISCONNECT); this.pushNotificationSocket.on(EVENTS.DISCONNECT, async () => { console.log( 'RestAPI::PushStream::NotifSocket::Disconnect - Notification socket disconnected.' @@ -472,6 +519,7 @@ export class PushStream extends EventEmitter { await handleSocketDisconnection('notif'); }); + this.pushNotificationSocket.off(EVENTS.USER_FEEDS); this.pushNotificationSocket.on(EVENTS.USER_FEEDS, (data: any) => { try { if ( @@ -514,6 +562,7 @@ export class PushStream extends EventEmitter { } }); + this.pushNotificationSocket.off(EVENTS.USER_SPAM_FEEDS); this.pushNotificationSocket.on( EVENTS.USER_SPAM_FEEDS, (data: any) => {