discovery: fix potential infinite loop bug re context cancel error handling in gossip syncer #10330
Gossip syncer changes:

```diff
@@ -492,15 +492,19 @@ func (g *GossipSyncer) Stop() {
 // handleSyncingChans handles the state syncingChans for the GossipSyncer. When
 // in this state, we will send a QueryChannelRange msg to our peer and advance
-// the syncer's state to waitingQueryRangeReply.
-func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
+// the syncer's state to waitingQueryRangeReply. Returns an error if a fatal
+// error occurs that should cause the goroutine to exit.
+func (g *GossipSyncer) handleSyncingChans(ctx context.Context) error {
 	// Prepare the query msg.
 	queryRangeMsg, err := g.genChanRangeQuery(
 		ctx, g.genHistoricalChanRangeQuery,
 	)
 	if err != nil {
 		log.Errorf("Unable to gen chan range query: %v", err)
-		return
+
+		// Any error here is likely fatal (context cancelled, db error,
+		// etc.), so return it to exit the goroutine cleanly.
+		return err
 	}

 	// Acquire a lock so the following state transition is atomic.
```
```diff
@@ -517,12 +521,18 @@ func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
 	err = g.sendToPeer(ctx, queryRangeMsg)
 	if err != nil {
 		log.Errorf("Unable to send chan range query: %v", err)
-		return
+
+		// Any send error (peer exiting, connection closed, rate
+		// limiter signaling exit, etc.) is fatal, so return it to
+		// exit the goroutine cleanly.
+		return err
 	}

 	// With the message sent successfully, we'll transition into the next
 	// state where we wait for their reply.
 	g.setSyncState(waitingQueryRangeReply)
+
+	return nil
 }

 // channelGraphSyncer is the main goroutine responsible for ensuring that we
```
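For readers without the rest of the file in view: `handleSyncingChans` is called from the `channelGraphSyncer` state-machine loop, and before this change a send failure left the syncer parked in `syncingChans`, so an already-cancelled context made the loop retry the same failing send forever. Below is a minimal, self-contained sketch of that failure mode and of the fix; `syncState`, `sendToPeer`, `runPreFix`, and `runPostFix` are simplified stand-ins for this illustration, not the real lnd types.

```go
package main

import (
	"context"
	"fmt"
)

// syncState models just enough of the syncer state machine to show the bug.
type syncState int

const (
	syncingChans syncState = iota
	waitingQueryRangeReply
)

// sendToPeer stands in for the real send path: once the context is
// cancelled it fails immediately, every single time.
func sendToPeer(ctx context.Context) error {
	return ctx.Err()
}

// runPreFix mirrors the old behaviour: the handler swallows the error (the
// real code only logged it), so the caller has no way to know it should
// exit. With a cancelled context this spins forever; maxIters only bounds
// the demo.
func runPreFix(ctx context.Context, maxIters int) int {
	st, attempts := syncingChans, 0
	for i := 0; i < maxIters && st == syncingChans; i++ {
		attempts++
		if err := sendToPeer(ctx); err != nil {
			continue // old behaviour: log and retry the same state
		}
		st = waitingQueryRangeReply
	}
	return attempts
}

// runPostFix mirrors the fix: the handler returns the error and the caller
// exits the state machine on any fatal error.
func runPostFix(ctx context.Context, maxIters int) int {
	st, attempts := syncingChans, 0
	for i := 0; i < maxIters && st == syncingChans; i++ {
		attempts++
		if err := sendToPeer(ctx); err != nil {
			return attempts // new behaviour: propagate and exit
		}
		st = waitingQueryRangeReply
	}
	return attempts
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown before the first send

	fmt.Println("pre-fix send attempts: ", runPreFix(ctx, 1000))  // 1000
	fmt.Println("post-fix send attempts:", runPostFix(ctx, 1000)) // 1
}
```

Running the sketch prints the capped 1000 attempts for the pre-fix loop and a single attempt once the error is propagated, which is exactly the difference the new assertions in the test file measure.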
```diff
@@ -545,7 +555,14 @@ func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
 		// understand, as we'll as responding to any other queries by
 		// them.
 		case syncingChans:
-			g.handleSyncingChans(ctx)
+			err := g.handleSyncingChans(ctx)
+			if err != nil {
+				log.Debugf("GossipSyncer(%x): exiting due to "+
+					"error in syncingChans: %v",
+					g.cfg.peerPub[:], err)
+
+				return
+			}

 		// In this state, we've sent out our initial channel range
 		// query and are waiting for the final response from the remote
```

Review comment: Could instead (or in addition) just add a quick context check at the start of the ...

Reply: Yeah, that was the alternative here: #10329, but I would say introducing the errors is the better way; I like the design better.

Reply: Yeah, this was my alternative; IMO it got at the root issue, in that we weren't checking errors for these calls to decide whether to exit the state machine.

Reply: This was also a bit easier to write consistent unit tests for.
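For comparison, the context-check alternative discussed above could look roughly like the following. This is a sketch written for this review, not the code from #10329; `runStateMachineSketch` and `handleState` are made-up names standing in for `channelGraphSyncer` and its per-state handlers.

```go
package discovery // illustrative only; assumes it sits beside the real syncer code

import "context"

// runStateMachineSketch checks the context at the top of every iteration so
// a cancelled context exits the loop even when an individual state handler
// swallows its error. handleState stands in for handlers such as
// handleSyncingChans.
func runStateMachineSketch(ctx context.Context,
	handleState func(context.Context)) {

	for {
		// Quick context check before dispatching to a handler.
		select {
		case <-ctx.Done():
			return
		default:
		}

		handleState(ctx)
	}
}
```

As the thread notes, the drawback is that a loop-top check only covers cancellation, while propagating handler errors also covers send failures such as lnpeer.ErrPeerExiting.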
```diff
@@ -593,7 +610,14 @@ func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
 			// First, we'll attempt to continue our channel
 			// synchronization by continuing to send off another
 			// query chunk.
-			done := g.synchronizeChanIDs(ctx)
+			done, err := g.synchronizeChanIDs(ctx)
+			if err != nil {
+				log.Debugf("GossipSyncer(%x): exiting due to "+
+					"error in queryNewChannels: %v",
+					g.cfg.peerPub[:], err)
+
+				return
+			}

 			// If this wasn't our last query, then we'll need to
 			// transition to our waiting state.
```
```diff
@@ -819,16 +843,18 @@ func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
 // range. This method will be called continually until the entire range has
 // been queried for with a response received. We'll chunk our requests as
 // required to ensure they fit into a single message. We may re-renter this
-// state in the case that chunking is required.
-func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
+// state in the case that chunking is required. Returns true if synchronization
+// is complete, and an error if a fatal error occurs that should cause the
+// goroutine to exit.
+func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) (bool, error) {
 	// If we're in this state yet there are no more new channels to query
 	// for, then we'll transition to our final synced state and return true
 	// to signal that we're fully synchronized.
 	if len(g.newChansToQuery) == 0 {
 		log.Infof("GossipSyncer(%x): no more chans to query",
 			g.cfg.peerPub[:])

-		return true
+		return true, nil
 	}

 	// Otherwise, we'll issue our next chunked query to receive replies
@@ -864,9 +890,14 @@ func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
 	})
 	if err != nil {
 		log.Errorf("Unable to sync chan IDs: %v", err)
+
+		// Any send error (peer exiting, connection closed, rate
+		// limiter signaling exit, etc.) is fatal, so return it to
+		// exit the goroutine cleanly.
+		return false, err
 	}

-	return false
+	return false, nil
 }

 // isLegacyReplyChannelRange determines where a ReplyChannelRange message is
```
Test file changes:

```diff
@@ -16,6 +16,7 @@ import (
 	"github.com/btcsuite/btcd/chaincfg/chainhash"
 	"github.com/davecgh/go-spew/spew"
 	graphdb "github.com/lightningnetwork/lnd/graph/db"
+	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/stretchr/testify/require"
 )
```
```diff
@@ -229,9 +230,111 @@ func newTestSyncer(hID lnwire.ShortChannelID,

 	syncer := newGossipSyncer(cfg, syncerSema)

 	//nolint:forcetypeassert
 	return msgChan, syncer, cfg.channelSeries.(*mockChannelGraphTimeSeries)
 }

+// errorInjector provides thread-safe error injection for test syncers and
+// tracks the number of send attempts to detect endless loops.
+type errorInjector struct {
+	mu           sync.Mutex
+	err          error
+	attemptCount int
+}
+
+// setError sets the error that will be returned by sendMsg calls.
+func (ei *errorInjector) setError(err error) {
+	ei.mu.Lock()
+	defer ei.mu.Unlock()
+	ei.err = err
+}
+
+// getError retrieves the current error in a thread-safe manner and increments
+// the attempt counter.
+func (ei *errorInjector) getError() error {
+	ei.mu.Lock()
+	defer ei.mu.Unlock()
+	ei.attemptCount++
+
+	return ei.err
+}
+
+// getAttemptCount returns the number of times sendMsg was called.
+func (ei *errorInjector) getAttemptCount() int {
+	ei.mu.Lock()
+	defer ei.mu.Unlock()
+	return ei.attemptCount
+}
+
+// newErrorInjectingSyncer creates a GossipSyncer with controllable error
+// injection for testing error handling. The returned errorInjector can be used
+// to inject errors into sendMsg calls.
+func newErrorInjectingSyncer(hID lnwire.ShortChannelID, chunkSize int32) (
+	*GossipSyncer, *errorInjector, chan []lnwire.Message) {
+
+	ei := &errorInjector{}
+	msgChan := make(chan []lnwire.Message, 20)
+
+	cfg := gossipSyncerCfg{
+		channelSeries:          newMockChannelGraphTimeSeries(hID),
+		encodingType:           defaultEncoding,
+		chunkSize:              chunkSize,
+		batchSize:              chunkSize,
+		noSyncChannels:         false,
+		noReplyQueries:         true,
+		noTimestampQueryOption: false,
+		sendMsg: func(_ context.Context, _ bool,
+			msgs ...lnwire.Message) error {
+
+			// Check if we should inject an error.
+			if err := ei.getError(); err != nil {
+				return err
+			}
+
+			msgChan <- msgs
+			return nil
+		},
+		bestHeight: func() uint32 {
+			return latestKnownHeight
+		},
+		markGraphSynced:          func() {},
+		maxQueryChanRangeReplies: maxQueryChanRangeReplies,
+		timestampQueueSize:       10,
+	}
+
+	syncerSema := make(chan struct{}, 1)
+	syncerSema <- struct{}{}
+
+	syncer := newGossipSyncer(cfg, syncerSema)
+
+	return syncer, ei, msgChan
+}
+
+// assertSyncerExitsCleanly verifies that a syncer stops cleanly within the
+// given timeout. This is used to ensure error handling doesn't cause endless
+// loops.
+func assertSyncerExitsCleanly(t *testing.T, syncer *GossipSyncer,
+	timeout time.Duration) {
+
+	t.Helper()
+
+	stopChan := make(chan struct{})
+	go func() {
+		syncer.Stop()
+		close(stopChan)
+	}()
+
+	select {
+	case <-stopChan:
+		// Success - syncer stopped cleanly.
+	case <-time.After(timeout):
+		t.Fatal(
+			"syncer did not stop within timeout - possible " +
+				"endless loop",
+		)
+	}
+}
+
 // TestGossipSyncerFilterGossipMsgsNoHorizon tests that if the remote peer
 // doesn't have a horizon set, then we won't send any incoming messages to it.
 func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) {
```
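To make the intended usage of these helpers concrete, a trimmed-down test might look like the snippet below. `TestSyncerExitsOnInjectedSendError` is a hypothetical example written for this review, not part of the PR; it assumes it lives in the same test file, so `defaultChunkSize` and the imports shown in the diff above are in scope. The real coverage is the table-driven test added further down.

```go
func TestSyncerExitsOnInjectedSendError(t *testing.T) {
	t.Parallel()

	syncer, errInj, _ := newErrorInjectingSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultChunkSize,
	)

	// Every sendMsg call now fails as if the peer had gone away.
	errInj.setError(lnpeer.ErrPeerExiting)

	syncer.Start()

	// With the fix, the goroutine gives up after a handful of attempts,
	// so Stop must return promptly instead of hanging.
	assertSyncerExitsCleanly(t, syncer, 2*time.Second)
}
```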
```diff
@@ -1518,7 +1621,8 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {

 	for i := 0; i < chunkSize*2; i += 2 {
 		// With our set up complete, we'll request a sync of chan ID's.
-		done := syncer.synchronizeChanIDs(t.Context())
+		done, err := syncer.synchronizeChanIDs(t.Context())
+		require.NoError(t, err)

 		// At this point, we shouldn't yet be done as only 2 items
 		// should have been queried for.
```
```diff
@@ -1565,7 +1669,8 @@
 	}

 	// If we issue another query, the syncer should tell us that it's done.
-	done := syncer.synchronizeChanIDs(t.Context())
+	done, err := syncer.synchronizeChanIDs(t.Context())
+	require.NoError(t, err)
 	if done {
 		t.Fatalf("syncer should be finished!")
 	}
```
```diff
@@ -2409,3 +2514,107 @@ func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) {
 		},
 	}, nil))
 }
+
+// TestGossipSyncerStateHandlerErrors tests that errors in state handlers cause
+// the channelGraphSyncer goroutine to exit cleanly without endless retry loops.
+// This is a table-driven test covering various error types and states.
+func TestGossipSyncerStateHandlerErrors(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name        string
+		state       syncerState
+		setupState  func(*GossipSyncer)
+		chunkSize   int32
+		injectedErr error
+	}{
+		{
+			name:        "context cancel during syncingChans",
+			state:       syncingChans,
+			chunkSize:   defaultChunkSize,
+			injectedErr: context.Canceled,
+			setupState:  func(s *GossipSyncer) {},
+		},
+		{
+			name:        "peer exit during syncingChans",
+			state:       syncingChans,
+			chunkSize:   defaultChunkSize,
+			injectedErr: lnpeer.ErrPeerExiting,
+			setupState:  func(s *GossipSyncer) {},
+		},
+		{
+			name:        "context cancel during queryNewChannels",
+			state:       queryNewChannels,
+			chunkSize:   2,
+			injectedErr: context.Canceled,
+			setupState: func(s *GossipSyncer) {
+				s.newChansToQuery = []lnwire.ShortChannelID{
+					lnwire.NewShortChanIDFromInt(1),
+					lnwire.NewShortChanIDFromInt(2),
+					lnwire.NewShortChanIDFromInt(3),
+				}
+			},
+		},
+		{
+			name:        "network error during queryNewChannels",
+			state:       queryNewChannels,
+			chunkSize:   2,
+			injectedErr: errors.New("connection closed"),
+			setupState: func(s *GossipSyncer) {
+				s.newChansToQuery = []lnwire.ShortChannelID{
+					lnwire.NewShortChanIDFromInt(1),
+					lnwire.NewShortChanIDFromInt(2),
+				}
+			},
+		},
+	}

+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+
+			// Create syncer with error injection capability.
+			hID := lnwire.NewShortChanIDFromInt(10)
+			syncer, errInj, _ := newErrorInjectingSyncer(
+				hID, tt.chunkSize,
+			)
+
+			// Set up the initial state and any required state data.
+			syncer.setSyncState(tt.state)
+			tt.setupState(syncer)
+
+			// Inject the error that should cause the goroutine to
+			// exit.
+			errInj.setError(tt.injectedErr)
+
+			// Start the syncer which spawns the channelGraphSyncer
+			// goroutine.
+			syncer.Start()
+
+			// Wait long enough that an endless loop would
+			// accumulate many attempts. With the fix, we should
+			// only see 1-3 attempts. Without the fix, we'd see
+			// 50-100+ attempts.
+			time.Sleep(500 * time.Millisecond)
+
+			// Check how many send attempts were made. This verifies
+			// that the state handler doesn't loop endlessly.
+			attemptCount := errInj.getAttemptCount()
+			require.GreaterOrEqual(
+				t, attemptCount, 1,
+				"state handler was not called - test "+
+					"setup issue",
+			)
+			require.LessOrEqual(
+				t, attemptCount, 5,
+				"too many attempts (%d) - endless loop "+
+					"not fixed",
+				attemptCount,
+			)
+
+			// Verify the syncer exits cleanly without hanging.
+			assertSyncerExitsCleanly(t, syncer, 2*time.Second)
+		})
+	}
+}
```

Review comment (on the attempt-count assertions): I wonder if this style of testing can introduce flakes when the timing and the counts are a bit off. It's probably not worth adding these kinds of tests, since the changes that were made are very easy to understand?

Reply: I added them so we can make sure that the tests actually do something. I had a test that passed, but it turned out it didn't actually do anything, as the test survived some trivial mutations in the area that we had fixed. I can drop the last commit with the additional tests if we want; those were some extra mutations I found with an automated tool I made.
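On the flake concern: one way to make the assertions less dependent on a fixed 500 ms sleep is to poll with testify's require.Eventually before bounding the count. The snippet below is a sketch written for this review, not part of the PR; it would slot into the t.Run body above in place of the fixed sleep, reusing the test's errInj helper.

```go
// Poll until at least one send attempt is observed instead of assuming a
// fixed sleep is always long enough.
require.Eventually(t, func() bool {
	return errInj.getAttemptCount() >= 1
}, 2*time.Second, 10*time.Millisecond, "state handler was never called")

// Then give the (fixed) code a short settle window in which an endless
// loop would rack up many more attempts, and bound the total.
time.Sleep(100 * time.Millisecond)
require.LessOrEqual(t, errInj.getAttemptCount(), 5,
	"too many attempts - endless loop not fixed")
```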
Review comment: While we are at it, could you also stop the iterator in this function here, basically by adding a `stop()` at the beginning?

Reply: Stop what iterator? It isn't in scope here.
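If the iterator in question is one of Go's pull-style iterators (an assumption; the function being referred to is not part of this diff), the requested pattern is the usual deferred `stop()` from `iter.Pull`, roughly as in this generic sketch:

```go
package main

import (
	"fmt"
	"iter"
	"slices"
)

// consume shows the pattern: when a function is handed the next/stop pair
// from iter.Pull, it should arrange for stop to be called, typically by
// deferring it at the beginning, so the underlying iterator is released
// even on early returns.
func consume(next func() (int, bool), stop func()) int {
	defer stop() // "adding a stop() at the beginning"

	sum := 0
	for {
		v, ok := next()
		if !ok {
			return sum
		}
		sum += v
	}
}

func main() {
	next, stop := iter.Pull(slices.Values([]int{1, 2, 3}))
	fmt.Println(consume(next, stop)) // 6
}
```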