Skip to content

Commit 4131a8e

Browse files
authored
Merge pull request #10330 from Roasbeef/fix-error-handling-gossiper
discovery: fix potential infinite loop bug re context cancel error handling in gossip syncer
2 parents c7c8073 + adaa893 commit 4131a8e

File tree

3 files changed

+255
-12
lines changed

3 files changed

+255
-12
lines changed

discovery/syncer.go

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -492,15 +492,19 @@ func (g *GossipSyncer) Stop() {
492492

493493
// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
494494
// in this state, we will send a QueryChannelRange msg to our peer and advance
495-
// the syncer's state to waitingQueryRangeReply.
496-
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
495+
// the syncer's state to waitingQueryRangeReply. Returns an error if a fatal
496+
// error occurs that should cause the goroutine to exit.
497+
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) error {
497498
// Prepare the query msg.
498499
queryRangeMsg, err := g.genChanRangeQuery(
499500
ctx, g.genHistoricalChanRangeQuery,
500501
)
501502
if err != nil {
502503
log.Errorf("Unable to gen chan range query: %v", err)
503-
return
504+
505+
// Any error here is likely fatal (context cancelled, db error,
506+
// etc.), so return it to exit the goroutine cleanly.
507+
return err
504508
}
505509

506510
// Acquire a lock so the following state transition is atomic.
@@ -517,12 +521,18 @@ func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
517521
err = g.sendToPeer(ctx, queryRangeMsg)
518522
if err != nil {
519523
log.Errorf("Unable to send chan range query: %v", err)
520-
return
524+
525+
// Any send error (peer exiting, connection closed, rate
526+
// limiter signaling exit, etc.) is fatal, so return it to
527+
// exit the goroutine cleanly.
528+
return err
521529
}
522530

523531
// With the message sent successfully, we'll transition into the next
524532
// state where we wait for their reply.
525533
g.setSyncState(waitingQueryRangeReply)
534+
535+
return nil
526536
}
527537

528538
// channelGraphSyncer is the main goroutine responsible for ensuring that we
@@ -545,7 +555,14 @@ func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
545555
// understand, as well as responding to any other queries by
546556
// them.
547557
case syncingChans:
548-
g.handleSyncingChans(ctx)
558+
err := g.handleSyncingChans(ctx)
559+
if err != nil {
560+
log.Debugf("GossipSyncer(%x): exiting due to "+
561+
"error in syncingChans: %v",
562+
g.cfg.peerPub[:], err)
563+
564+
return
565+
}
549566

550567
// In this state, we've sent out our initial channel range
551568
// query and are waiting for the final response from the remote
@@ -593,7 +610,14 @@ func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
593610
// First, we'll attempt to continue our channel
594611
// synchronization by continuing to send off another
595612
// query chunk.
596-
done := g.synchronizeChanIDs(ctx)
613+
done, err := g.synchronizeChanIDs(ctx)
614+
if err != nil {
615+
log.Debugf("GossipSyncer(%x): exiting due to "+
616+
"error in queryNewChannels: %v",
617+
g.cfg.peerPub[:], err)
618+
619+
return
620+
}
597621

598622
// If this wasn't our last query, then we'll need to
599623
// transition to our waiting state.
@@ -819,16 +843,18 @@ func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
819843
// range. This method will be called continually until the entire range has
820844
// been queried for with a response received. We'll chunk our requests as
821845
// required to ensure they fit into a single message. We may re-enter this
822-
// state in the case that chunking is required.
823-
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
846+
// state in the case that chunking is required. Returns true if synchronization
847+
// is complete, and an error if a fatal error occurs that should cause the
848+
// goroutine to exit.
849+
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) (bool, error) {
824850
// If we're in this state yet there are no more new channels to query
825851
// for, then we'll transition to our final synced state and return true
826852
// to signal that we're fully synchronized.
827853
if len(g.newChansToQuery) == 0 {
828854
log.Infof("GossipSyncer(%x): no more chans to query",
829855
g.cfg.peerPub[:])
830856

831-
return true
857+
return true, nil
832858
}
833859

834860
// Otherwise, we'll issue our next chunked query to receive replies
@@ -864,9 +890,14 @@ func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
864890
})
865891
if err != nil {
866892
log.Errorf("Unable to sync chan IDs: %v", err)
893+
894+
// Any send error (peer exiting, connection closed, rate
895+
// limiter signaling exit, etc.) is fatal, so return it to
896+
// exit the goroutine cleanly.
897+
return false, err
867898
}
868899

869-
return false
900+
return false, nil
870901
}
871902

872903
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is

discovery/syncer_test.go

Lines changed: 211 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/btcsuite/btcd/chaincfg/chainhash"
1717
"github.com/davecgh/go-spew/spew"
1818
graphdb "github.com/lightningnetwork/lnd/graph/db"
19+
"github.com/lightningnetwork/lnd/lnpeer"
1920
"github.com/lightningnetwork/lnd/lnwire"
2021
"github.com/stretchr/testify/require"
2122
)
@@ -229,9 +230,111 @@ func newTestSyncer(hID lnwire.ShortChannelID,
229230

230231
syncer := newGossipSyncer(cfg, syncerSema)
231232

233+
//nolint:forcetypeassert
232234
return msgChan, syncer, cfg.channelSeries.(*mockChannelGraphTimeSeries)
233235
}
234236

237+
// errorInjector provides thread-safe error injection for test syncers and
238+
// tracks the number of send attempts to detect endless loops.
239+
type errorInjector struct {
240+
mu sync.Mutex
241+
err error
242+
attemptCount int
243+
}
244+
245+
// setError sets the error that will be returned by sendMsg calls.
246+
func (ei *errorInjector) setError(err error) {
247+
ei.mu.Lock()
248+
defer ei.mu.Unlock()
249+
ei.err = err
250+
}
251+
252+
// getError retrieves the current error in a thread-safe manner and increments
253+
// the attempt counter.
254+
func (ei *errorInjector) getError() error {
255+
ei.mu.Lock()
256+
defer ei.mu.Unlock()
257+
ei.attemptCount++
258+
259+
return ei.err
260+
}
261+
262+
// getAttemptCount returns the number of times sendMsg was called.
263+
func (ei *errorInjector) getAttemptCount() int {
264+
ei.mu.Lock()
265+
defer ei.mu.Unlock()
266+
return ei.attemptCount
267+
}
268+
269+
// newErrorInjectingSyncer creates a GossipSyncer with controllable error
270+
// injection for testing error handling. The returned errorInjector can be used
271+
// to inject errors into sendMsg calls.
272+
func newErrorInjectingSyncer(hID lnwire.ShortChannelID, chunkSize int32) (
273+
*GossipSyncer, *errorInjector, chan []lnwire.Message) {
274+
275+
ei := &errorInjector{}
276+
msgChan := make(chan []lnwire.Message, 20)
277+
278+
cfg := gossipSyncerCfg{
279+
channelSeries: newMockChannelGraphTimeSeries(hID),
280+
encodingType: defaultEncoding,
281+
chunkSize: chunkSize,
282+
batchSize: chunkSize,
283+
noSyncChannels: false,
284+
noReplyQueries: true,
285+
noTimestampQueryOption: false,
286+
sendMsg: func(_ context.Context, _ bool,
287+
msgs ...lnwire.Message) error {
288+
289+
// Check if we should inject an error.
290+
if err := ei.getError(); err != nil {
291+
return err
292+
}
293+
294+
msgChan <- msgs
295+
return nil
296+
},
297+
bestHeight: func() uint32 {
298+
return latestKnownHeight
299+
},
300+
markGraphSynced: func() {},
301+
maxQueryChanRangeReplies: maxQueryChanRangeReplies,
302+
timestampQueueSize: 10,
303+
}
304+
305+
syncerSema := make(chan struct{}, 1)
306+
syncerSema <- struct{}{}
307+
308+
syncer := newGossipSyncer(cfg, syncerSema)
309+
310+
return syncer, ei, msgChan
311+
}
312+
313+
// assertSyncerExitsCleanly verifies that a syncer stops cleanly within the
314+
// given timeout. This is used to ensure error handling doesn't cause endless
315+
// loops.
316+
func assertSyncerExitsCleanly(t *testing.T, syncer *GossipSyncer,
317+
timeout time.Duration) {
318+
319+
t.Helper()
320+
321+
stopChan := make(chan struct{})
322+
go func() {
323+
syncer.Stop()
324+
close(stopChan)
325+
}()
326+
327+
select {
328+
case <-stopChan:
329+
// Success - syncer stopped cleanly.
330+
case <-time.After(timeout):
331+
t.Fatal(
332+
"syncer did not stop within timeout - possible " +
333+
"endless loop",
334+
)
335+
}
336+
}
337+
235338
// TestGossipSyncerFilterGossipMsgsNoHorizon tests that if the remote peer
236339
// doesn't have a horizon set, then we won't send any incoming messages to it.
237340
func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) {
@@ -1518,7 +1621,8 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
15181621

15191622
for i := 0; i < chunkSize*2; i += 2 {
15201623
// With our set up complete, we'll request a sync of chan ID's.
1521-
done := syncer.synchronizeChanIDs(t.Context())
1624+
done, err := syncer.synchronizeChanIDs(t.Context())
1625+
require.NoError(t, err)
15221626

15231627
// At this point, we shouldn't yet be done as only 2 items
15241628
// should have been queried for.
@@ -1565,7 +1669,8 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
15651669
}
15661670

15671671
// If we issue another query, the syncer should tell us that it's done.
1568-
done := syncer.synchronizeChanIDs(t.Context())
1672+
done, err := syncer.synchronizeChanIDs(t.Context())
1673+
require.NoError(t, err)
15691674
if done {
15701675
t.Fatalf("syncer should be finished!")
15711676
}
@@ -2409,3 +2514,107 @@ func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) {
24092514
},
24102515
}, nil))
24112516
}
2517+
2518+
// TestGossipSyncerStateHandlerErrors tests that errors in state handlers cause
2519+
// the channelGraphSyncer goroutine to exit cleanly without endless retry loops.
2520+
// This is a table-driven test covering various error types and states.
2521+
func TestGossipSyncerStateHandlerErrors(t *testing.T) {
2522+
t.Parallel()
2523+
2524+
tests := []struct {
2525+
name string
2526+
state syncerState
2527+
setupState func(*GossipSyncer)
2528+
chunkSize int32
2529+
injectedErr error
2530+
}{
2531+
{
2532+
name: "context cancel during syncingChans",
2533+
state: syncingChans,
2534+
chunkSize: defaultChunkSize,
2535+
injectedErr: context.Canceled,
2536+
setupState: func(s *GossipSyncer) {},
2537+
},
2538+
{
2539+
name: "peer exit during syncingChans",
2540+
state: syncingChans,
2541+
chunkSize: defaultChunkSize,
2542+
injectedErr: lnpeer.ErrPeerExiting,
2543+
setupState: func(s *GossipSyncer) {},
2544+
},
2545+
{
2546+
name: "context cancel during queryNewChannels",
2547+
state: queryNewChannels,
2548+
chunkSize: 2,
2549+
injectedErr: context.Canceled,
2550+
setupState: func(s *GossipSyncer) {
2551+
s.newChansToQuery = []lnwire.ShortChannelID{
2552+
lnwire.NewShortChanIDFromInt(1),
2553+
lnwire.NewShortChanIDFromInt(2),
2554+
lnwire.NewShortChanIDFromInt(3),
2555+
}
2556+
},
2557+
},
2558+
{
2559+
name: "network error during queryNewChannels",
2560+
state: queryNewChannels,
2561+
chunkSize: 2,
2562+
injectedErr: errors.New("connection closed"),
2563+
setupState: func(s *GossipSyncer) {
2564+
s.newChansToQuery = []lnwire.ShortChannelID{
2565+
lnwire.NewShortChanIDFromInt(1),
2566+
lnwire.NewShortChanIDFromInt(2),
2567+
}
2568+
},
2569+
},
2570+
}
2571+
2572+
for _, tt := range tests {
2573+
tt := tt
2574+
t.Run(tt.name, func(t *testing.T) {
2575+
t.Parallel()
2576+
2577+
// Create syncer with error injection capability.
2578+
hID := lnwire.NewShortChanIDFromInt(10)
2579+
syncer, errInj, _ := newErrorInjectingSyncer(
2580+
hID, tt.chunkSize,
2581+
)
2582+
2583+
// Set up the initial state and any required state data.
2584+
syncer.setSyncState(tt.state)
2585+
tt.setupState(syncer)
2586+
2587+
// Inject the error that should cause the goroutine to
2588+
// exit.
2589+
errInj.setError(tt.injectedErr)
2590+
2591+
// Start the syncer which spawns the channelGraphSyncer
2592+
// goroutine.
2593+
syncer.Start()
2594+
2595+
// Wait long enough that an endless loop would
2596+
// accumulate many attempts. With the fix, we should
2597+
// only see 1-3 attempts. Without the fix, we'd see
2598+
// 50-100+ attempts.
2599+
time.Sleep(500 * time.Millisecond)
2600+
2601+
// Check how many send attempts were made. This verifies
2602+
// that the state handler doesn't loop endlessly.
2603+
attemptCount := errInj.getAttemptCount()
2604+
require.GreaterOrEqual(
2605+
t, attemptCount, 1,
2606+
"state handler was not called - test "+
2607+
"setup issue",
2608+
)
2609+
require.LessOrEqual(
2610+
t, attemptCount, 5,
2611+
"too many attempts (%d) - endless loop "+
2612+
"not fixed",
2613+
attemptCount,
2614+
)
2615+
2616+
// Verify the syncer exits cleanly without hanging.
2617+
assertSyncerExitsCleanly(t, syncer, 2*time.Second)
2618+
})
2619+
}
2620+
}

docs/release-notes/release-notes-0.20.0.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
utxonursery (the legacy sweeper) where htlcs with a locktime of 0 would not
5757
be swept.
5858

59+
- [Fixed a bug](https://github.com/lightningnetwork/lnd/pull/10330) to ensure that goroutine resources are properly freed in the case
60+
of a disconnection or other failure event.
61+
5962
# New Features
6063

6164
* Use persisted [nodeannouncement](https://github.com/lightningnetwork/lnd/pull/8825)

0 commit comments

Comments
 (0)