Skip to content

Commit

Permalink
small changes on websocket-client.ts and websocket-smoke-test.test.ts (
Browse files Browse the repository at this point in the history
  • Loading branch information
strykerin authored Feb 19, 2025
1 parent 0af28fb commit 893f273
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 31 deletions.
50 changes: 42 additions & 8 deletions packages/core/src/lib/websocket/websocket-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,34 @@ type SubscriptionFilter = {
value: string[];
};

type RawBlock = {
blockHash: string;
txs: {
blockHash: string;
txHash: string;
category: string;
from: string;
recipients: string[];
}[];
};

export type WebSocketBlock = {
blockHash: string;
transactions: WebSocketTransaction[];
};

export type WebSocketTransaction = {
hash: string;
category: string;
from: string;
recipients: string[];
};

export class WebSocketClient {
private ws: WebSocket.WebSocket | null = null;
private clientId: string | null = null;
private blockHandlers: Map<string, (block: any) => void> = new Map();
private blockHandlers: Map<string, (block: WebSocketBlock) => void> =
new Map();

private constructor(private url: string) {}

Expand Down Expand Up @@ -103,7 +127,17 @@ export class WebSocketClient {
if (message.type === 'BLOCK' && message.data?.block) {
const handler = this.blockHandlers.get(message.data.subscriptionId);
if (handler) {
handler(message.data.block);
const rawBlock = message.data.block as RawBlock;
const block: WebSocketBlock = {
blockHash: rawBlock.blockHash,
transactions: rawBlock.txs.map((tx) => ({
hash: tx.txHash,
category: tx.category,
from: tx.from,
recipients: tx.recipients,
})),
};
handler(block);
}
}
}
Expand All @@ -120,21 +154,21 @@ export class WebSocketClient {
* Subscribe to block updates with filters
* @param callback Function to handle incoming block updates
* @param filters Optional filters for the subscription
* @returns Promise<string> Subscription ID
* @returns Promise<{ subscriptionId: string }> Subscription ID
*/
async subscribeToBlocks(
callback: (data: any) => void,
async subscribe(
callback: (data: WebSocketBlock) => void,
filters: SubscriptionFilter[] = [{ type: 'WILDCARD', value: ['*'] }]
): Promise<string> {
): Promise<{ subscriptionId: string }> {
return new Promise((resolve, reject) => {
const handleSubscribeResponse = (data: string) => {
const message = JSON.parse(data);
if (message.type === 'SUBSCRIBE_ACK') {
if (message.data.success) {
const subscriptionId = message.data.subscriptionId;
const subscriptionId: string = message.data.subscriptionId;
this.blockHandlers.set(subscriptionId, callback);
this.ws?.removeListener('message', handleSubscribeResponse);
resolve(subscriptionId);
resolve({ subscriptionId });
} else {
reject(new Error(message.data.error));
}
Expand Down
46 changes: 23 additions & 23 deletions packages/core/tests/validator/websocket-smoke-test.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ describe('WebSocket Smoke Test', () => {
});

// Send subscription request
pushChain.ws.subscribeToBlocks(() => {
pushChain.ws.subscribe(() => {
// No need to handle block updates in this test
});
});
Expand Down Expand Up @@ -307,7 +307,7 @@ describe('WebSocket Smoke Test', () => {
});

// Send subscription request with custom filters
pushChain.ws.subscribeToBlocks(() => {
pushChain.ws.subscribe(() => {
// No need to handle block updates in this test
}, customFilters);
});
Expand Down Expand Up @@ -362,7 +362,7 @@ describe('WebSocket Smoke Test', () => {
reject(new Error('Block not received within timeout'));
}, 30000);

pushChain.ws.subscribeToBlocks((block) => {
pushChain.ws.subscribe((block) => {
blockReceived = true;
receivedBlock = block;
clearTimeout(timeout);
Expand All @@ -377,7 +377,7 @@ describe('WebSocket Smoke Test', () => {
// Assertions
expect(blockReceived).toBe(true);
expect(receivedBlock).toBeDefined();
expect(receivedBlock.txs.length).toBeGreaterThan(0);
expect(receivedBlock.transactions.length).toBeGreaterThan(0);
}, 30000);

it('should subscribe with custom filters and receive a block after initiating multiple transactions', async () => {
Expand Down Expand Up @@ -432,7 +432,7 @@ describe('WebSocket Smoke Test', () => {
reject(new Error('Block not received within timeout'));
}, 30000);

pushChain.ws.subscribeToBlocks((block) => {
pushChain.ws.subscribe((block) => {
blockReceived = true;
receivedBlock = block;
clearTimeout(timeout);
Expand All @@ -451,8 +451,8 @@ describe('WebSocket Smoke Test', () => {
// Assertions
expect(blockReceived).toBe(true);
expect(receivedBlock).toBeDefined();
expect(receivedBlock.txs.length).toBeGreaterThanOrEqual(1);
expect(receivedBlock.txs[0].category).toBe('CUSTOM:V2');
expect(receivedBlock.transactions.length).toBeGreaterThanOrEqual(1);
expect(receivedBlock.transactions[0].category).toBe('CUSTOM:V2');

console.log(
'Block received with custom filters:',
Expand Down Expand Up @@ -510,7 +510,7 @@ describe('WebSocket Smoke Test', () => {
// reject(new Error('Block not received within timeout'));
// }, 30000); // Timeout for receiving a block

// pushChain.ws.subscribeToBlocks((block) => {
// pushChain.ws.subscribe((block) => {
// blockReceived = true;
// receivedBlock = block;
// clearTimeout(timeout);
Expand All @@ -530,8 +530,8 @@ describe('WebSocket Smoke Test', () => {
// // Assertions
// expect(blockReceived).toBe(true);
// expect(receivedBlock).toBeDefined();
// expect(receivedBlock.txs.length).toBeGreaterThanOrEqual(1);
// expect(receivedBlock.txs[0].category).toBe('CUSTOM:V2');
// expect(receivedBlock.transactions.length).toBeGreaterThanOrEqual(1);
// expect(receivedBlock.transactions[0].category).toBe('CUSTOM:V2');

// console.log('Block received with multiple filters:', JSON.stringify(receivedBlock, null, 2));
// }, 30000);
Expand All @@ -546,7 +546,7 @@ describe('WebSocket Smoke Test', () => {
reject(new Error('Block not received within timeout'));
}, 30000);

pushChain.ws.subscribeToBlocks((block) => {
pushChain.ws.subscribe((block) => {
blockCount++;
receivedBlock = block;
clearTimeout(timeout);
Expand Down Expand Up @@ -577,7 +577,7 @@ describe('WebSocket Smoke Test', () => {
reject(new Error('No new blocks received after reconnection'));
}, 30000);

pushChain.ws.subscribeToBlocks((block) => {
pushChain.ws.subscribe((block) => {
blockCount++;
receivedBlock = block;
clearTimeout(timeout);
Expand All @@ -596,8 +596,8 @@ describe('WebSocket Smoke Test', () => {
// Assertions
expect(blockCount).toBeGreaterThan(initialBlockCount);
expect(receivedBlock).toBeDefined();
expect(receivedBlock.txs).toBeDefined();
expect(receivedBlock.txs.length).toBeGreaterThan(0);
expect(receivedBlock.transactions).toBeDefined();
expect(receivedBlock.transactions.length).toBeGreaterThan(0);

console.log('Reconnection test completed successfully:', {
initialBlockCount,
Expand All @@ -619,14 +619,14 @@ describe('WebSocket Smoke Test', () => {
// const receivedBlocks: any[] = [];

// // Subscribe to blocks and track metrics
// await pushChain.ws.subscribeToBlocks((block) => {
// await pushChain.ws.subscribe((block) => {
// blockCount++;
// lastBlockHash = block.blockHash;
// receivedBlocks.push(block);
// console.log('Received block:', {
// blockHash: block.blockHash,
// blockCount,
// txCount: block.txs?.length || 0
// txCount: block.transactions?.length || 0
// });
// });

Expand All @@ -637,10 +637,10 @@ describe('WebSocket Smoke Test', () => {
// // Wait for initial block
// const initialBlockPromise = new Promise<void>((resolve, reject) => {
// const timeout = setTimeout(() => {
// reject(new Error('Block not received within timeout'));
// reject(new Error('WebSocetBlock not received within timeout'));
// }, 30000);

// pushChain.ws.subscribeToBlocks((block) => {
// pushChain.ws.subscribe((block) => {
// blockCount++;
// clearTimeout(timeout);
// resolve();
Expand Down Expand Up @@ -675,7 +675,7 @@ describe('WebSocket Smoke Test', () => {
// reject(new Error('No new blocks received after reconnection'));
// }, 30000);

// pushChain.ws.subscribeToBlocks((block) => {
// pushChain.ws.subscribe((block) => {
// blockCount++;
// clearTimeout(timeout);
// resolve();
Expand All @@ -697,16 +697,16 @@ describe('WebSocket Smoke Test', () => {
// expect(blockCount).toBeGreaterThan(initialBlockCount);
// expect(lastBlockHash).not.toBe(initialBlockHash);
// expect(pushChain.ws.isConnected()).toBe(true);
// expect(receivedBlocks[receivedBlocks.length - 1].txs).toBeDefined();
// expect(receivedBlocks[receivedBlocks.length - 1].txs.length).toBeGreaterThan(0);
// expect(receivedBlocks[receivedBlocks.length - 1].transactions).toBeDefined();
// expect(receivedBlocks[receivedBlocks.length - 1].transactions.length).toBeGreaterThan(0);

// console.log('Final state:', {
// initialBlockCount,
// finalBlockCount: blockCount,
// initialBlockHash,
// finalBlockHash: lastBlockHash,
// totalBlocksReceived: receivedBlocks.length,
// lastBlockTxCount: receivedBlocks[receivedBlocks.length - 1].txs.length
// lastBlockTxCount: receivedBlocks[receivedBlocks.length - 1].transactions.length
// });
// }, 30000);

Expand Down Expand Up @@ -785,7 +785,7 @@ describe('WebSocket Smoke Test', () => {
// });

// // Subscribe to blocks
// const subscriptionId = await pushChain.ws.subscribeToBlocks(() => {
// const subscriptionId = await pushChain.ws.subscribe(() => {
// // Callback intentionally empty for this test
// });
// expect(subscriptionId).toBeDefined();
Expand Down

0 comments on commit 893f273

Please sign in to comment.