Skip to content

Commit 3dae8aa

Browse files
authored
logservice: avoid dispatcher registration failure during schema store initialization (#4366)
ref #4272
1 parent 119023a commit 3dae8aa

File tree

6 files changed

+47
-5
lines changed

6 files changed

+47
-5
lines changed

logservice/eventstore/event_store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1287,6 +1287,7 @@ func (e *eventStore) writeEvents(
12871287
metrics.EventStoreWriteRequestsCount.Inc()
12881288
prepareStart := time.Now()
12891289
batch := db.NewBatch()
1290+
defer batch.Close()
12901291
kvCount := 0
12911292
var totalValueBytesBefore int64
12921293
var totalValueBytesAfter int64

pkg/eventservice/event_broker.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,9 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error {
10321032
zap.Uint64("startTs", info.GetStartTs()), zap.String("span", common.FormatTableSpan(span)),
10331033
zap.Error(err),
10341034
)
1035+
// Mark removed to avoid processing notifications before unregister completes.
1036+
dispatcher.isRemoved.Store(true)
1037+
c.eventStore.UnregisterDispatcher(changefeedID, id)
10351038
status.removeDispatcher(id)
10361039
if status.isEmpty() {
10371040
c.changefeedMap.Delete(changefeedID)

pkg/eventservice/event_broker_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,21 @@ func TestOnNotify(t *testing.T) {
172172
log.Info("Pass case 6")
173173
}
174174

175+
func TestAddDispatcherUnregisterOnSchemaStoreError(t *testing.T) {
176+
broker, es, ss, _ := newEventBrokerForTest()
177+
defer broker.close()
178+
179+
ss.registerTableError = errors.New("register schema store failed")
180+
181+
info := newMockDispatcherInfoForTest(t)
182+
err := broker.addDispatcher(info)
183+
require.Error(t, err)
184+
185+
_, ok := es.spansMap.Load(info.GetTableSpan())
186+
require.False(t, ok)
187+
require.Equal(t, uint64(1), es.unregisterCount.Load())
188+
}
189+
175190
func TestScanRangeCappedByScanWindow(t *testing.T) {
176191
broker, _, _, _ := newEventBrokerForTest()
177192
// Close the broker, so we can catch all message in the test.

pkg/eventservice/event_service_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"fmt"
1919
"sync"
20+
"sync/atomic"
2021
"testing"
2122
"time"
2223

@@ -163,6 +164,7 @@ type mockEventStore struct {
163164
resolvedTsUpdateInterval time.Duration
164165
dispatcherMap sync.Map // key is common.DispatcherID, value is span
165166
spansMap sync.Map // key is *heartbeatpb.TableSpan
167+
unregisterCount atomic.Uint64
166168
}
167169

168170
func newMockEventStore(resolvedTsUpdateInterval int) *mockEventStore {
@@ -245,7 +247,9 @@ func (m *mockEventStore) UnregisterDispatcher(changefeedID common.ChangeFeedID,
245247
span, ok := m.dispatcherMap.Load(dispatcherID)
246248
if ok {
247249
m.spansMap.Delete(span)
250+
m.dispatcherMap.Delete(dispatcherID)
248251
}
252+
m.unregisterCount.Add(1)
249253
}
250254

251255
func (m *mockEventStore) GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) eventstore.EventIterator {

pkg/upstream/upstream.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,18 +129,24 @@ func CreateTiStore(ctx context.Context, urls string, credential *security.Creden
129129
retry.WithBackoffBaseDelay(200),
130130
retry.WithBackoffMaxDelay(4000),
131131
retry.WithIsRetryableErr(func(err error) bool {
132-
switch errors.Cause(err) {
133-
case context.Canceled:
134-
return false
135-
}
136-
return true
132+
return isCreateTiStoreRetryable(ctx, err)
137133
}))
138134
if err != nil {
139135
return nil, errors.WrapError(errors.ErrNewStore, err)
140136
}
141137
return tiStore, nil
142138
}
143139

140+
func isCreateTiStoreRetryable(ctx context.Context, err error) bool {
141+
switch errors.Cause(err) {
142+
case context.Canceled:
143+
// Only stop retrying if the caller's context is canceled.
144+
// Otherwise treat it as transient (e.g. internal client cancellation).
145+
return ctx.Err() == nil
146+
}
147+
return true
148+
}
149+
144150
// init initializes the upstream
145151
func initUpstream(ctx context.Context, up *Upstream, cfg *NodeTopologyCfg) error {
146152
ctx, up.cancel = context.WithCancel(ctx)

pkg/upstream/upstream_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,16 @@ func TestRegisterTopo(t *testing.T) {
136136
return len(resp.Kvs) == 0
137137
}, time.Second*5, time.Millisecond*100)
138138
}
139+
140+
func TestIsCreateTiStoreRetryable(t *testing.T) {
141+
t.Parallel()
142+
143+
ctx := context.Background()
144+
require.True(t, isCreateTiStoreRetryable(ctx, context.Canceled))
145+
require.True(t, isCreateTiStoreRetryable(ctx, errors.Trace(context.Canceled)))
146+
147+
canceledCtx, cancel := context.WithCancel(context.Background())
148+
cancel()
149+
require.False(t, isCreateTiStoreRetryable(canceledCtx, context.Canceled))
150+
require.False(t, isCreateTiStoreRetryable(canceledCtx, errors.Trace(context.Canceled)))
151+
}

0 commit comments

Comments
 (0)