Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a PANIC in coordinator. #1107

Merged
merged 25 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5301436
update
hongyunyan Mar 14, 2025
85ac3b7
update
hongyunyan Mar 14, 2025
e283d1e
Merge branch 'master' of https://github.com/pingcap/ticdc into 0314
hongyunyan Mar 18, 2025
cbc916e
Merge branch 'master' of https://github.com/pingcap/ticdc into 0314
hongyunyan Mar 19, 2025
f7450fe
coordinator: prevent from being stopped multiple times
asddongmen Mar 20, 2025
430cfe1
coordinator: add unit test for Stop method
asddongmen Mar 20, 2025
1c0fa61
coordinator: add a recover to prevent coordinator from panicking
asddongmen Mar 20, 2025
beb86f3
tests: sleep longer to wait for the etcd session to expire
asddongmen Mar 21, 2025
5542309
hack test group
asddongmen Mar 21, 2025
19b85e7
improve log
asddongmen Mar 21, 2025
57acfc0
Revert "hack test group"
asddongmen Mar 21, 2025
087b626
Merge remote-tracking branch 'upstream/master' into 0314
asddongmen Mar 21, 2025
8ba9ede
server: handle signal
asddongmen Mar 21, 2025
f0e202d
make check
asddongmen Mar 21, 2025
19b0605
Revert "Revert "hack test group""
asddongmen Mar 21, 2025
09b49e0
hack test
asddongmen Mar 21, 2025
ec2203e
add clean process retry time
asddongmen Mar 21, 2025
7c2d179
merge upstream master
asddongmen Mar 21, 2025
8c326c9
wait longer when kill cdc
asddongmen Mar 21, 2025
69d2533
server: add timeout for closing service
asddongmen Mar 21, 2025
a3b270e
server: add timeout for closing server 2
asddongmen Mar 21, 2025
44a4dfd
tests: fix 3
asddongmen Mar 21, 2025
ea6b998
tests: fix 4
asddongmen Mar 21, 2025
34eb376
tests: fix 5
asddongmen Mar 21, 2025
dde26a1
tests: make fmt
asddongmen Mar 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/v2/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func (h *OpenAPIV2) ResignOwner(c *gin.Context) {
o, _ := h.server.GetCoordinator()
if o != nil {
o.AsyncStop()
o.Stop()
}

c.JSON(getStatus(c), &EmptyResponse{})
Expand Down
10 changes: 10 additions & 0 deletions cmd/cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ func (o *options) run(cmd *cobra.Command) error {
log.Info("TiCDC(new arch) server created",
zap.Strings("pd", o.pdEndpoints), zap.Stringer("config", o.serverConfig))

// shutdown is used to notify the server to shutdown (by closing the context) when receiving the signal, and exit the process.
shutdown := func() <-chan struct{} {
done := make(chan struct{})
cancel()
close(done)
return done
}
// Gracefully shutdown the server when receiving the signal, and exit the process.
util.InitSignalHandling(shutdown, cancel)

err = svr.Run(ctx)
if err != nil && !errors.Is(errors.Cause(err), context.Canceled) {
log.Warn("cdc server exits with error", zap.Error(err))
Expand Down
35 changes: 29 additions & 6 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func New(node *node.Info,
mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages)

c.taskScheduler = threadpool.NewThreadPoolDefault()
c.closed.Store(false)

controller := NewController(
c.version,
Expand All @@ -147,18 +146,41 @@ func New(node *node.Info,
log.Info("Coordinator changed, and I am not the coordinator, stop myself",
zap.String("selfID", string(c.nodeInfo.ID)),
zap.String("newCoordinatorID", newCoordinatorID))
c.AsyncStop()
c.Stop()
}
})

return c
}

func (c *coordinator) recvMessages(_ context.Context, msg *messaging.TargetMessage) error {
func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error {
if c.closed.Load() {
return nil
}
c.eventCh.In() <- &Event{message: msg}

defer func() {
if r := recover(); r != nil {
// There is chance that:
// 1. Just before the c.eventCh is closed, the recvMessages is called
// 2. Then the goroutine(call it g1) that calls recvMessages is scheduled out by runtime, and the msg is in flight
// 3. The c.eventCh is closed by another goroutine(call it g2)
// 4. g1 is scheduled back by runtime, and the msg is sent to the closed channel
// 5. g1 panics
// To avoid the panic, we have two choices:
// 1. Use a mutex to protect this function, but it will reduce the throughput
// 2. Recover the panic, and log the error
// We choose the second option here.
log.Error("panic in recvMessages", zap.Any("msg", msg), zap.Any("panic", r))
}
}()

select {
case <-ctx.Done():
return ctx.Err()
default:
c.eventCh.In() <- &Event{message: msg}
}

return nil
}

Expand Down Expand Up @@ -385,13 +407,14 @@ func (c *coordinator) GetChangefeed(ctx context.Context, changefeedDisplayName c
return c.controller.GetChangefeed(ctx, changefeedDisplayName)
}

func (c *coordinator) AsyncStop() {
func (c *coordinator) Stop() {
if c.closed.CompareAndSwap(false, true) {
c.mc.DeRegisterHandler(messaging.CoordinatorTopic)
c.controller.Stop()
c.taskScheduler.Stop()
c.eventCh.CloseAndDrain()
c.cancel()
// close eventCh after cancel, to avoid send or get event from the channel
c.eventCh.CloseAndDrain()
}
}

Expand Down
127 changes: 127 additions & 0 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/http/pprof"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -477,6 +478,132 @@ func TestBootstrapWithUnStoppedChangefeed(t *testing.T) {
}, waitTime, time.Millisecond*5)
}

// TestConcurrentStopAndSendEvents verifies that calling Stop() from several
// goroutines while other goroutines feed events through recvMessages neither
// panics nor deadlocks: it exercises the recover path in recvMessages (send
// on a closed eventCh) and the CompareAndSwap guard that makes Stop idempotent.
func TestConcurrentStopAndSendEvents(t *testing.T) {
	// Setup context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Initialize node info
	info := node.NewInfo("127.0.0.1:28600", "")
	etcdClient := newMockEtcdClient(string(info.ID))
	nodeManager := watcher.NewNodeManager(nil, etcdClient)
	appcontext.SetService(watcher.NodeManagerName, nodeManager)
	nodeManager.GetAliveNodes()[info.ID] = info

	// Initialize message center
	mc := messaging.NewMessageCenter(ctx, info.ID, 0, config.NewDefaultMessageCenterConfig(), nil)
	mc.Run(ctx)
	defer mc.Close()
	appcontext.SetService(appcontext.MessageCenter, mc)

	// Initialize backend (mock returns an empty changefeed set for every call)
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	backend := mock_changefeed.NewMockBackend(ctrl)
	backend.EXPECT().GetAllChangefeeds(gomock.Any()).Return(map[common.ChangeFeedID]*changefeed.ChangefeedMetaWrapper{}, nil).AnyTimes()

	// Create coordinator
	cr := New(info, &mockPdClient{}, pdutil.NewClock4Test(), backend, "test-gc-service", 100, 10000, time.Millisecond*10)
	co := cr.(*coordinator)

	// Number of goroutines for each operation
	const (
		sendEventGoroutines = 10
		stopGoroutines      = 5
		eventsPerGoroutine  = 100
	)

	var wg sync.WaitGroup
	wg.Add(sendEventGoroutines + stopGoroutines)

	// Start the coordinator
	ctxRun, cancelRun := context.WithCancel(ctx)
	go func() {
		err := cr.Run(ctxRun)
		if err != nil && err != context.Canceled {
			t.Errorf("Coordinator Run returned unexpected error: %v", err)
		}
	}()

	// Give coordinator some time to initialize
	time.Sleep(100 * time.Millisecond)

	// Start goroutines to send events
	for i := 0; i < sendEventGoroutines; i++ {
		go func(id int) {
			defer wg.Done()
			defer func() {
				// Recover from potential panics — a panic escaping here is
				// exactly the bug this test guards against.
				if r := recover(); r != nil {
					t.Errorf("Panic in send event goroutine %d: %v", id, r)
				}
			}()

			for j := 0; j < eventsPerGoroutine; j++ {
				// Try to send an event
				if co.closed.Load() {
					// Coordinator is already closed, stop sending
					return
				}

				msg := &messaging.TargetMessage{
					Topic: messaging.CoordinatorTopic,
					Type:  messaging.TypeMaintainerHeartbeatRequest,
				}

				// Use recvMessages to send event to channel
				err := co.recvMessages(ctx, msg)
				if err != nil && err != context.Canceled {
					t.Logf("Failed to send event in goroutine %d: %v", id, err)
				}

				// Small sleep to increase chance of race conditions
				time.Sleep(time.Millisecond)
			}
		}(i)
	}

	// Start goroutines to stop the coordinator; staggered delays make the
	// Stop calls overlap with in-flight sends.
	for i := 0; i < stopGoroutines; i++ {
		go func(id int) {
			defer wg.Done()
			// Small delay to ensure some events are sent first
			time.Sleep(time.Duration(10+id*5) * time.Millisecond)
			co.Stop()
		}(i)
	}

	// Wait for all goroutines to complete
	wg.Wait()

	// Cancel the context to ensure the coordinator stops
	cancelRun()

	// Give some time for the coordinator to fully stop
	time.Sleep(100 * time.Millisecond)

	// Verify that the coordinator is closed
	require.True(t, co.closed.Load())

	// Verify that event channel is closed
	select {
	case _, ok := <-co.eventCh.Out():
		require.False(t, ok, "Event channel should be closed")
	default:
		// Channel might be already drained, which is fine
	}

	// Try sending another event - should not panic but may return error
	msg := &messaging.TargetMessage{
		Topic: messaging.CoordinatorTopic,
		Type:  messaging.TypeMaintainerHeartbeatRequest,
	}

	err := co.recvMessages(ctx, msg)
	require.NoError(t, err)
	require.True(t, co.closed.Load())
}

type maintainNode struct {
cancel context.CancelFunc
mc messaging.MessageCenter
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ
}

func (c *HeartBeatCollector) Close() {
log.Info("heartbeat collector is closing")
c.mc.DeRegisterHandler(messaging.HeartbeatCollectorTopic)
c.cancel()
c.wg.Wait()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ func (m *DispatcherOrchestrator) sendResponse(to node.ID, topic string, msg mess
}

// Close deregisters the orchestrator's handler for the
// DispatcherManagerManagerTopic so it stops receiving messages; the log lines
// bracket the teardown to aid shutdown-sequence debugging.
func (m *DispatcherOrchestrator) Close() {
	log.Info("dispatcher orchestrator is closing")
	m.mc.DeRegisterHandler(messaging.DispatcherManagerManagerTopic)
	log.Info("dispatcher orchestrator closed")
}

// handleDispatcherError creates and sends an error response for create dispatcher-related failures
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func (c *EventCollector) Run(ctx context.Context) {
}

func (c *EventCollector) Close() {
log.Info("event collector is closing")
c.cancel()
c.ds.Close()

c.changefeedIDMap.Range(func(key, value any) bool {
cfID := value.(common.ChangeFeedID)
// Remove metrics for the changefeed.
Expand Down
1 change: 1 addition & 0 deletions pkg/messaging/message_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (mc *messageCenter) ReceiveCmd() (*TargetMessage, error) {

// Close stops the grpc server and stops all the connections to the remote targets.
func (mc *messageCenter) Close() {
log.Info("message center is closing", zap.Stringer("id", mc.id))
mc.remoteTargets.RLock()
defer mc.remoteTargets.RUnlock()

Expand Down
1 change: 0 additions & 1 deletion pkg/messaging/proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ message Message {
int32 type = 5;
// topic is the destination of the message, it is used to route the message to the correct handler.
string topic = 6;
// TODO, change to real types
repeated bytes payload = 7;
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/messaging/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,36 +116,36 @@ func (s *remoteMessageTarget) Epoch() uint64 {
// sendEvent enqueues the given messages onto the target's event stream.
// It fails fast (with target ID in the error for diagnosability) when the
// stream is not ready or already closed, and reports congestion instead of
// blocking when the send buffer is full. (The rendered diff interleaved the
// old and new return lines; this is the coherent post-change version.)
func (s *remoteMessageTarget) sendEvent(msg ...*TargetMessage) error {
	if !s.eventSender.ready.Load() {
		s.connectionNotfoundErrorCounter.Inc()
		return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has not been initialized, target: %s", s.targetId)}
	}
	select {
	case <-s.ctx.Done():
		s.connectionNotfoundErrorCounter.Inc()
		return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has been closed, target: %s", s.targetId)}
	case s.sendEventCh <- s.newMessage(msg...):
		s.sendEventCounter.Add(float64(len(msg)))
		return nil
	default:
		// Non-blocking send: a full channel means the receiver is congested.
		s.congestedEventErrorCounter.Inc()
		return AppError{Type: ErrorTypeMessageCongested, Reason: fmt.Sprintf("Send event message is congested, target: %s", s.targetId)}
	}
}

// sendCommand enqueues the given messages onto the target's command stream.
// Mirrors sendEvent: fail fast with the target ID when the stream is not
// ready or closed, and report congestion instead of blocking. (The rendered
// diff interleaved old and new return lines; this is the coherent
// post-change version.)
func (s *remoteMessageTarget) sendCommand(msg ...*TargetMessage) error {
	if !s.commandSender.ready.Load() {
		s.connectionNotfoundErrorCounter.Inc()
		return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has not been initialized, target: %s", s.targetId)}
	}
	select {
	case <-s.ctx.Done():
		s.connectionNotfoundErrorCounter.Inc()
		return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has been closed, target: %s", s.targetId)}
	case s.sendCmdCh <- s.newMessage(msg...):
		s.sendCmdCounter.Add(float64(len(msg)))
		return nil
	default:
		// Non-blocking send: a full channel means the receiver is congested.
		s.congestedCmdErrorCounter.Inc()
		return AppError{Type: ErrorTypeMessageCongested, Reason: fmt.Sprintf("Send command message is congested, target: %s", s.targetId)}
	}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// 4. manager gc safe point
// 5. response for open API call
type Coordinator interface {
AsyncStop()
Stop()
// Run handles messages
Run(ctx context.Context) error
// ListChangefeeds returns all changefeeds
Expand Down
2 changes: 1 addition & 1 deletion server/module_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (e *elector) campaignCoordinator(ctx context.Context) error {
e.svr.setCoordinator(co)
err = co.Run(ctx)
// When coordinator exits, we need to stop it.
e.svr.coordinator.AsyncStop()
e.svr.coordinator.Stop()
e.svr.setCoordinator(nil)
log.Info("coordinator stop", zap.String("captureID", string(e.svr.info.ID)),
zap.Int64("coordinatorVersion", coordinatorVersion), zap.Error(err))
Expand Down
Loading