Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions AVC/BFT/bft/buddy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ func (s *BuddyService) runBFTConsensus(ctx context.Context, req *pb.BFTRequest)
s.reportFailure(req, fmt.Sprintf("failed to create BFT adapter: %v", err))
return
}
defer adapter.Close()
defer func() {
if err := adapter.Close(); err != nil {
log.Printf("⚠️ [%s] Failed to close BFT adapter: %v", s.buddyID, err)
}
}()

log.Printf("✅ [%s] BFT adapter initialized on channel: %s", s.buddyID, req.GossipsubTopic)

Expand Down Expand Up @@ -211,7 +215,11 @@ func (s *BuddyService) reportResult(req *pb.BFTRequest, result *Result) {
log.Printf("❌ [%s] Failed to connect to sequencer: %v", s.buddyID, err)
return
}
defer conn.Close()
defer func() {
if closeErr := conn.Close(); closeErr != nil {
log.Printf("⚠️ [%s] Failed to close sequencer connection: %v", s.buddyID, closeErr)
}
}()

client := pb.NewBFTServiceClient(conn)

Expand Down Expand Up @@ -255,7 +263,11 @@ func (s *BuddyService) reportFailure(req *pb.BFTRequest, reason string) {
log.Printf("❌ [%s] Failed to connect to sequencer: %v", s.buddyID, err)
return
}
defer conn.Close()
defer func() {
if closeErr := conn.Close(); closeErr != nil {
log.Printf("⚠️ [%s] Failed to close sequencer connection: %v", s.buddyID, closeErr)
}
}()

client := pb.NewBFTServiceClient(conn)
pbResult := &pb.BFTResult{
Expand Down
6 changes: 5 additions & 1 deletion AVC/BFT/bft/sequencer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ func (s *SequencerBFTClient) sendBFTRequest(
Message: fmt.Sprintf("connection failed: %v", err),
}
}
defer conn.Close()
defer func() {
if err := conn.Close(); err != nil {
log.Printf("❌ [Sequencer] Failed to close connection to %s: %v", buddy.ID, err)
}
}()

client := pb.NewBFTServiceClient(conn)

Expand Down
4 changes: 3 additions & 1 deletion AVC/BFT/network/libp2p_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func SetupLibp2pHost(ctx context.Context, port int) (host.Host, *pubsub.PubSub,
// Create GossipSub
ps, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
h.Close()
if closeErr := h.Close(); closeErr != nil {
fmt.Printf("❌ Failed to close libp2p host: %v\n", closeErr)
}
return nil, nil, fmt.Errorf("failed to create gossipsub: %w", err)
}

Expand Down
6 changes: 5 additions & 1 deletion AVC/BuddyNodes/MessagePassing/BuddyNodeStream.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ func NewStructBuddyNode(buddy *AVCStruct.BuddyNode) *StructBuddyNode {

// HandleBuddyNodesMessageStream handles incoming messages on the buddy nodes protocol
func (StructBuddyNode *StructBuddyNode) HandleBuddyNodesMessageStream(host host.Host, s network.Stream) {
defer s.Close()
defer func() {
if err := s.Close(); err != nil {
logger().NamedLogger.Error(context.Background(), "Failed to close buddy nodes stream", err)
}
}()

// Create context for this stream handler
spanCtx := StructBuddyNode.logger_ctx
Expand Down
74 changes: 60 additions & 14 deletions AVC/BuddyNodes/MessagePassing/ListenerHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ func (lh *ListenerHandler) HandleSubmitMessageStream(logger_ctx context.Context,
attribute.String("remote_peer_id", remotePeer.String()),
)
// Ensure stream is closed to prevent resource leaks
defer s.Close()
defer func() {
if err := s.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err)
}
}()

logger().NamedLogger.Info(spanCtx, "ListenerHandler.HandleSubmitMessageStream CALLED",
ion.String("remote_peer_id", remotePeer.String()),
Expand Down Expand Up @@ -173,15 +177,23 @@ func (lh *ListenerHandler) HandleSubmitMessageStream(logger_ctx context.Context,
ion.String("topic", TOPIC),
ion.String("function", "MessagePassing.HandleSubmitMessageStream"))
lh.handleBFTRequest(spanCtx, s, message)
defer s.Close()
defer func() {
if err := s.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err)
}
}()
case config.Type_SubmitVote:
logger().NamedLogger.Info(spanCtx, "Handling Type_SubmitVote",
ion.String("created_at", time.Now().UTC().Format(time.RFC3339)),
ion.String("log_file", LOG_FILE),
ion.String("topic", TOPIC),
ion.String("function", "MessagePassing.HandleSubmitMessageStream"))
lh.handleSubmitVote(spanCtx, s, message)
defer s.Close()
defer func() {
if err := s.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err)
}
}()
case config.Type_AskForSubscription:
logger().NamedLogger.Info(spanCtx, "Handling Type_AskForSubscription",
ion.String("created_at", time.Now().UTC().Format(time.RFC3339)),
Expand All @@ -196,15 +208,23 @@ func (lh *ListenerHandler) HandleSubmitMessageStream(logger_ctx context.Context,
ion.String("topic", TOPIC),
ion.String("function", "MessagePassing.HandleSubmitMessageStream"))
lh.handleSubscriptionResponse(spanCtx, s, message)
defer s.Close()
defer func() {
if err := s.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err)
}
}()
case config.Type_VoteResult:
logger().NamedLogger.Info(spanCtx, "Handling Type_VoteResult",
ion.String("created_at", time.Now().UTC().Format(time.RFC3339)),
ion.String("log_file", LOG_FILE),
ion.String("topic", TOPIC),
ion.String("function", "MessagePassing.HandleSubmitMessageStream"))
lh.handleVoteResultRequest(spanCtx, s, message)
defer s.Close()
defer func() {
if err := s.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err)
}
}()
default:
span.SetAttributes(attribute.String("status", "unknown_message_type"))
logger().NamedLogger.Error(spanCtx, "Unknown message type",
Expand All @@ -215,7 +235,11 @@ func (lh *ListenerHandler) HandleSubmitMessageStream(logger_ctx context.Context,
ion.String("log_file", LOG_FILE),
ion.String("topic", TOPIC),
ion.String("function", "MessagePassing.HandleSubmitMessageStream"))
defer s.Close()
defer func() {
if err := s.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err)
}
}()
}

duration := time.Since(startTime).Seconds()
Expand Down Expand Up @@ -608,7 +632,11 @@ func (lh *ListenerHandler) runBFTConsensusFlow(logger_ctx context.Context, conte
lh.sendBFTResultToSequencer(consensusSpanCtx, bftCtx.Round, bftCtx.BlockHash, myBuddyID, false, "REJECT", fmt.Sprintf("Failed to create adapter: %v", err))
return
}
defer adapter.Close()
defer func() {
if err := adapter.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close BFT adapter", err)
}
}()

logger().NamedLogger.Info(consensusSpanCtx, "BFT adapter created successfully",
ion.String("context_key", contextKey),
Expand Down Expand Up @@ -865,7 +893,11 @@ func (lh *ListenerHandler) sendBFTResultToSequencer(
ion.String("function", "MessagePassing.sendBFTResultToSequencer"))
return
}
defer stream.Close()
defer func() {
if err := stream.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close stream to Sequencer", err)
}
}()

// Send message
messageBytes, err := json.Marshal(message)
Expand Down Expand Up @@ -1400,7 +1432,9 @@ func (lh *ListenerHandler) sendSubscriptionResponse(logger_ctx context.Context,
ion.String("log_file", LOG_FILE),
ion.String("topic", TOPIC),
ion.String("function", "MessagePassing.sendSubscriptionResponse"))
s.Close()
if err := s.Close(); err != nil {
logger().NamedLogger.Warn(logger_ctx, "Failed to close stream", ion.String("error", err.Error()))
}
return
}

Expand All @@ -1419,7 +1453,9 @@ func (lh *ListenerHandler) sendSubscriptionResponse(logger_ctx context.Context,
ion.String("log_file", LOG_FILE),
ion.String("topic", TOPIC),
ion.String("function", "MessagePassing.sendSubscriptionResponse"))
s.Close()
if err := s.Close(); err != nil {
logger().NamedLogger.Warn(logger_ctx, "Failed to close stream", ion.String("error", err.Error()))
}
return
}

Expand All @@ -1432,7 +1468,9 @@ func (lh *ListenerHandler) sendSubscriptionResponse(logger_ctx context.Context,
ion.String("error", err.Error()),
ion.String("remote_peer_id", remotePeer.String()),
ion.String("function", "MessagePassing.sendSubscriptionResponse"))
s.Close()
if err := s.Close(); err != nil {
logger().NamedLogger.Warn(logger_ctx, "Failed to close stream", ion.String("error", err.Error()))
}
return
}
}
Expand All @@ -1442,7 +1480,9 @@ func (lh *ListenerHandler) sendSubscriptionResponse(logger_ctx context.Context,
// This is critical for preventing stream reset errors after prolonged runtime
time.Sleep(200 * time.Millisecond)

s.Close()
if err := s.Close(); err != nil {
logger().NamedLogger.Warn(logger_ctx, "Failed to close stream", ion.String("error", err.Error()))
}

duration := time.Since(startTime).Seconds()
sendSpan.SetAttributes(attribute.Float64("duration", duration), attribute.Int("bytes_written", bytesWritten), attribute.String("status", "success"))
Expand Down Expand Up @@ -1709,7 +1749,11 @@ func (lh *ListenerHandler) TriggerForBFTFromSequencer(s network.Stream, message
return
}
}
defer s.Close()
defer func() {
if err := s.Close(); err != nil {
fmt.Printf("Failed to close stream: %v\n", err)
}
}()

fmt.Println("📩 Received BFT trigger from Sequencer:", message)

Expand Down Expand Up @@ -1833,7 +1877,9 @@ func (lh *ListenerHandler) TriggerForBFTFromSequencer(s network.Stream, message
return nil
}
defer func() {
stream.Close()
if err := stream.Close(); err != nil {
fmt.Printf("Failed to close stream to %s: %v\n", buddyID, err)
}
fmt.Printf("🔌 Closed stream to %s\n", buddyID)
}()

Expand Down
24 changes: 20 additions & 4 deletions AVC/BuddyNodes/MessagePassing/MessageListener.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ func (StructListenerNode *StructListener) HandleSubmitMessageStream(logger_ctx c
remotePeer := s.Conn().RemotePeer()
span.SetAttributes(attribute.String("remote_peer_id", remotePeer.String()))
// Ensure stream is closed to prevent resource leaks
defer s.Close()
defer func() {
if err := s.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close message stream", err)
}
}()

logger().NamedLogger.Info(spanCtx, "StructListener.HandleSubmitMessageStream called",
ion.String("remote_peer_id", remotePeer.String()),
Expand Down Expand Up @@ -471,7 +475,11 @@ func (StructListenerNode *StructListener) SendMessageToPeer(logger_ctx context.C
}
// CRITICAL FIX: Ensure stream is always closed to prevent file descriptor leaks
// particularly when read timeouts occur (lines 503-517)
defer stream.Close()
defer func() {
if err := stream.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream", err)
}
}()

sendSpan.SetAttributes(attribute.String("connection_method", "direct"))

Expand Down Expand Up @@ -681,7 +689,11 @@ func (StructListenerNode *StructListener) sendViaSeedNode(logger_ctx context.Con
seedSpan.SetAttributes(attribute.Float64("duration", duration))
return fmt.Errorf("failed to create stream to %s: %v", peerID, err)
}
defer stream.Close()
defer func() {
if err := stream.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close SubmitMessageStream via seed node", err)
}
}()

// Send the message
_, err = stream.Write([]byte(message + string(rune(config.Delimiter))))
Expand Down Expand Up @@ -790,7 +802,11 @@ func (StructListenerNode *StructListener) getPeerInfoFromSeedNode(logger_ctx con
peerInfoSpan.SetAttributes(attribute.Float64("duration", duration))
return nil, fmt.Errorf("failed to create seed node client: %v", err)
}
defer client.Close()
defer func() {
if err := client.Close(); err != nil {
logger().NamedLogger.Error(logger_ctx, "Failed to close seed node client", err)
}
}()

// Get peer record from seed node
peerRecord, err := client.GetPeer(peerID.String())
Expand Down
4 changes: 3 additions & 1 deletion AVC/BuddyNodes/MessagePassing/Streamcache_Builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ func (sc *StructStreamCache) addEntry(peerID peer.ID, stream network.Stream) {
// removeEntry removes a stream entry from the cache
func (sc *StructStreamCache) removeEntry(peerID peer.ID) {
if entry, exists := sc.StreamCache.Streams[peerID]; exists {
entry.Stream.Close()
if err := entry.Stream.Close(); err != nil {
fmt.Printf("Failed to close stream for peer %s: %v\n", peerID, err)
}
delete(sc.StreamCache.Streams, peerID)
sc.removeFromOrder(peerID)
}
Expand Down
6 changes: 5 additions & 1 deletion AVC/NodeSelection/pkg/selection/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ func GetBuddyNodes(
return nil, err
}
// fmt.Printf("Debugging 1\n")
defer peerClient.Close()
defer func() {
if closeErr := peerClient.Close(); closeErr != nil {
fmt.Println("⚠️ Failed to close peer client:", closeErr)
}
}()

fmt.Println("📡 Connected to peer directory at", peerDirAddress)

Expand Down
4 changes: 3 additions & 1 deletion Block/gRPCclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,9 @@ func InitMempoolClient(logger_ctx context.Context, address string) error {
// CloseMempoolClient closes the global mempool client
func CloseMempoolClient() {
if globalMempoolClient != nil {
globalMempoolClient.Close()
if err := globalMempoolClient.Close(); err != nil {
fmt.Printf("Failed to close global mempool client: %v\n", err)
}
globalMempoolClient = nil
}
}
Expand Down
6 changes: 5 additions & 1 deletion CA/ImmuDB_CA/ImmuDB_CA.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ func writePEM(path, blockType string, derBytes []byte) error {
if err != nil {
return fmt.Errorf("open %q: %w", path, err)
}
defer f.Close()
defer func() {
if closeErr := f.Close(); closeErr != nil {
fmt.Printf("Error closing PEM file %q: %v\n", path, closeErr)
}
}()
return pem.Encode(f, &pem.Block{Type: blockType, Bytes: derBytes})
}
18 changes: 15 additions & 3 deletions CLI/CLI.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,11 @@ func (h *CommandHandler) handleSeedNodeStats(parts []string) {
fmt.Println("💡 Check if the seed node is running and accessible.")
return
}
defer client.Close()
defer func() {
if closeErr := client.Close(); closeErr != nil {
fmt.Printf("⚠️ Failed to close seed node client: %v\n", closeErr)
}
}()

fmt.Println("✅ Successfully connected to seed node!")

Expand Down Expand Up @@ -898,7 +902,11 @@ func (h *CommandHandler) handleListAliases() {
fmt.Printf("❌ Failed to connect to seed node: %v\n", err)
return
}
defer client.Close()
defer func() {
if closeErr := client.Close(); closeErr != nil {
fmt.Printf("⚠️ Failed to close client: %v\n", closeErr)
}
}()

// Get current node's peer ID
currentPeerID := h.Node.Host.ID().String()
Expand Down Expand Up @@ -960,7 +968,11 @@ func (h *CommandHandler) handleDiscoverNeighbors() {
fmt.Printf("❌ Failed to connect to seed node: %v\n", err)
return
}
defer client.Close()
defer func() {
if closeErr := client.Close(); closeErr != nil {
fmt.Printf("⚠️ Failed to close client: %v\n", closeErr)
}
}()

// Perform neighbor discovery
err = client.DiscoverAndAddNeighbors(h.Node.Host, h.NodeManager)
Expand Down
Loading