@@ -16,6 +16,7 @@ import (
1616 "github.com/btcsuite/btcd/chaincfg/chainhash"
1717 "github.com/davecgh/go-spew/spew"
1818 graphdb "github.com/lightningnetwork/lnd/graph/db"
19+ "github.com/lightningnetwork/lnd/lnpeer"
1920 "github.com/lightningnetwork/lnd/lnwire"
2021 "github.com/stretchr/testify/require"
2122)
@@ -232,6 +233,106 @@ func newTestSyncer(hID lnwire.ShortChannelID,
232233 return msgChan , syncer , cfg .channelSeries .(* mockChannelGraphTimeSeries )
233234}
234235
236+ // errorInjector provides thread-safe error injection for test syncers and
237+ // tracks the number of send attempts to detect endless loops.
238+ type errorInjector struct {
239+ mu sync.Mutex
240+ err error
241+ attemptCount int
242+ }
243+
244+ // setError sets the error that will be returned by sendMsg calls.
245+ func (ei * errorInjector ) setError (err error ) {
246+ ei .mu .Lock ()
247+ defer ei .mu .Unlock ()
248+ ei .err = err
249+ }
250+
251+ // getError retrieves the current error in a thread-safe manner and increments
252+ // the attempt counter.
253+ func (ei * errorInjector ) getError () error {
254+ ei .mu .Lock ()
255+ defer ei .mu .Unlock ()
256+ ei .attemptCount ++
257+ return ei .err
258+ }
259+
260+ // getAttemptCount returns the number of times sendMsg was called.
261+ func (ei * errorInjector ) getAttemptCount () int {
262+ ei .mu .Lock ()
263+ defer ei .mu .Unlock ()
264+ return ei .attemptCount
265+ }
266+
267+ // newErrorInjectingSyncer creates a GossipSyncer with controllable error
268+ // injection for testing error handling. The returned errorInjector can be used
269+ // to inject errors into sendMsg calls.
270+ func newErrorInjectingSyncer (hID lnwire.ShortChannelID ,
271+ chunkSize int32 ) (* GossipSyncer , * errorInjector , chan []lnwire.Message ) {
272+
273+ ei := & errorInjector {}
274+ msgChan := make (chan []lnwire.Message , 20 )
275+
276+ cfg := gossipSyncerCfg {
277+ channelSeries : newMockChannelGraphTimeSeries (hID ),
278+ encodingType : defaultEncoding ,
279+ chunkSize : chunkSize ,
280+ batchSize : chunkSize ,
281+ noSyncChannels : false ,
282+ noReplyQueries : true ,
283+ noTimestampQueryOption : false ,
284+ sendMsg : func (_ context.Context , _ bool ,
285+ msgs ... lnwire.Message ) error {
286+
287+ // Check if we should inject an error.
288+ if err := ei .getError (); err != nil {
289+ return err
290+ }
291+
292+ msgChan <- msgs
293+ return nil
294+ },
295+ bestHeight : func () uint32 {
296+ return latestKnownHeight
297+ },
298+ markGraphSynced : func () {},
299+ maxQueryChanRangeReplies : maxQueryChanRangeReplies ,
300+ timestampQueueSize : 10 ,
301+ }
302+
303+ syncerSema := make (chan struct {}, 1 )
304+ syncerSema <- struct {}{}
305+
306+ syncer := newGossipSyncer (cfg , syncerSema )
307+
308+ return syncer , ei , msgChan
309+ }
310+
311+ // assertSyncerExitsCleanly verifies that a syncer stops cleanly within the
312+ // given timeout. This is used to ensure error handling doesn't cause endless
313+ // loops.
314+ func assertSyncerExitsCleanly (t * testing.T , syncer * GossipSyncer ,
315+ timeout time.Duration ) {
316+
317+ t .Helper ()
318+
319+ stopChan := make (chan struct {})
320+ go func () {
321+ syncer .Stop ()
322+ close (stopChan )
323+ }()
324+
325+ select {
326+ case <- stopChan :
327+ // Success - syncer stopped cleanly.
328+ case <- time .After (timeout ):
329+ t .Fatal (
330+ "syncer did not stop within timeout - possible " +
331+ "endless loop" ,
332+ )
333+ }
334+ }
335+
235336// TestGossipSyncerFilterGossipMsgsNoHorizon tests that if the remote peer
236337// doesn't have a horizon set, then we won't send any incoming messages to it.
237338func TestGossipSyncerFilterGossipMsgsNoHorizon (t * testing.T ) {
@@ -2411,3 +2512,200 @@ func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) {
24112512 },
24122513 }, nil ))
24132514}
2515+
2516+ // TestGossipSyncerStateHandlerErrors tests that errors in state handlers cause
2517+ // the channelGraphSyncer goroutine to exit cleanly without endless retry loops.
2518+ // This is a table-driven test covering various error types and states.
2519+ func TestGossipSyncerStateHandlerErrors (t * testing.T ) {
2520+ t .Parallel ()
2521+
2522+ tests := []struct {
2523+ name string
2524+ state syncerState
2525+ setupState func (* GossipSyncer )
2526+ chunkSize int32
2527+ injectedErr error
2528+ }{
2529+ {
2530+ name : "context cancel during syncingChans" ,
2531+ state : syncingChans ,
2532+ chunkSize : defaultChunkSize ,
2533+ injectedErr : context .Canceled ,
2534+ setupState : func (s * GossipSyncer ) {},
2535+ },
2536+ {
2537+ name : "peer exit during syncingChans" ,
2538+ state : syncingChans ,
2539+ chunkSize : defaultChunkSize ,
2540+ injectedErr : lnpeer .ErrPeerExiting ,
2541+ setupState : func (s * GossipSyncer ) {},
2542+ },
2543+ {
2544+ name : "context cancel during queryNewChannels" ,
2545+ state : queryNewChannels ,
2546+ chunkSize : 2 ,
2547+ injectedErr : context .Canceled ,
2548+ setupState : func (s * GossipSyncer ) {
2549+ s .newChansToQuery = []lnwire.ShortChannelID {
2550+ lnwire .NewShortChanIDFromInt (1 ),
2551+ lnwire .NewShortChanIDFromInt (2 ),
2552+ lnwire .NewShortChanIDFromInt (3 ),
2553+ }
2554+ },
2555+ },
2556+ {
2557+ name : "network error during queryNewChannels" ,
2558+ state : queryNewChannels ,
2559+ chunkSize : 2 ,
2560+ injectedErr : errors .New ("connection closed" ),
2561+ setupState : func (s * GossipSyncer ) {
2562+ s .newChansToQuery = []lnwire.ShortChannelID {
2563+ lnwire .NewShortChanIDFromInt (1 ),
2564+ lnwire .NewShortChanIDFromInt (2 ),
2565+ }
2566+ },
2567+ },
2568+ }
2569+
2570+ for _ , tt := range tests {
2571+ tt := tt
2572+ t .Run (tt .name , func (t * testing.T ) {
2573+ t .Parallel ()
2574+
2575+ // Create syncer with error injection capability.
2576+ hID := lnwire .NewShortChanIDFromInt (10 )
2577+ syncer , errInj , _ := newErrorInjectingSyncer (
2578+ hID , tt .chunkSize ,
2579+ )
2580+
2581+ // Set up the initial state and any required state data.
2582+ syncer .setSyncState (tt .state )
2583+ tt .setupState (syncer )
2584+
2585+ // Inject the error that should cause the goroutine to
2586+ // exit.
2587+ errInj .setError (tt .injectedErr )
2588+
2589+ // Start the syncer which spawns the channelGraphSyncer
2590+ // goroutine.
2591+ syncer .Start ()
2592+
2593+ // Wait long enough that an endless loop would
2594+ // accumulate many attempts. With the fix, we should
2595+ // only see 1-3 attempts. Without the fix, we'd see
2596+ // 50-100+ attempts.
2597+ time .Sleep (500 * time .Millisecond )
2598+
2599+ // Check how many send attempts were made. This verifies
2600+ // that the state handler doesn't loop endlessly.
2601+ attemptCount := errInj .getAttemptCount ()
2602+ require .GreaterOrEqual (
2603+ t , attemptCount , 1 ,
2604+ "state handler was not called - test " +
2605+ "setup issue" ,
2606+ )
2607+ require .LessOrEqual (
2608+ t , attemptCount , 5 ,
2609+ "too many attempts (%d) - endless loop " +
2610+ "not fixed" ,
2611+ attemptCount ,
2612+ )
2613+
2614+ // Verify the syncer exits cleanly without hanging.
2615+ assertSyncerExitsCleanly (t , syncer , 2 * time .Second )
2616+ })
2617+ }
2618+ }
2619+
2620+ // TestGossipSyncerProcessChanRangeReplyError tests that errors from
2621+ // processChanRangeReply cause the goroutine to exit cleanly. This test uses
2622+ // a different approach than the state handler tests because the
2623+ // waitingQueryRangeReply state waits in a select rather than looping.
2624+ func TestGossipSyncerProcessChanRangeReplyError (t * testing.T ) {
2625+ t .Parallel ()
2626+
2627+ hID := lnwire .NewShortChanIDFromInt (10 )
2628+ msgChan := make (chan []lnwire.Message , 20 )
2629+
2630+ // Create custom channel series.
2631+ chanSeries := newMockChannelGraphTimeSeries (hID )
2632+
2633+ cfg := gossipSyncerCfg {
2634+ channelSeries : chanSeries ,
2635+ encodingType : defaultEncoding ,
2636+ chunkSize : defaultChunkSize ,
2637+ batchSize : defaultChunkSize ,
2638+ noSyncChannels : false ,
2639+ noReplyQueries : true ,
2640+ noTimestampQueryOption : false ,
2641+ sendMsg : func (_ context.Context , _ bool ,
2642+ msgs ... lnwire.Message ) error {
2643+
2644+ msgChan <- msgs
2645+ return nil
2646+ },
2647+ bestHeight : func () uint32 {
2648+ return latestKnownHeight
2649+ },
2650+ markGraphSynced : func () {},
2651+ maxQueryChanRangeReplies : maxQueryChanRangeReplies ,
2652+ timestampQueueSize : 10 ,
2653+ }
2654+
2655+ syncerSema := make (chan struct {}, 1 )
2656+ syncerSema <- struct {}{}
2657+
2658+ syncer := newGossipSyncer (cfg , syncerSema )
2659+
2660+ // Start the syncer which will send a QueryChannelRange message.
2661+ syncer .Start ()
2662+ defer syncer .Stop ()
2663+
2664+ // Wait for the syncer to send its QueryChannelRange message.
2665+ var query * lnwire.QueryChannelRange
2666+ select {
2667+ case msgs := <- msgChan :
2668+ require .Len (t , msgs , 1 )
2669+ var ok bool
2670+ query , ok = msgs [0 ].(* lnwire.QueryChannelRange )
2671+ require .True (t , ok , "expected QueryChannelRange message" )
2672+ case <- time .After (time .Second ):
2673+ t .Fatal ("expected query message" )
2674+ }
2675+
2676+ // Send 10 malformed replies. With the fix, only the first one should
2677+ // be processed before the goroutine exits. Without the fix, all would
2678+ // be processed (goroutine stays in waitingQueryRangeReply state).
2679+ for i := 0 ; i < 10 ; i ++ {
2680+ malformedReply := & lnwire.ReplyChannelRange {
2681+ ChainHash : query .ChainHash ,
2682+ // FirstBlockHeight is before query range (invalid).
2683+ FirstBlockHeight : query .FirstBlockHeight - 100 ,
2684+ NumBlocks : query .NumBlocks ,
2685+ Complete : 1 ,
2686+ ShortChanIDs : []lnwire.ShortChannelID {},
2687+ }
2688+
2689+ // Send the malformed reply.
2690+ err := syncer .ProcessQueryMsg (malformedReply , nil )
2691+ require .NoError (t , err )
2692+
2693+ // Brief delay to let goroutine process.
2694+ time .Sleep (20 * time .Millisecond )
2695+ }
2696+
2697+ // With the fix: goroutine exits after first error, so remaining 9
2698+ // messages stay queued in gossipMsgs channel (buffer size 50).
2699+ // Without fix: goroutine processes all 10 messages.
2700+ //
2701+ // Check channel length. If goroutine exited, we should have ~9 messages
2702+ // queued (first was processed, rest are stuck).
2703+ channelLen := len (syncer .gossipMsgs )
2704+ require .GreaterOrEqual (t , channelLen , 7 ,
2705+ "expected messages queued (goroutine exited), but found %d - " +
2706+ "goroutine may still be processing" , channelLen )
2707+
2708+ // The channelGraphSyncer goroutine should have exited. Verify stop
2709+ // completes quickly.
2710+ assertSyncerExitsCleanly (t , syncer , 2 * time .Second )
2711+ }
0 commit comments