Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,9 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error {
zap.Uint64("startTs", info.GetStartTs()), zap.String("span", common.FormatTableSpan(span)),
zap.Error(err),
)
// Mark removed to avoid processing notifications before unregister completes.
dispatcher.isRemoved.Store(true)
c.eventStore.UnregisterDispatcher(changefeedID, id)
status.removeDispatcher(id)
if status.isEmpty() {
c.changefeedMap.Delete(changefeedID)
Expand Down
15 changes: 15 additions & 0 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,21 @@ func TestOnNotify(t *testing.T) {
log.Info("Pass case 6")
}

func TestAddDispatcherUnregisterOnSchemaStoreError(t *testing.T) {
broker, es, ss, _ := newEventBrokerForTest()
defer broker.close()

ss.registerTableError = errors.New("register schema store failed")

info := newMockDispatcherInfoForTest(t)
err := broker.addDispatcher(info)
require.Error(t, err)

_, ok := es.spansMap.Load(info.GetTableSpan())
require.False(t, ok)
require.Equal(t, uint64(1), es.unregisterCount.Load())
}

func TestScanRangeCappedByScanWindow(t *testing.T) {
broker, _, _, _ := newEventBrokerForTest()
// Close the broker, so we can catch all message in the test.
Expand Down
4 changes: 4 additions & 0 deletions pkg/eventservice/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -163,6 +164,7 @@ type mockEventStore struct {
resolvedTsUpdateInterval time.Duration
dispatcherMap sync.Map // key is common.DispatcherID, value is span
spansMap sync.Map // key is *heartbeatpb.TableSpan
unregisterCount atomic.Uint64
}

func newMockEventStore(resolvedTsUpdateInterval int) *mockEventStore {
Expand Down Expand Up @@ -245,7 +247,9 @@ func (m *mockEventStore) UnregisterDispatcher(changefeedID common.ChangeFeedID,
span, ok := m.dispatcherMap.Load(dispatcherID)
if ok {
m.spansMap.Delete(span)
m.dispatcherMap.Delete(dispatcherID)
}
m.unregisterCount.Add(1)
}

func (m *mockEventStore) GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) eventstore.EventIterator {
Expand Down
16 changes: 11 additions & 5 deletions pkg/upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,24 @@ func CreateTiStore(ctx context.Context, urls string, credential *security.Creden
retry.WithBackoffBaseDelay(200),
retry.WithBackoffMaxDelay(4000),
retry.WithIsRetryableErr(func(err error) bool {
switch errors.Cause(err) {
case context.Canceled:
return false
}
return true
return isCreateTiStoreRetryable(ctx, err)
}))
if err != nil {
return nil, errors.WrapError(errors.ErrNewStore, err)
}
return tiStore, nil
}

func isCreateTiStoreRetryable(ctx context.Context, err error) bool {
switch errors.Cause(err) {
case context.Canceled:
// Only stop retrying if the caller's context is canceled.
// Otherwise treat it as transient (e.g. internal client cancellation).
return ctx.Err() == nil
}
return true
}

// init initializes the upstream
func initUpstream(ctx context.Context, up *Upstream, cfg *NodeTopologyCfg) error {
ctx, up.cancel = context.WithCancel(ctx)
Expand Down
13 changes: 13 additions & 0 deletions pkg/upstream/upstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,16 @@ func TestRegisterTopo(t *testing.T) {
return len(resp.Kvs) == 0
}, time.Second*5, time.Millisecond*100)
}

func TestIsCreateTiStoreRetryable(t *testing.T) {
t.Parallel()

ctx := context.Background()
require.True(t, isCreateTiStoreRetryable(ctx, context.Canceled))
require.True(t, isCreateTiStoreRetryable(ctx, errors.Trace(context.Canceled)))

canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
require.False(t, isCreateTiStoreRetryable(canceledCtx, context.Canceled))
require.False(t, isCreateTiStoreRetryable(canceledCtx, errors.Trace(context.Canceled)))
}