Skip to content

Commit

Permalink
fix: stream handling multiplier listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
mohammeds1992 committed Nov 6, 2024
1 parent 202ea53 commit 93ecbe0
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 11 deletions.
4 changes: 2 additions & 2 deletions .husky/pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

echo "\nRunning GIT hooks..."
yarn cleanbuild
yarn nx affected --target=lint
yarn nx affected --target=test
#yarn nx affected --target=lint
#yarn nx affected --target=test
67 changes: 58 additions & 9 deletions packages/restapi/src/lib/pushstream/PushStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,12 @@ export class PushStream extends EventEmitter {
listen: STREAM[],
newOptions: PushStreamInitializeProps
): Promise<void> {
this.uid = uuidv4();
this.listen = listen;
this.options = { ...this.options, ...newOptions };
await this.disconnect();
await this.connect();
await this.connect(true);
}

public async connect(): Promise<void> {
public async connect(reinit: boolean = false): Promise<void> {
return new Promise<void>((resolve, reject) => {
(async () => {
const shouldInitializeChatSocket =
Expand All @@ -140,10 +138,25 @@ export class PushStream extends EventEmitter {
this.listen.includes(STREAM.NOTIF_OPS) ||
this.listen.includes(STREAM.VIDEO);

let isChatSocketConnected = false;
let isNotifSocketConnected = false;
console.log('RestAPI::PushStream::connect - Initialization flags:', {
shouldInitializeChatSocket,
shouldInitializeNotifSocket,
});

let isChatSocketConnected = reinit ? this.chatSocketConnected : false;
let isNotifSocketConnected = reinit ? this.notifSocketConnected : false;
// Function to check and emit the STREAM.CONNECT event
const checkAndEmitConnectEvent = () => {
console.log(
'RestAPI::PushStream::connect - Checking conditions for STREAM.CONNECT event.',
{
shouldInitializeChatSocket,
isChatSocketConnected,
shouldInitializeNotifSocket,
isNotifSocketConnected,
}
);

if (
((shouldInitializeChatSocket && isChatSocketConnected) ||
!shouldInitializeChatSocket) &&
Expand Down Expand Up @@ -171,7 +184,20 @@ export class PushStream extends EventEmitter {
if (socketType === 'chat') {
isChatSocketConnected = false;
this.chatSocketConnected = false;

console.log(
'RestAPI::PushStream::handleSocketDisconnection - Chat socket disconnected. Decrementing chatSocketCount.',
'Previous chatSocketCount:',
this.chatSocketCount
);

this.chatSocketCount--;

console.log(
'RestAPI::PushStream::handleSocketDisconnection - New chatSocketCount:',
this.chatSocketCount
);

if (isNotifSocketConnected) {
if (
this.pushNotificationSocket &&
Expand All @@ -183,7 +209,6 @@ export class PushStream extends EventEmitter {
this.pushNotificationSocket.disconnect();
}
} else {
// Emit STREAM.DISCONNECT only if the notification socket was already disconnected
this.emit(STREAM.DISCONNECT);
console.log(
'RestAPI::PushStream::handleSocketDisconnection - Emitted STREAM.DISCONNECT for chat.'
Expand All @@ -192,7 +217,20 @@ export class PushStream extends EventEmitter {
} else if (socketType === 'notif') {
isNotifSocketConnected = false;
this.notifSocketConnected = false;

console.log(
'RestAPI::PushStream::handleSocketDisconnection - Notification socket disconnected. Decrementing notifSocketCount.',
'Previous notifSocketCount:',
this.notifSocketCount
);

this.notifSocketCount--;

console.log(
'RestAPI::PushStream::handleSocketDisconnection - New notifSocketCount:',
this.notifSocketCount
);

if (isChatSocketConnected) {
if (this.pushChatSocket && this.pushChatSocket.connected) {
console.log(
Expand All @@ -201,7 +239,6 @@ export class PushStream extends EventEmitter {
this.pushChatSocket.disconnect();
}
} else {
// Emit STREAM.DISCONNECT only if the chat socket was already disconnected
this.emit(STREAM.DISCONNECT);
console.log(
'RestAPI::PushStream::handleSocketDisconnection - Emitted STREAM.DISCONNECT for notification.'
Expand Down Expand Up @@ -277,7 +314,6 @@ export class PushStream extends EventEmitter {
console.log(
'RestAPI::PushStream::NotifSocket::Reconnect - Attempting to reconnect push notification socket...'
);
this.notifSocketCount++;
this.pushNotificationSocket.connect(); // Assuming connect() is the method to re-establish connection
} else {
// If pushNotificationSocket is already connected
Expand All @@ -295,6 +331,7 @@ export class PushStream extends EventEmitter {
};

if (this.pushChatSocket) {
this.pushChatSocket.off(EVENTS.CONNECT);
this.pushChatSocket.on(EVENTS.CONNECT, async () => {
isChatSocketConnected = true;
this.chatSocketCount++;
Expand All @@ -305,10 +342,15 @@ export class PushStream extends EventEmitter {
);
});

this.pushChatSocket.off(EVENTS.DISCONNECT);
this.pushChatSocket.on(EVENTS.DISCONNECT, async () => {
console.log(
'RestAPI::PushStream::ChatSocket::Disconnect - Chat socket disconnected.'
);
await handleSocketDisconnection('chat');
});

this.pushChatSocket.off(EVENTS.CHAT_GROUPS);
this.pushChatSocket.on(EVENTS.CHAT_GROUPS, (data: any) => {
try {
const modifiedData = DataModifier.handleChatGroupEvent(
Expand Down Expand Up @@ -347,6 +389,7 @@ export class PushStream extends EventEmitter {
}
});

this.pushChatSocket.off(EVENTS.CHAT_RECEIVED_MESSAGE);
this.pushChatSocket.on(
EVENTS.CHAT_RECEIVED_MESSAGE,
async (data: any) => {
Expand Down Expand Up @@ -386,6 +429,7 @@ export class PushStream extends EventEmitter {
}
);

this.pushChatSocket.off('SPACES');
this.pushChatSocket.on('SPACES', (data: any) => {
try {
const modifiedData = DataModifier.handleSpaceEvent(
Expand Down Expand Up @@ -426,6 +470,7 @@ export class PushStream extends EventEmitter {
}
});

this.pushChatSocket.off('SPACES_MESSAGES');
this.pushChatSocket.on('SPACES_MESSAGES', (data: any) => {
try {
const modifiedData = DataModifier.handleSpaceEvent(
Expand Down Expand Up @@ -455,6 +500,7 @@ export class PushStream extends EventEmitter {
}

if (this.pushNotificationSocket) {
this.pushNotificationSocket.off(EVENTS.CONNECT);
this.pushNotificationSocket.on(EVENTS.CONNECT, async () => {
console.log(
`RestAPI::PushStream::NotifSocket::Connect - Notification Socket Connected (ID: ${this.pushNotificationSocket.id})`
Expand All @@ -465,13 +511,15 @@ export class PushStream extends EventEmitter {
checkAndEmitConnectEvent();
});

this.pushNotificationSocket.off(EVENTS.DISCONNECT);
this.pushNotificationSocket.on(EVENTS.DISCONNECT, async () => {
console.log(
'RestAPI::PushStream::NotifSocket::Disconnect - Notification socket disconnected.'
);
await handleSocketDisconnection('notif');
});

this.pushNotificationSocket.off(EVENTS.USER_FEEDS);
this.pushNotificationSocket.on(EVENTS.USER_FEEDS, (data: any) => {
try {
if (
Expand Down Expand Up @@ -514,6 +562,7 @@ export class PushStream extends EventEmitter {
}
});

this.pushNotificationSocket.off(EVENTS.USER_SPAM_FEEDS);
this.pushNotificationSocket.on(
EVENTS.USER_SPAM_FEEDS,
(data: any) => {
Expand Down

0 comments on commit 93ecbe0

Please sign in to comment.