Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSockets on PushVnodes #28

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,5 @@ package-lock.json
# devtools secrets
package-lock.json
/docker/external
/docker/debug-*
/log
29 changes: 29 additions & 0 deletions docker/debug-v1.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
CHAIN_DIR="/Users/apasha/projects/push"
DOC_DIR="/Users/apasha/projects/push/push-vnode/docker"

cd ${DOC_DIR}
set -o allexport
source ${DOC_DIR}/.env
source ${DOC_DIR}/common.env
source ${DOC_DIR}/v-specific.env
set +o allexport

export CONFIG_DIR=${DOC_DIR}/v1
export LOG_DIR=${CONFIG_DIR}/log
export ABI_DIR=${DOC_DIR}/_abi
export ETH_KEY_PATH=${CONFIG_DIR}/node_key.json
export LOCALH=true
export VALIDATOR_PING_SCHEDULE="* */30 * * * *"

export PG_HOST=localhost
export PG_PORT=${EXTERNAL_PG_PORT}
export DB_NAME=vnode1
export PORT=4001
export REDIS_URL=redis://localhost:${EXTERNAL_REDIS_PORT}
export VALIDATOR_RPC_ENDPOINT=http://localhost:8545

echo > ${LOG_DIR}/debug.log
echo > ${LOG_DIR}/error.log
cd ${DOC_DIR}/..
yarn start

29 changes: 29 additions & 0 deletions docker/debug-v2.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
CHAIN_DIR="/Users/apasha/projects/push"
DOC_DIR="/Users/apasha/projects/push/push-vnode/docker"

cd ${DOC_DIR}
set -o allexport
source ${DOC_DIR}/.env
source ${DOC_DIR}/common.env
source ${DOC_DIR}/v-specific.env
set +o allexport

export CONFIG_DIR=${DOC_DIR}/v2
export LOG_DIR=${CONFIG_DIR}/log
export ABI_DIR=${DOC_DIR}/_abi
export ETH_KEY_PATH=${CONFIG_DIR}/node_key.json
export LOCALH=true
export VALIDATOR_PING_SCHEDULE="* */30 * * * *"

export PG_HOST=localhost
export PG_PORT=${EXTERNAL_PG_PORT}
export DB_NAME=vnode2
export PORT=4002
export REDIS_URL=redis://localhost:${EXTERNAL_REDIS_PORT}
export VALIDATOR_RPC_ENDPOINT=http://localhost:8545

echo > ${LOG_DIR}/debug.log
echo > ${LOG_DIR}/error.log
cd ${DOC_DIR}/..
yarn dev

29 changes: 29 additions & 0 deletions docker/debug-v3.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
CHAIN_DIR="/Users/apasha/projects/push"
DOC_DIR="/Users/apasha/projects/push/push-vnode/docker"

cd ${DOC_DIR}
set -o allexport
source ${DOC_DIR}/.env
source ${DOC_DIR}/common.env
source ${DOC_DIR}/v-specific.env
set +o allexport

export CONFIG_DIR=${DOC_DIR}/v3
export LOG_DIR=${CONFIG_DIR}/log
export ABI_DIR=${DOC_DIR}/_abi
export ETH_KEY_PATH=${CONFIG_DIR}/node_key.json
export LOCALH=true
export VALIDATOR_PING_SCHEDULE="* */30 * * * *"

export PG_HOST=localhost
export PG_PORT=${EXTERNAL_PG_PORT}
export DB_NAME=vnode3
export PORT=4003
export REDIS_URL=redis://localhost:${EXTERNAL_REDIS_PORT}
export VALIDATOR_RPC_ENDPOINT=http://localhost:8545

echo > ${LOG_DIR}/debug.log
echo > ${LOG_DIR}/error.log
cd ${DOC_DIR}/..
yarn start

4 changes: 4 additions & 0 deletions docker/v1/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
WS_MAX_PAYLOAD=5242880 // 5mb
DISCOVERY_MIN_ARCHIVE_NODES=2
DISCOVERY_REFRESH_INTERVAL=900000
DISCOVERY_HEALTH_CHECK_TIMEOUT=30000
4 changes: 4 additions & 0 deletions docker/v10/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
WS_MAX_PAYLOAD=5242880 // 5mb
DISCOVERY_MIN_ARCHIVE_NODES=2
DISCOVERY_REFRESH_INTERVAL=900000
DISCOVERY_HEALTH_CHECK_TIMEOUT=30000
4 changes: 4 additions & 0 deletions docker/v2/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
WS_MAX_PAYLOAD=5242880 // 5mb
DISCOVERY_MIN_ARCHIVE_NODES=2
DISCOVERY_REFRESH_INTERVAL=900000
DISCOVERY_HEALTH_CHECK_TIMEOUT=30000
4 changes: 4 additions & 0 deletions docker/v3/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
WS_MAX_PAYLOAD=5242880 // 5mb
DISCOVERY_MIN_ARCHIVE_NODES=2
DISCOVERY_REFRESH_INTERVAL=900000
DISCOVERY_HEALTH_CHECK_TIMEOUT=30000
4 changes: 4 additions & 0 deletions docker/v4/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
WS_MAX_PAYLOAD=5242880 // 5mb
DISCOVERY_MIN_ARCHIVE_NODES=2
DISCOVERY_REFRESH_INTERVAL=900000
DISCOVERY_HEALTH_CHECK_TIMEOUT=30000
4 changes: 4 additions & 0 deletions docker/v5/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
WS_MAX_PAYLOAD=5242880 // 5mb
DISCOVERY_MIN_ARCHIVE_NODES=2
DISCOVERY_REFRESH_INTERVAL=900000
DISCOVERY_HEALTH_CHECK_TIMEOUT=30000
4 changes: 4 additions & 0 deletions docker/v6/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
WS_MAX_PAYLOAD=5242880 // 5mb
DISCOVERY_MIN_ARCHIVE_NODES=2
DISCOVERY_REFRESH_INTERVAL=900000
DISCOVERY_HEALTH_CHECK_TIMEOUT=30000
4 changes: 4 additions & 0 deletions docker/v7/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
WS_MAX_PAYLOAD=5242880 // 5mb
DISCOVERY_MIN_ARCHIVE_NODES=2
DISCOVERY_REFRESH_INTERVAL=900000
DISCOVERY_HEALTH_CHECK_TIMEOUT=30000
4 changes: 4 additions & 0 deletions docker/v8/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
WS_MAX_PAYLOAD=5242880 // 5mb
DISCOVERY_MIN_ARCHIVE_NODES=2
DISCOVERY_REFRESH_INTERVAL=900000
DISCOVERY_HEALTH_CHECK_TIMEOUT=30000
4 changes: 4 additions & 0 deletions docker/v9/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
WS_MAX_PAYLOAD=5242880 // 5mb
DISCOVERY_MIN_ARCHIVE_NODES=2
DISCOVERY_REFRESH_INTERVAL=900000
DISCOVERY_HEALTH_CHECK_TIMEOUT=30000
20 changes: 19 additions & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import winston from "winston";
import pgPromise from 'pg-promise'
import { IClient } from 'pg-promise/typescript/pg-subset'
import {DbLoader} from "./services/messaging/dbLoader";
import { WebSocketManager } from "./services/WebSockets/WebSocketManager";
import { ValidatorContractState } from "./services/messaging-common/validatorContractState";

let server: Server;
let log = WinstonUtil.newLog("SERVER");
Expand All @@ -48,6 +50,12 @@ async function startServer(logLevel: string = null, testMode = false, padder = 0

await initValidator();

const validatorContractState = Container.get(ValidatorContractState);
await validatorContractState.postConstruct();
const validatorNode = Container.get(ValidatorNode);
const archivalNodes = validatorContractState.getArchivalNodesMap();


const app: Express = express();
server = http.createServer(app);

Expand Down Expand Up @@ -82,11 +90,21 @@ async function startServer(logLevel: string = null, testMode = false, padder = 0



🛡️ Server listening on port: ${PORT} 🛡️
🛡️ HTTP Server listening on port: ${PORT} 🛡️

################################################
`);
});


// Initialize WebSocket Manager with error handling
try {
const wsManager = Container.get(WebSocketManager);
await wsManager.postConstruct(validatorNode.nodeId, validatorContractState.wallet, archivalNodes, server);
} catch (error) {
log.error('Failed to initialize WebSocket Manager: %o', error);
log.warn('Continuing with HTTP server initialization despite WebSocket failure');
}
}

function printMemoryUsage() {
Expand Down
80 changes: 80 additions & 0 deletions src/services/WebSockets/BlockStatusManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { Service } from 'typedi';
import { WinstonUtil } from "../../utilz/winstonUtil";
import { FilterBlockResponse, BlockConfirmation } from './types';
import { WebSocketServer } from './WebSocketServer';
import { DiscoveryService } from './DiscoverService';

/**
* Manages the confirmation status of blocks across multiple ANodes.
* Tracks block confirmations and broadcasts updates when required confirmations are met.
*/
@Service()
export class BlockStatusManager {
private confirmations = new Map<string, BlockConfirmation>();
private readonly CONFIRMATION_EXPIRY_TIME = 30 * 60 * 1000; // 30 minutes
private readonly log = WinstonUtil.newLog(BlockStatusManager);

constructor(
private readonly wsServer: WebSocketServer,
private readonly discoveryService: DiscoveryService
) {}

/**
* Processes a block confirmation from an ANode and triggers broadcast if confirmation threshold is met.
* @param blockHash - The hash of the block being confirmed
* @param nodeId - The ID of the ANode confirming the block
* @param blockData - The block data received from the ANode
*/
async handleBlockConfirmation(blockHash: string, nodeId: string, blockData: FilterBlockResponse) {
let confirmation = this.confirmations.get(blockHash);
if (!confirmation) {
confirmation = {
timestamp: Date.now(),
nodes: new Set()
};
this.confirmations.set(blockHash, confirmation);
}

confirmation.nodes.add(nodeId);
this.log.debug(`Block ${blockHash} confirmed by ANode: ${nodeId}. Total confirmations: ${confirmation.nodes.size}`);

if (confirmation.nodes.size >= this.discoveryService.getMinArchiveNodes()) {
await this.handleBlockConfirmed(blockHash, blockData);
}
}

/**
* Handles the logic when a block reaches the required number of confirmations.
* Broadcasts the block update to connected clients and removes it from tracking.
* @param blockHash - The hash of the confirmed block
* @param blockData - The block data to broadcast
* @private
*/
private async handleBlockConfirmed(blockHash: string, blockData: FilterBlockResponse) {
this.log.info(`Block ${blockHash} reached required confirmations`);
this.wsServer.broadcastBlockUpdate(blockData);
this.confirmations.delete(blockHash);
}

/**
* Removes block confirmations that have exceeded the expiry time.
* @private
*/
private cleanupOldConfirmations() {
const now = Date.now();
for (const [blockHash, confirmation] of this.confirmations.entries()) {
if (now - confirmation.timestamp > this.CONFIRMATION_EXPIRY_TIME) {
this.confirmations.delete(blockHash);
this.log.debug(`Cleaned up old confirmation for block ${blockHash}`);
}
}
}

/**
* Initiates the cleanup of expired block confirmations.
* Should be called periodically to prevent memory leaks.
*/
public performCleanup() {
this.cleanupOldConfirmations();
}
}
Loading