diff --git a/AVC/BFT/bft/buddy_service.go b/AVC/BFT/bft/buddy_service.go index 698403d4..f9968a43 100644 --- a/AVC/BFT/bft/buddy_service.go +++ b/AVC/BFT/bft/buddy_service.go @@ -149,7 +149,11 @@ func (s *BuddyService) runBFTConsensus(ctx context.Context, req *pb.BFTRequest) s.reportFailure(req, fmt.Sprintf("failed to create BFT adapter: %v", err)) return } - defer adapter.Close() + defer func() { + if err := adapter.Close(); err != nil { + log.Printf("⚠️ [%s] Failed to close BFT adapter: %v", s.buddyID, err) + } + }() log.Printf("✅ [%s] BFT adapter initialized on channel: %s", s.buddyID, req.GossipsubTopic) @@ -211,7 +215,11 @@ func (s *BuddyService) reportResult(req *pb.BFTRequest, result *Result) { log.Printf("❌ [%s] Failed to connect to sequencer: %v", s.buddyID, err) return } - defer conn.Close() + defer func() { + if closeErr := conn.Close(); closeErr != nil { + log.Printf("⚠️ [%s] Failed to close sequencer connection: %v", s.buddyID, closeErr) + } + }() client := pb.NewBFTServiceClient(conn) @@ -255,7 +263,11 @@ func (s *BuddyService) reportFailure(req *pb.BFTRequest, reason string) { log.Printf("❌ [%s] Failed to connect to sequencer: %v", s.buddyID, err) return } - defer conn.Close() + defer func() { + if closeErr := conn.Close(); closeErr != nil { + log.Printf("⚠️ [%s] Failed to close sequencer connection: %v", s.buddyID, closeErr) + } + }() client := pb.NewBFTServiceClient(conn) pbResult := &pb.BFTResult{ diff --git a/AVC/BFT/bft/sequencer_client.go b/AVC/BFT/bft/sequencer_client.go index e30525c3..24c758c2 100644 --- a/AVC/BFT/bft/sequencer_client.go +++ b/AVC/BFT/bft/sequencer_client.go @@ -165,7 +165,11 @@ func (s *SequencerBFTClient) sendBFTRequest( Message: fmt.Sprintf("connection failed: %v", err), } } - defer conn.Close() + defer func() { + if err := conn.Close(); err != nil { + log.Printf("❌ [Sequencer] Failed to close connection to %s: %v", buddy.ID, err) + } + }() client := pb.NewBFTServiceClient(conn) diff --git a/AVC/BFT/network/libp2p_setup.go b/AVC/BFT/network/libp2p_setup.go index 4a41dc9a..e6734d02 100644 --- a/AVC/BFT/network/libp2p_setup.go +++ b/AVC/BFT/network/libp2p_setup.go @@ -31,7 +31,9 @@ func SetupLibp2pHost(ctx context.Context, port int) (host.Host, *pubsub.PubSub, // Create GossipSub ps, err := pubsub.NewGossipSub(ctx, h) if err != nil { - h.Close() + if closeErr := h.Close(); closeErr != nil { + fmt.Printf("❌ Failed to close libp2p host: %v\n", closeErr) + } return nil, nil, fmt.Errorf("failed to create gossipsub: %w", err) } diff --git a/AVC/BuddyNodes/MessagePassing/BuddyNodeStream.go b/AVC/BuddyNodes/MessagePassing/BuddyNodeStream.go index d2e1d20d..107a12e3 100644 --- a/AVC/BuddyNodes/MessagePassing/BuddyNodeStream.go +++ b/AVC/BuddyNodes/MessagePassing/BuddyNodeStream.go @@ -53,7 +53,11 @@ func NewStructBuddyNode(buddy *AVCStruct.BuddyNode) *StructBuddyNode { // HandleBuddyNodesMessageStream handles incoming messages on the buddy nodes protocol func (StructBuddyNode *StructBuddyNode) HandleBuddyNodesMessageStream(host host.Host, s network.Stream) { - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + logger().NamedLogger.Error(context.Background(), "Failed to close buddy nodes stream", err) + } + }() // Create context for this stream handler spanCtx := StructBuddyNode.logger_ctx diff --git a/AVC/BuddyNodes/MessagePassing/ListenerHandler.go b/AVC/BuddyNodes/MessagePassing/ListenerHandler.go index b99db208..abbdeeac 100644 --- a/AVC/BuddyNodes/MessagePassing/ListenerHandler.go +++ b/AVC/BuddyNodes/MessagePassing/ListenerHandler.go @@ -75,7 +75,11 @@ func (lh *ListenerHandler) HandleSubmitMessageStream(logger_ctx context.Context, attribute.String("remote_peer_id", remotePeer.String()), ) // Ensure stream is closed to prevent resource leaks - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err) + } + }() logger().NamedLogger.Info(spanCtx, "ListenerHandler.HandleSubmitMessageStream CALLED", ion.String("remote_peer_id", remotePeer.String()), @@ -173,7 +177,11 @@ func (lh *ListenerHandler) HandleSubmitMessageStream(logger_ctx context.Context, ion.String("topic", TOPIC), ion.String("function", "MessagePassing.HandleSubmitMessageStream")) lh.handleBFTRequest(spanCtx, s, message) - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err) + } + }() case config.Type_SubmitVote: logger().NamedLogger.Info(spanCtx, "Handling Type_SubmitVote", ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), @@ -181,7 +189,11 @@ func (lh *ListenerHandler) HandleSubmitMessageStream(logger_ctx context.Context, ion.String("topic", TOPIC), ion.String("function", "MessagePassing.HandleSubmitMessageStream")) lh.handleSubmitVote(spanCtx, s, message) - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err) + } + }() case config.Type_AskForSubscription: logger().NamedLogger.Info(spanCtx, "Handling Type_AskForSubscription", ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), @@ -196,7 +208,11 @@ func (lh *ListenerHandler) HandleSubmitMessageStream(logger_ctx context.Context, ion.String("topic", TOPIC), ion.String("function", "MessagePassing.HandleSubmitMessageStream")) lh.handleSubscriptionResponse(spanCtx, s, message) - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err) + } + }() case config.Type_VoteResult: logger().NamedLogger.Info(spanCtx, "Handling Type_VoteResult", ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), @@ -204,7 +220,11 @@ func (lh *ListenerHandler) HandleSubmitMessageStream(logger_ctx context.Context, ion.String("topic", TOPIC), ion.String("function", "MessagePassing.HandleSubmitMessageStream")) lh.handleVoteResultRequest(spanCtx, s, message) - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err) + } + }() default: span.SetAttributes(attribute.String("status", "unknown_message_type")) logger().NamedLogger.Error(spanCtx, "Unknown message type", @@ -215,7 +235,11 @@ func (lh *ListenerHandler) HandleSubmitMessageStream(logger_ctx context.Context, ion.String("log_file", LOG_FILE), ion.String("topic", TOPIC), ion.String("function", "MessagePassing.HandleSubmitMessageStream")) - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err) + } + }() } duration := time.Since(startTime).Seconds() @@ -608,7 +632,11 @@ func (lh *ListenerHandler) runBFTConsensusFlow(logger_ctx context.Context, conte lh.sendBFTResultToSequencer(consensusSpanCtx, bftCtx.Round, bftCtx.BlockHash, myBuddyID, false, "REJECT", fmt.Sprintf("Failed to create adapter: %v", err)) return } - defer adapter.Close() + defer func() { + if err := adapter.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close BFT adapter", err) + } + }() logger().NamedLogger.Info(consensusSpanCtx, "BFT adapter created successfully", ion.String("context_key", contextKey), @@ -865,7 +893,11 @@ func (lh *ListenerHandler) sendBFTResultToSequencer( ion.String("function", "MessagePassing.sendBFTResultToSequencer")) return } - defer stream.Close() + defer func() { + if err := stream.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close stream to Sequencer", err) + } + }() // Send message messageBytes, err := json.Marshal(message) @@ -1400,7 +1432,9 @@ func (lh *ListenerHandler) sendSubscriptionResponse(logger_ctx context.Context, ion.String("log_file", LOG_FILE), ion.String("topic", TOPIC), ion.String("function", "MessagePassing.sendSubscriptionResponse")) - s.Close() + if err := s.Close(); err != nil { + logger().NamedLogger.Warn(logger_ctx, "Failed to close stream", ion.String("error", err.Error())) + } return } @@ -1419,7 +1453,9 @@ func (lh *ListenerHandler) sendSubscriptionResponse(logger_ctx context.Context, ion.String("log_file", LOG_FILE), ion.String("topic", TOPIC), ion.String("function", "MessagePassing.sendSubscriptionResponse")) - s.Close() + if err := s.Close(); err != nil { + logger().NamedLogger.Warn(logger_ctx, "Failed to close stream", ion.String("error", err.Error())) + } return } @@ -1432,7 +1468,9 @@ func (lh *ListenerHandler) sendSubscriptionResponse(logger_ctx context.Context, ion.String("error", err.Error()), ion.String("remote_peer_id", remotePeer.String()), ion.String("function", "MessagePassing.sendSubscriptionResponse")) - s.Close() + if err := s.Close(); err != nil { + logger().NamedLogger.Warn(logger_ctx, "Failed to close stream", ion.String("error", err.Error())) + } return } } @@ -1442,7 +1480,9 @@ func (lh *ListenerHandler) sendSubscriptionResponse(logger_ctx context.Context, // This is critical for preventing stream reset errors after prolonged runtime time.Sleep(200 * time.Millisecond) - s.Close() + if err := s.Close(); err != nil { + logger().NamedLogger.Warn(logger_ctx, "Failed to close stream", ion.String("error", err.Error())) + } duration := time.Since(startTime).Seconds() sendSpan.SetAttributes(attribute.Float64("duration", duration), attribute.Int("bytes_written", bytesWritten), attribute.String("status", "success")) @@ -1709,7 +1749,11 @@ func (lh *ListenerHandler) TriggerForBFTFromSequencer(s network.Stream, message return } } - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + fmt.Printf("Failed to close stream: %v\n", err) + } + }() fmt.Println("📩 Received BFT trigger from Sequencer:", message) @@ -1833,7 +1877,9 @@ func (lh *ListenerHandler) TriggerForBFTFromSequencer(s network.Stream, message return nil } defer func() { - stream.Close() + if err := stream.Close(); err != nil { + fmt.Printf("Failed to close stream to %s: %v\n", buddyID, err) + } fmt.Printf("🔌 Closed stream to %s\n", buddyID) }() diff --git a/AVC/BuddyNodes/MessagePassing/MessageListener.go b/AVC/BuddyNodes/MessagePassing/MessageListener.go index 9eb4760c..36395f93 100644 --- a/AVC/BuddyNodes/MessagePassing/MessageListener.go +++ b/AVC/BuddyNodes/MessagePassing/MessageListener.go @@ -40,7 +40,11 @@ func (StructListenerNode *StructListener) HandleSubmitMessageStream(logger_ctx c remotePeer := s.Conn().RemotePeer() span.SetAttributes(attribute.String("remote_peer_id", remotePeer.String())) // Ensure stream is closed to prevent resource leaks - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close message stream", err) + } + }() logger().NamedLogger.Info(spanCtx, "StructListener.HandleSubmitMessageStream called", ion.String("remote_peer_id", remotePeer.String()), @@ -471,7 +475,11 @@ func (StructListenerNode *StructListener) SendMessageToPeer(logger_ctx context.C } // CRITICAL FIX: Ensure stream is always closed to prevent file descriptor leaks // particularly when read timeouts occur (lines 503-517) - defer stream.Close() + defer func() { + if err := stream.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err) + } + }() sendSpan.SetAttributes(attribute.String("connection_method", "direct")) @@ -681,7 +689,11 @@ func (StructListenerNode *StructListener) sendViaSeedNode(logger_ctx context.Con seedSpan.SetAttributes(attribute.Float64("duration", duration)) return fmt.Errorf("failed to create stream to %s: %v", peerID, err) } - defer stream.Close() + defer func() { + if err := stream.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream via seed node", err) + } + }() // Send the message _, err = stream.Write([]byte(message + string(rune(config.Delimiter)))) @@ -790,7 +802,11 @@ func (StructListenerNode *StructListener) getPeerInfoFromSeedNode(logger_ctx con peerInfoSpan.SetAttributes(attribute.Float64("duration", duration)) return nil, fmt.Errorf("failed to create seed node client: %v", err) } - defer client.Close() + defer func() { + if err := client.Close(); err != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close seed node client", err) + } + }() // Get peer record from seed node peerRecord, err := client.GetPeer(peerID.String()) diff --git a/AVC/BuddyNodes/MessagePassing/Streamcache_Builder.go b/AVC/BuddyNodes/MessagePassing/Streamcache_Builder.go index 68913349..3ac33f0d 100644 --- a/AVC/BuddyNodes/MessagePassing/Streamcache_Builder.go +++ b/AVC/BuddyNodes/MessagePassing/Streamcache_Builder.go @@ -154,7 +154,9 @@ func (sc *StructStreamCache) addEntry(peerID peer.ID, stream network.Stream) { // removeEntry removes a stream entry from the cache func (sc *StructStreamCache) removeEntry(peerID peer.ID) { if entry, exists := sc.StreamCache.Streams[peerID]; exists { - entry.Stream.Close() + if err := entry.Stream.Close(); err != nil { + fmt.Printf("Failed to close stream for peer %s: %v\n", peerID, err) + } delete(sc.StreamCache.Streams, peerID) sc.removeFromOrder(peerID) } diff --git a/AVC/NodeSelection/pkg/selection/service.go b/AVC/NodeSelection/pkg/selection/service.go index 6a06d118..0eeeae2a 100644 --- a/AVC/NodeSelection/pkg/selection/service.go +++ b/AVC/NodeSelection/pkg/selection/service.go @@ -25,7 +25,11 @@ func GetBuddyNodes( return nil, err } // fmt.Printf("Debugging 1\n") - defer peerClient.Close() + defer func() { + if closeErr := peerClient.Close(); closeErr != nil { + fmt.Println("⚠️ Failed to close peer client:", closeErr) + } + }() fmt.Println("📡 Connected to peer directory at", peerDirAddress) diff --git a/Block/gRPCclient.go b/Block/gRPCclient.go index 6202a99c..9d18f972 100644 --- a/Block/gRPCclient.go +++ b/Block/gRPCclient.go @@ -882,7 +882,9 @@ func InitMempoolClient(logger_ctx context.Context, address string) error { // CloseMempoolClient closes the global mempool client func CloseMempoolClient() { if globalMempoolClient != nil { - globalMempoolClient.Close() + if err := globalMempoolClient.Close(); err != nil { + fmt.Printf("Failed to close global mempool client: %v\n", err) + } globalMempoolClient = nil } } diff --git a/CA/ImmuDB_CA/ImmuDB_CA.go b/CA/ImmuDB_CA/ImmuDB_CA.go index 85ce202d..36c201d2 100644 --- a/CA/ImmuDB_CA/ImmuDB_CA.go +++ b/CA/ImmuDB_CA/ImmuDB_CA.go @@ -204,6 +204,10 @@ func writePEM(path, blockType string, derBytes []byte) error { if err != nil { return fmt.Errorf("open %q: %w", path, err) } - defer f.Close() + defer func() { + if closeErr := f.Close(); closeErr != nil { + fmt.Printf("Error closing PEM file %q: %v\n", path, closeErr) + } + }() return pem.Encode(f, &pem.Block{Type: blockType, Bytes: derBytes}) } diff --git a/CLI/CLI.go b/CLI/CLI.go index 0f82145c..8593f1f0 100644 --- a/CLI/CLI.go +++ b/CLI/CLI.go @@ -364,7 +364,11 @@ func (h *CommandHandler) handleSeedNodeStats(parts []string) { fmt.Println("💡 Check if the seed node is running and accessible.") return } - defer client.Close() + defer func() { + if closeErr := client.Close(); closeErr != nil { + fmt.Printf("⚠️ Failed to close seed node client: %v\n", closeErr) + } + }() fmt.Println("✅ Successfully connected to seed node!") @@ -898,7 +902,11 @@ func (h *CommandHandler) handleListAliases() { fmt.Printf("❌ Failed to connect to seed node: %v\n", err) return } - defer client.Close() + defer func() { + if closeErr := client.Close(); closeErr != nil { + fmt.Printf("⚠️ Failed to close client: %v\n", closeErr) + } + }() // Get current node's peer ID currentPeerID := h.Node.Host.ID().String() @@ -960,7 +968,11 @@ func (h *CommandHandler) handleDiscoverNeighbors() { fmt.Printf("❌ Failed to connect to seed node: %v\n", err) return } - defer client.Close() + defer func() { + if closeErr := client.Close(); closeErr != nil { + fmt.Printf("⚠️ Failed to close client: %v\n", closeErr) + } + }() // Perform neighbor discovery err = client.DiscoverAndAddNeighbors(h.Node.Host, h.NodeManager) diff --git a/DB_OPs/Immudb_AVROfile.go b/DB_OPs/Immudb_AVROfile.go index 39d9f564..c1672cea 100644 --- a/DB_OPs/Immudb_AVROfile.go +++ b/DB_OPs/Immudb_AVROfile.go @@ -44,7 +44,11 @@ func BackupFromHashMap(cfg Config, MAP *hashmap.HashMap) error { if err != nil { return fmt.Errorf("dial failed: %w", err) } - defer conn.Close() + defer func() { + if closeErr := conn.Close(); closeErr != nil { + log.Printf("Failed to close grpc connection: %v", closeErr) + } + }() client := schema.NewImmuServiceClient(conn) apiCtx := context.Background() @@ -101,7 +105,11 @@ func BackupFromHashMap(cfg Config, MAP *hashmap.HashMap) error { if err != nil { return fmt.Errorf("open avro file: %w", err) } - defer avroFile.Close() + defer func() { + if closeErr := avroFile.Close(); closeErr != nil { + log.Printf("Failed to close avro file: %v", closeErr) + } + }() ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{ W: avroFile, diff --git a/DB_OPs/sqlops/sqlops.go b/DB_OPs/sqlops/sqlops.go index 124b7ca3..2015a7e9 100644 --- a/DB_OPs/sqlops/sqlops.go +++ b/DB_OPs/sqlops/sqlops.go @@ -34,13 +34,17 @@ func NewUnifiedDB() (*UnifiedDB, error) { // Check connection if err := db.Ping(); err != nil { - db.Close() + if closeErr := db.Close(); closeErr != nil { + fmt.Printf("Error closing db during init ping: %v\n", closeErr) + } return nil, fmt.Errorf("database ping failed: %w", err) } // Initialize the database schema if err := initializeSchema(db); err != nil { - db.Close() + if closeErr := db.Close(); closeErr != nil { + fmt.Printf("Error closing db during schema init: %v\n", closeErr) + } return nil, fmt.Errorf("failed to initialize schema: %w", err) } @@ -199,7 +203,11 @@ func (u *UnifiedDB) GetPeers(maxConnections int, limit int) ([]string, error) { if err != nil { return nil, err } - defer rows.Close() + defer func() { + if err := rows.Close(); err != nil { + fmt.Printf("Error closing rows in GetPeers: %v\n", err) + } + }() var peers []string for rows.Next() { @@ -224,7 +232,11 @@ func (u *UnifiedDB) GetAllPeers() ([]string, error) { if err != nil { return nil, err } - defer rows.Close() + defer func() { + if err := rows.Close(); err != nil { + fmt.Printf("Error closing rows in GetAllPeers: %v\n", err) + } + }() var peers []string for rows.Next() { @@ -335,7 +347,11 @@ func (u *UnifiedDB) GetAllKeyValues() (map[string]string, error) { if err != nil { return nil, err } - defer rows.Close() + defer func() { + if err := rows.Close(); err != nil { + fmt.Printf("Error closing rows in GetAllKeyValues: %v\n", err) + } + }() data := make(map[string]string) for rows.Next() { @@ -374,7 +390,11 @@ func (u *UnifiedDB) GetAllMerkleHashes() (map[string]string, error) { if err != nil { return nil, err } - defer rows.Close() + defer func() { + if err := rows.Close(); err != nil { + fmt.Printf("Error closing rows in GetAllMerkleHashes: %v\n", err) + } + }() data := make(map[string]string) for rows.Next() { @@ -415,12 +435,16 @@ func (u *UnifiedDB) GetConnectedPeers() ([]PeerInfo, error) { var dfltValue interface{} if err := rows.Scan(&cid, &name, &typeName, ¬Null, &dfltValue, &pk); err != nil { - rows.Close() + if closeErr := rows.Close(); closeErr != nil { + fmt.Printf("Error closing rows after scan failure: %v\n", closeErr) + } return nil, fmt.Errorf("failed to scan column info: %w", err) } } - rows.Close() + if err := rows.Close(); err != nil { + fmt.Printf("Error closing PRAGMA rows: %v\n", err) + } // Now query the actual data query := fmt.Sprintf(`SELECT peer_id, multiaddr, last_seen, heartbeat_fail, is_alive FROM %s`, config.ConnectedPeers) @@ -429,7 +453,11 @@ func (u *UnifiedDB) GetConnectedPeers() ([]PeerInfo, error) { if err != nil { return nil, fmt.Errorf("failed to query connected peers: %w", err) } - defer rows.Close() + defer func() { + if err := rows.Close(); err != nil { + fmt.Printf("Error closing connected peers rows: %v\n", err) + } + }() var peers []PeerInfo for rows.Next() { @@ -490,7 +518,11 @@ func (u *UnifiedDB) GetConnectedPeersAsMap() ([]map[string]interface{}, error) { if err != nil { return nil, fmt.Errorf("failed to query connected peers: %w", err) } - defer rows.Close() + defer func() { + if err := rows.Close(); err != nil { + fmt.Printf("Error closing connected peers map rows: %v\n", err) + } + }() var result []map[string]interface{} diff --git a/Pubsub/Publish/Publish.go b/Pubsub/Publish/Publish.go index d3eee87e..04d6254a 100644 --- a/Pubsub/Publish/Publish.go +++ b/Pubsub/Publish/Publish.go @@ -354,7 +354,14 @@ func sendToPeer(gps *PubSubMessages.GossipPubSub, peerID peer.ID, messageBytes [ ion.String("function", "Publish.sendToPeer")) return err } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + logger().NamedLogger.Error(trace_ctx, "Failed to close stream to peer", + closeErr, + ion.String("peer", peerID.String()), + ion.String("function", "Publish.sendToPeer")) + } + }() err = writeMessage(stream, messageBytes) if err != nil { diff --git a/Pubsub/Pubsub.go b/Pubsub/Pubsub.go index 523afe0e..c83794f9 100644 --- a/Pubsub/Pubsub.go +++ b/Pubsub/Pubsub.go @@ -218,7 +218,11 @@ func RemovePeerFromChannel(gps *PubSubMessages.GossipPubSub, channelName string, // handleGossipStream handles incoming gossip messages func handleGossipStream(gps *PubSubMessages.GossipPubSub, s network.Stream) { - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + log.Printf("Error closing gossip stream: %v", err) + } + }() // Read message using delimiter messageBytes, err := readMessage(s) diff --git a/Sequencer/Alerts/service.go b/Sequencer/Alerts/service.go index 5b28d793..276df2be 100644 --- a/Sequencer/Alerts/service.go +++ b/Sequencer/Alerts/service.go @@ -88,7 +88,11 @@ func (s *alertService) sendAlert( log.Printf("❌ [ALERT] Failed to send %s alert: %v", alertName, err) return } - defer resp.Body.Close() + defer func() { + if closeErr := resp.Body.Close(); closeErr != nil { + log.Printf("❌ [ALERT] Failed to close response body: %v", closeErr) + } + }() if resp.StatusCode >= 200 && resp.StatusCode < 300 { log.Printf("✅ [ALERT] Successfully sent %s alert (status: %d)", alertName, resp.StatusCode) diff --git a/Sequencer/Consensus.go b/Sequencer/Consensus.go index f8a4c36f..10b12eb3 100644 --- a/Sequencer/Consensus.go +++ b/Sequencer/Consensus.go @@ -1561,7 +1561,14 @@ func (consensus *Consensus) requestVoteResultFromBuddy(peerID peer.ID) *BLS_Sign ion.String("function", "Consensus.requestVoteResultFromBuddy")) return nil } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + logger().NamedLogger.Error(logger_ctx, "Failed to close stream", + closeErr, + ion.String("peer_id", peerID.String()), + ion.String("function", "Consensus.requestVoteResultFromBuddy")) + } + }() // Build request message reqAck := PubSubMessages.NewACKBuilder().True_ACK_Message(consensus.Host.ID(), config.Type_VoteResult) diff --git a/Sequencer/Triggers/Triggers.go b/Sequencer/Triggers/Triggers.go index 3afbb28f..401438d1 100644 --- a/Sequencer/Triggers/Triggers.go +++ b/Sequencer/Triggers/Triggers.go @@ -382,7 +382,11 @@ func RequestVoteResultsFromBuddies(blockhash string) error { log.Printf("RequestVoteResultsFromBuddies: Failed to open stream to %s: %v", peerID, err) return err } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + log.Printf("RequestVoteResultsFromBuddies: Failed to close stream to %s: %v", peerID, closeErr) + } + }() // Create vote result request message reqAck := AVCStruct.NewACKBuilder().True_ACK_Message(listenerNode.PeerID, config.Type_VoteResult) diff --git a/config/utils/utils.go b/config/utils/utils.go index ba7ca68f..9955bd5d 100644 --- a/config/utils/utils.go +++ b/config/utils/utils.go @@ -34,7 +34,11 @@ func loadPrivateKeyFromFile() (crypto.PrivKey, peer.ID, error) { if err != nil { return nil, "", fmt.Errorf("failed to open peer.json: %v", err) } - defer file.Close() + defer func() { + if closeErr := file.Close(); closeErr != nil { + fmt.Printf("Failed to close peer.json: %v\n", closeErr) + } + }() // Decode JSON var cfg config.PeerConfig diff --git a/fastsync/fastsync.go b/fastsync/fastsync.go index c3514460..fdde1215 100644 --- a/fastsync/fastsync.go +++ b/fastsync/fastsync.go @@ -943,7 +943,11 @@ func (fs *FastSync) Phase3_FileRequest(msg *SyncMessage, peerID peer.ID, stream log.Error().Err(err).Msg("Failed to create new stream for retry") continue } - defer newStream.Close() + defer func() { + if closeErr := newStream.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close new transfer stream") + } + }() // Update the stream and its reader/writer stream = newStream @@ -1128,14 +1132,18 @@ func (fs *FastSync) batchCreateOrderedWithRetry(entries []struct { if dbType == MainDB { newClient, clientErr = DB_OPs.GetMainDBConnectionandPutBack(context.Background()) if clientErr == nil { - DB_OPs.Close(fs.mainDB.Client) + if closeErr := DB_OPs.Close(fs.mainDB.Client); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close expired mainDB client") + } DB_OPs.PutMainDBConnection(fs.mainDB) fs.mainDB = newClient } } else { newClient, clientErr = DB_OPs.GetAccountConnectionandPutBack(context.Background()) if clientErr == nil { - DB_OPs.Close(fs.accountsDB.Client) + if closeErr := DB_OPs.Close(fs.accountsDB.Client); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close expired accountsDB client") + } DB_OPs.PutAccountsConnection(fs.accountsDB) fs.accountsDB = newClient } @@ -1255,7 +1263,11 @@ func (fs *FastSync) PushDataToDB(msg *SyncMessage, dbType DatabaseType, dbPath s if err != nil { return fmt.Errorf("failed to open avro file: %w", err) } - defer file.Close() + defer func() { + if closeErr := file.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close avro file") + } + }() startTime := time.Now() fmt.Printf(">>> [DB] Starting database restore from AVRO: %s (DB: %s)\n", filepath.Base(dbPath), dbTypeToString(dbType)) diff --git a/fastsync/fastsyncNew.go b/fastsync/fastsyncNew.go index d73eecba..3875ab68 100644 --- a/fastsync/fastsyncNew.go +++ b/fastsync/fastsyncNew.go @@ -1068,7 +1068,9 @@ func (fs *FastSync) handleStream(stream network.Stream) { Msg("PANIC in handleStream - recovering and closing stream") fmt.Printf(">>> [SERVER] PANIC in handleStream: %v\n", r) } - stream.Close() + if closeErr := stream.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close sync stream") + } }() peerID := stream.Conn().RemotePeer() @@ -1749,7 +1751,11 @@ func (fs *FastSync) HandleSync(peerID peer.ID) (*SyncMessage, error) { shouldAbort := false // Use a closure to ensure stream is closed BEFORE Phase 1 (avoid idle timeout) func() { - defer preStream.Close() + defer func() { + if closeErr := preStream.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close pre-sync stream") + } + }() preReader := bufio.NewReader(preStream) preWriter := bufio.NewWriter(preStream) @@ -1793,7 +1799,11 @@ func (fs *FastSync) HandleSync(peerID peer.ID) (*SyncMessage, error) { if err != nil { return nil, err } - defer (stream).Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close main sync stream") + } + }() reader := bufio.NewReader(stream) writer := bufio.NewWriter(stream) @@ -1969,7 +1979,11 @@ func (fs *FastSync) HandleSync(peerID peer.ID) (*SyncMessage, error) { postStream, err := returnStream(fs, peerID) if err == nil { func() { - defer postStream.Close() + defer func() { + if closeErr := postStream.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close post-sync stream") + } + }() postReader := bufio.NewReader(postStream) postWriter := bufio.NewWriter(postStream) @@ -2470,7 +2484,11 @@ func (fs *FastSync) FirstSyncClient(peerID peer.ID) error { if err == nil { shouldAbort := false func() { - defer preStream.Close() + defer func() { + if closeErr := preStream.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close first-sync pre stream") + } + }() preReader := bufio.NewReader(preStream) preWriter := bufio.NewWriter(preStream) @@ -2625,7 +2643,11 @@ func (fs *FastSync) FirstSyncClient(peerID peer.ID) error { postStream, err := returnStream(fs, peerID) if err == nil { func() { - defer postStream.Close() + defer func() { + if closeErr := postStream.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close first-sync post stream") + } + }() postReader := bufio.NewReader(postStream) postWriter := bufio.NewWriter(postStream) @@ -2664,7 +2686,11 @@ func (fs *FastSync) FirstSyncClient(peerID peer.ID) error { contentStream, err := returnStream(fs, peerID) if err == nil { func() { - defer contentStream.Close() + defer func() { + if closeErr := contentStream.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close first-sync content stream") + } + }() cw := bufio.NewWriter(contentStream) cr := bufio.NewReader(contentStream) diff --git a/gETH/Facade/rpc/ws_server.go b/gETH/Facade/rpc/ws_server.go index 2a5aa488..9a910ea3 100644 --- a/gETH/Facade/rpc/ws_server.go +++ b/gETH/Facade/rpc/ws_server.go @@ -96,7 +96,11 @@ func (s *WSServer) handleWS(w http.ResponseWriter, r *http.Request) { if err != nil { return } - defer conn.Close() + defer func() { + if closeErr := conn.Close(); closeErr != nil { + log.Printf("Failed to close websocket connection: %v\n", closeErr) + } + }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/main.go b/main.go index 1babe005..025dcf29 100644 --- a/main.go +++ b/main.go @@ -214,7 +214,11 @@ func runCommand(command string, args []string, grpcPort int) { client, err := cli.NewClient(fmt.Sprintf("localhost:%d", grpcPort)) if err == nil { - defer client.Close() + defer func() { + if err := client.Close(); err != nil { + fmt.Printf("Error closing client: %v\n", err) + } + }() v, err := client.GetNodeVersion() if err == nil { fmt.Println("Remote Node Version (Running):") @@ -237,7 +241,11 @@ func runCommand(command string, args []string, grpcPort int) { fmt.Println("Make sure the service is running.") os.Exit(1) } - defer client.Close() + defer func() { + if err := client.Close(); err != nil { + fmt.Printf("Error closing client: %v\n", err) + } + }() switch command { @@ -862,7 +870,11 @@ func main() { fmt.Println("Error starting node:", err) return } - defer n.Host.Close() + defer func() { + if err := n.Host.Close(); err != nil { + fmt.Printf("Failed to close node host: %v\n", err) + } + }() fmt.Println("Node created successfully") // Set the host instance for broadcast messaging @@ -1030,7 +1042,11 @@ func main() { fmt.Printf("Failed to create seed node client: %v\n", err) log.Error().Err(err).Msg("Failed to create seed node client") } else { - defer seedClient.Close() + defer func() { + if err := seedClient.Close(); err != nil { + log.Error().Err(err).Msg("Failed to close seed client") + } + }() // Register this peer with the seed node (with or without alias) if cfg.Node.Alias != "" { diff --git a/messaging/DIDPropagation.go b/messaging/DIDPropagation.go index 4ccad715..39485716 100644 --- a/messaging/DIDPropagation.go +++ b/messaging/DIDPropagation.go @@ -200,7 +200,11 @@ func HandleDIDStream(stream network.Stream) { return } } - defer stream.Close() + defer func() { + if err := stream.Close(); err != nil { + log.Error().Err(err).Msg("Failed to close DID incoming stream") + } + }() // Get the remote peer remotePeer := stream.Conn().RemotePeer().String() @@ -326,7 +330,11 @@ func forwardDID(h host.Host, msg DIDMessage) { log.Error().Err(err).Str("peer", peerIDForGoroutine.String()).Msg("Failed to open DID stream") return err } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + log.Error().Err(closeErr).Str("peer", peerIDForGoroutine.String()).Msg("Failed to close DID stream") + } + }() // Write the message _, err = stream.Write(msgBytes) @@ -449,7 +457,11 @@ func PropagateDID(h host.Host, doc *DB_OPs.Account) error { log.Error().Err(err).Str("peer", peerIDForGoroutine.String()).Msg("Failed to open stream for DID") return err } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + log.Error().Err(closeErr).Str("peer", peerIDForGoroutine.String()).Msg("Failed to close stream for DID") + } + }() // Send the message _, err = stream.Write(msgBytes) diff --git a/messaging/blockPropagation.go b/messaging/blockPropagation.go index 94a3824e..fe99f1cc 100644 --- a/messaging/blockPropagation.go +++ b/messaging/blockPropagation.go @@ -204,7 +204,11 @@ func HandleBlockStream(stream network.Stream) { return } } - defer stream.Close() + defer func() { + if err := stream.Close(); err != nil { + log.Error().Err(err).Msg("Failed to close block stream") + } + }() remotePeer := stream.Conn().RemotePeer().String() if isPeerTimedOut(remotePeer) { @@ -442,7 +446,11 @@ func forwardBlock(h host.Host, msg config.BlockMessage) { log.Debug().Err(err).Str("peer", peerIDForGoroutine.String()).Msg("Failed to open stream") return err } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + log.Debug().Err(closeErr).Str("peer", peerIDForGoroutine.String()).Msg("Failed to close stream") + } + }() if _, err := stream.Write(msgBytes); err != nil { log.Debug().Err(err).Str("peer", peerIDForGoroutine.String()).Msg("Failed to write message") diff --git a/messaging/broadcast.go b/messaging/broadcast.go index 2c082eb6..59fec878 100644 --- a/messaging/broadcast.go +++ b/messaging/broadcast.go @@ -111,7 +111,11 @@ func markMessageSeen(msgID string) { // HandleBroadcastStream processes incoming broadcast messages func HandleBroadcastStream(stream network.Stream) { - defer stream.Close() + defer func() { + if err := stream.Close(); err != nil { + log.Error().Err(err).Str("peer", stream.Conn().RemotePeer().String()).Msg("Failed to close broadcast incoming stream") + } + }() // Record metrics metrics.MessagesReceivedCounter.WithLabelValues("broadcast", stream.Conn().RemotePeer().String()).Inc() @@ -236,12 +240,16 @@ func forwardBroadcast(h host.Host, msg BroadcastMessageStruct) { _, err = stream.Write(msgBytes) if err != nil { log.Error().Err(err).Str("peer", peerID.String()).Msg("Failed to write broadcast message") - stream.Close() + if closeErr := stream.Close(); closeErr != nil { + log.Error().Err(closeErr).Str("peer", peerID.String()).Msg("Failed to close broadcast stream") + } continue } // Close the stream - stream.Close() + if closeErr := stream.Close(); closeErr != nil { + log.Error().Err(closeErr).Str("peer", peerID.String()).Msg("Failed to close broadcast stream") + } successCount++ // Record metrics @@ -319,7 +327,11 @@ func BroadcastMessage(h host.Host, content string) error { log.Error().Err(err).Str("peer", peer.String()).Msg("Failed to open broadcast stream") return err } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + log.Error().Err(closeErr).Str("peer", peer.String()).Msg("Failed to close broadcast stream") + } + }() // Send the message _, err = stream.Write(msgBytes) @@ -538,7 +550,11 @@ func BroadcastVoteTrigger(h host.Host, consensusMessage *PubSubMessages.Consensu log.Error().Err(err).Str("peer", peer.String()).Msg("Failed to open broadcast stream for vote trigger") return err } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + log.Error().Err(closeErr).Str("peer", peer.String()).Msg("Failed to close broadcast stream for vote trigger") + } + }() // Send the message _, err = stream.Write(msgBytes) @@ -671,7 +687,11 @@ func BroadcastBlockToEveryNodeWithExtraData(h host.Host, block *config.ZKBlock, log.Debug().Err(err).Str("peer", peer.String()).Msg("Failed to open stream") return err } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + log.Debug().Err(closeErr).Str("peer", peer.String()).Msg("Failed to close stream") + } + }() if _, err := stream.Write(msgBytes); err != nil { log.Debug().Err(err).Str("peer", peer.String()).Msg("Failed to write message") return err diff --git a/messaging/directMSG/directMSG.go b/messaging/directMSG/directMSG.go index f340d22b..0ae2ce38 100644 --- a/messaging/directMSG/directMSG.go +++ b/messaging/directMSG/directMSG.go @@ -96,7 +96,9 @@ func StartYggdrasilListener(ctx context.Context) { LocalGRO.Go(GRO.MessageListenerThread, func(ctx context.Context) error { defer func() { log.Info().Msg("Closing Yggdrasil listener") - listener.Close() + if err := listener.Close(); err != nil { + log.Error().Err(err).Msg("Failed to close Yggdrasil listener") + } }() for { @@ -138,7 +140,9 @@ func StartYggdrasilListener(ctx context.Context) { return nil }); err != nil { log.Error().Err(err).Msg("Failed to start goroutine for Yggdrasil connection") - connForGoroutine.Close() // Close connection if we can't handle it + if closeErr := connForGoroutine.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close Yggdrasil connection") + } } } } @@ -147,7 +151,11 @@ func StartYggdrasilListener(ctx context.Context) { // handleYggdrasilConnection processes incoming Yggdrasil messages func handleYggdrasilConnection(conn net.Conn) { - defer conn.Close() + defer func() { + if err := conn.Close(); err != nil { + log.Error().Err(err).Msg("Failed to close Yggdrasil connection") + } + }() // Get remote address for display and logging remoteAddr := conn.RemoteAddr().String() @@ -373,7 +381,9 @@ func getConnection(targetAddr string) (net.Conn, error) { return pooled.conn, nil } // Connection expired, close it - pooled.conn.Close() + if closeErr := pooled.conn.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close pooled connection") + } delete(connectionPool, targetAddr) } @@ -403,7 +413,9 @@ func removeConnection(targetAddr string) { defer connectionPoolLock.Unlock() if pooled, exists := connectionPool[targetAddr]; exists { - pooled.conn.Close() + if closeErr := pooled.conn.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close pooled connection") + } delete(connectionPool, targetAddr) } } @@ -422,7 +434,9 @@ func cleanConnectionPool(ctx context.Context) { for addr, conn := range connectionPool { if now.After(conn.expiresAt) { - conn.conn.Close() + if closeErr := conn.conn.Close(); closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close expired pooled connection") + } delete(connectionPool, addr) } } diff --git a/messaging/message.go b/messaging/message.go index aad9a874..1070127f 100644 --- a/messaging/message.go +++ b/messaging/message.go @@ -17,7 +17,11 @@ import ( // HandleMessageStream processes incoming messages (TCP) func HandleMessageStream(s network.Stream) { - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + fmt.Println("Error closing message stream:", err) + } + }() reader := bufio.NewReader(s) msg, err := reader.ReadString('\n') if err != nil { @@ -60,7 +64,11 @@ func SendMessage(n *config.Node, target string, message string) error { if err != nil { return fmt.Errorf("stream failed: %v", err) } - defer s.Close() + defer func() { + if closeErr := s.Close(); closeErr != nil { + fmt.Println("Error closing stream:", closeErr) + } + }() // Record message metrics metrics.MessagesSentCounter.WithLabelValues("message", peerInfo.ID.String()).Inc() diff --git a/node/node.go b/node/node.go index c9617e5a..ca44e366 100644 --- a/node/node.go +++ b/node/node.go @@ -44,7 +44,11 @@ func loadOrCreatePrivateKey() (crypto.PrivKey, peer.ID, error) { if err != nil { return nil, "", fmt.Errorf("failed to open peer.json: %v", err) } - defer file.Close() + defer func() { + if closeErr := file.Close(); closeErr != nil { + fmt.Printf("Failed to close peer.json: %v\n", closeErr) + } + }() if err := json.NewDecoder(file).Decode(&config); err != nil { return nil, "", fmt.Errorf("failed to decode peer.json: %v", err) @@ -104,7 +108,11 @@ func loadOrCreatePrivateKey() (crypto.PrivKey, peer.ID, error) { if err != nil { return nil, "", fmt.Errorf("failed to create peer.json: %v", err) } - defer file.Close() + defer func() { + if closeErr := file.Close(); closeErr != nil { + fmt.Printf("Failed to close peer.json: %v\n", closeErr) + } + }() if err := json.NewEncoder(file).Encode(config); err != nil { return nil, "", fmt.Errorf("failed to write peer.json: %v", err) @@ -248,7 +256,11 @@ func NewNode(logger_ctx context.Context) (*config.Node, error) { // Capture stream in closure to avoid race condition stream := s LocalGRO.Go(GRO.NodeStreamThread, func(ctx context.Context) error { - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + fmt.Printf("Failed to close incoming stream: %v\n", closeErr) + } + }() listenerHandler.HandleSubmitMessageStream(logger_ctx, stream) return nil }) @@ -341,7 +353,11 @@ func GetPeerIDFromJSON() string { fmt.Println("Failed to open peer.json:", err) return "" } - defer file.Close() + defer func() { + if closeErr := file.Close(); closeErr != nil { + fmt.Printf("Failed to close peer.json: %v\n", closeErr) + } + }() // Decode JSON var config config.PeerConfig diff --git a/node/nodemanager.go b/node/nodemanager.go index 80b02fd3..99493dec 100644 --- a/node/nodemanager.go +++ b/node/nodemanager.go @@ -151,7 +151,9 @@ func createNodeManager(node *config.Node) (*NodeManager, error) { // Ping to ensure connection is valid if err := db.Ping(); err != nil { - db.Close() + if closeErr := db.Close(); closeErr != nil { + fmt.Printf("Failed to close node manager DB on ping error: %v\n", closeErr) + } return nil, errors.New("database ping failed: " + err.Error()) } @@ -692,7 +694,11 @@ func (nm *NodeManager) UpdatePeerStatus(peerID peer.ID, isAlive bool, failCount } func (nm *NodeManager) handleHeartbeat(stream network.Stream) { - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + logger().NamedLogger.Error(context.Background(), "Failed to close heartbeat stream", closeErr) + } + }() // Record trace span and close it span_ctx, span := logger().NamedLogger.Tracer("NodeManager").Start(nm.ctx, "NodeManager.handleHeartbeat") @@ -947,7 +953,11 @@ func (nm *NodeManager) sendHeartbeat(peerID peer.ID) (bool, error) { ) return false, errors.New("failed to open heartbeat stream: " + err.Error()) } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + logger().NamedLogger.Error(span_ctx, "Failed to close send heartbeat stream", closeErr) + } + }() span.SetAttributes(attribute.String("stream_opened", "true")) diff --git a/seed/seed.go b/seed/seed.go index 2c8b44d5..aa65d8e7 100644 --- a/seed/seed.go +++ b/seed/seed.go @@ -83,7 +83,11 @@ func RegisterAsSeed(node *config.Node) error { // handleSeedRequest handles general seed node requests func handleSeedRequest(stream network.Stream, node *config.Node) { // Basic handler for seed protocol - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + fmt.Printf("Error closing stream: %v\n", closeErr) + } + }() // Read the request reader := bufio.NewReader(stream) @@ -104,7 +108,11 @@ func handleSeedRequest(stream network.Stream, node *config.Node) { // handlePeerDiscoveryRequest processes requests for peer discovery func handlePeerDiscoveryRequest(stream network.Stream, node *config.Node) { - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + fmt.Printf("Error closing stream: %v\n", closeErr) + } + }() if !node.IsSeed { stream.Write([]byte("{\"error\": \"Not a seed node\"}\n")) @@ -203,7 +211,11 @@ func handlePeerDiscoveryRequest(stream network.Stream, node *config.Node) { // handleRegisterRequest handles peer registration func handleRegisterRequest(stream network.Stream, node *config.Node) { - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + fmt.Printf("Error closing stream: %v\n", closeErr) + } + }() if !node.IsSeed { stream.Write([]byte("{\"error\": \"Not a seed node\"}\n")) @@ -306,7 +318,11 @@ func RequestPeers(h host.Host, seedAddr string, maxPeers int, peerType string) ( if err != nil { return nil, fmt.Errorf("failed to open peer discovery stream: %v", err) } - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + fmt.Printf("Error closing stream: %v\n", closeErr) + } + }() // Send request request := PeerRequest{ diff --git a/seed/seedhelper.go b/seed/seedhelper.go index 6e6b9df6..317122f4 100644 --- a/seed/seedhelper.go +++ b/seed/seedhelper.go @@ -44,7 +44,11 @@ func (sn *SeedNode) GetAllPeers() ([]string, error) { if err != nil { return nil, err } - defer rows.Close() + defer func() { + if closeErr := rows.Close(); closeErr != nil { + fmt.Printf("Error closing rows: %v\n", closeErr) + } + }() var peers []string for rows.Next() { @@ -64,7 +68,11 @@ func (sn *SeedNode) GetPeers() ([]string, error) { if err != nil { return nil, err } - defer rows.Close() + defer func() { + if closeErr := rows.Close(); closeErr != nil { + fmt.Printf("Error closing rows: %v\n", closeErr) + } + }() var peers []string for rows.Next() { @@ -233,7 +241,11 @@ func (sn *SeedNode) PingPeer(peerAddr string) (bool, error) { fmt.Printf("Failed to open heartbeat stream to %s: %v\n", peerInfo.ID, err) return false, nil // Not an error, just offline } - defer s.Close() + defer func() { + if closeErr := s.Close(); closeErr != nil { + fmt.Printf("Error closing heartbeat stream: %v\n", closeErr) + } + }() // Update last seen time in database _, err = sn.db.Exec("UPDATE peers SET connections = connections + 1 WHERE peerID = ?", peerInfo.ID.String()) @@ -278,7 +290,11 @@ func (sn *SeedNode) ListPeers() ([]*PeerStatus, error) { if err != nil { return nil, fmt.Errorf("database error listing peers: %w", err) } - defer rows.Close() + defer func() { + if closeErr := rows.Close(); closeErr != nil { + fmt.Printf("Error closing rows: %v\n", closeErr) + } + }() var peers []*PeerStatus for rows.Next() { @@ -401,7 +417,11 @@ func (sn *SeedNode) performPeerMaintenance() { // RegisterPeer is called by a peer to register itself with the seed node func (sn *SeedNode) RegisterPeer(stream network.Stream) { - defer stream.Close() + defer func() { + if closeErr := stream.Close(); closeErr != nil { + fmt.Printf("Error closing registration stream: %v\n", closeErr) + } + }() // Get the peer's info remotePeer := stream.Conn().RemotePeer() diff --git a/seednode/seednode.go b/seednode/seednode.go index 0d5af231..32ba6ec8 100644 --- a/seednode/seednode.go +++ b/seednode/seednode.go @@ -44,7 +44,11 @@ func getPublicIP() (string, error) { if err != nil { return "", fmt.Errorf("failed to get public IP: %w", err) } - defer resp.Body.Close() + defer func() { + if closeErr := resp.Body.Close(); closeErr != nil { + fmt.Printf("Failed to close public IP response body: %v\n", closeErr) + } + }() if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode) diff --git a/transfer/file.go b/transfer/file.go index 79a5b8ce..e603d7f6 100644 --- a/transfer/file.go +++ b/transfer/file.go @@ -246,7 +246,11 @@ func calculateSpeedTrend(samples []float64) float64 { // HandleFileStream processes incoming files with adaptive buffer sizing // If outputPath is empty, defaults to "received__" func HandleFileStream(s network.Stream, outputPath string) { - defer s.Close() + defer func() { + if closeErr := s.Close(); closeErr != nil { + fmt.Println("Error closing stream:", closeErr) + } + }() startTime := time.Now().UTC() peerID := s.Conn().RemotePeer().String() @@ -301,7 +305,11 @@ func HandleFileStream(s network.Stream, outputPath string) { fmt.Println("Error creating file:", err) return } - defer file.Close() + defer func() { + if closeErr := file.Close(); closeErr != nil { + fmt.Println("Error closing file:", closeErr) + } + }() // Use buffered reader with initial buffer size bufferSize := initialBuffer @@ -432,7 +440,11 @@ func SendFile(h host.Host, peerID peer.ID, filePath, remotePath string) error { if err != nil { return fmt.Errorf("file open failed: %v", err) } - defer file.Close() + defer func() { + if closeErr := file.Close(); closeErr != nil { + fmt.Println("Error closing file:", closeErr) + } + }() // Get initial buffer size initialBuffer := getOptimalBufferSize(peerIDStr) @@ -444,7 +456,11 @@ func SendFile(h host.Host, peerID peer.ID, filePath, remotePath string) error { if err != nil { return fmt.Errorf("stream failed: %v", err) } - defer s.Close() + defer func() { + if closeErr := s.Close(); closeErr != nil { + fmt.Println("Error closing stream:", closeErr) + } + }() // Send file metadata (size and filename) header := make([]byte, 16+1024) // 16 bytes for size + 1024 for filename