diff --git a/api/v2/coordinator.go b/api/v2/coordinator.go index 119d99f07..82c17f9e9 100644 --- a/api/v2/coordinator.go +++ b/api/v2/coordinator.go @@ -29,7 +29,7 @@ import ( func (h *OpenAPIV2) ResignOwner(c *gin.Context) { o, _ := h.server.GetCoordinator() if o != nil { - o.AsyncStop() + o.Stop() } c.JSON(getStatus(c), &EmptyResponse{}) diff --git a/cmd/cdc/server/server.go b/cmd/cdc/server/server.go index d539fd539..34f9a6380 100644 --- a/cmd/cdc/server/server.go +++ b/cmd/cdc/server/server.go @@ -112,6 +112,16 @@ func (o *options) run(cmd *cobra.Command) error { log.Info("TiCDC(new arch) server created", zap.Strings("pd", o.pdEndpoints), zap.Stringer("config", o.serverConfig)) + // shutdown is used to notify the server to shutdown (by closing the context) when receiving the signal, and exit the process. + shutdown := func() <-chan struct{} { + done := make(chan struct{}) + cancel() + close(done) + return done + } + // Gracefully shutdown the server when receiving the signal, and exit the process. + util.InitSignalHandling(shutdown, cancel) + err = svr.Run(ctx) if err != nil && !errors.Is(errors.Cause(err), context.Canceled) { log.Warn("cdc server exits with error", zap.Error(err)) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 59239e6be..e2a33f567 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -123,7 +123,6 @@ func New(node *node.Info, mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages) c.taskScheduler = threadpool.NewThreadPoolDefault() - c.closed.Store(false) controller := NewController( c.version, @@ -147,18 +146,41 @@ func New(node *node.Info, log.Info("Coordinator changed, and I am not the coordinator, stop myself", zap.String("selfID", string(c.nodeInfo.ID)), zap.String("newCoordinatorID", newCoordinatorID)) - c.AsyncStop() + c.Stop() } }) return c } -func (c *coordinator) recvMessages(_ context.Context, msg *messaging.TargetMessage) error { +func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error { if c.closed.Load() { return nil } - c.eventCh.In() <- &Event{message: msg} + + defer func() { + if r := recover(); r != nil { + // There is chance that: + // 1. Just before the c.eventCh is closed, the recvMessages is called + // 2. Then the goroutine(call it g1) that calls recvMessages is scheduled out by runtime, and the msg is in flight + // 3. The c.eventCh is closed by another goroutine(call it g2) + // 4. g1 is scheduled back by runtime, and the msg is sent to the closed channel + // 5. g1 panics + // To avoid the panic, we have two choices: + // 1. Use a mutex to protect this function, but it will reduce the throughput + // 2. Recover the panic, and log the error + // We choose the second option here. 
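A minimal, self-contained sketch of the race described in the comment above (names here are illustrative, not part of this patch): a sender passes the closed check, gets descheduled, and the channel is closed before its send lands, so the send panics and the recover absorbs it instead of serializing the hot path behind a mutex.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type demo struct {
	closed  atomic.Bool
	eventCh chan string
}

// send mirrors the recover-based strategy: the closed check is only a fast path,
// so a send racing with close() is caught by recover rather than a lock.
func (d *demo) send(msg string) {
	if d.closed.Load() {
		return
	}
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered from send on closed channel:", r)
		}
	}()
	d.eventCh <- msg
}

// stop closes the channel exactly once, no matter how many goroutines call it.
func (d *demo) stop() {
	if d.closed.CompareAndSwap(false, true) {
		close(d.eventCh)
	}
}

func main() {
	d := &demo{eventCh: make(chan string, 16)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				d.send(fmt.Sprintf("msg-%d-%d", id, j))
			}
		}(i)
	}
	d.stop()
	wg.Wait()
}
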
+ log.Error("panic in recvMessages", zap.Any("msg", msg), zap.Any("panic", r)) + } + }() + + select { + case <-ctx.Done(): + return ctx.Err() + default: + c.eventCh.In() <- &Event{message: msg} + } + return nil } @@ -385,13 +407,14 @@ func (c *coordinator) GetChangefeed(ctx context.Context, changefeedDisplayName c return c.controller.GetChangefeed(ctx, changefeedDisplayName) } -func (c *coordinator) AsyncStop() { +func (c *coordinator) Stop() { if c.closed.CompareAndSwap(false, true) { c.mc.DeRegisterHandler(messaging.CoordinatorTopic) c.controller.Stop() c.taskScheduler.Stop() - c.eventCh.CloseAndDrain() c.cancel() + // close eventCh after cancel, to avoid send or get event from the channel + c.eventCh.CloseAndDrain() } } diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index d1bf4ebe9..a0695e086 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -22,6 +22,7 @@ import ( "net/http" "net/http/pprof" "strconv" + "sync" "testing" "time" @@ -477,6 +478,132 @@ func TestBootstrapWithUnStoppedChangefeed(t *testing.T) { }, waitTime, time.Millisecond*5) } +func TestConcurrentStopAndSendEvents(t *testing.T) { + // Setup context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize node info + info := node.NewInfo("127.0.0.1:28600", "") + etcdClient := newMockEtcdClient(string(info.ID)) + nodeManager := watcher.NewNodeManager(nil, etcdClient) + appcontext.SetService(watcher.NodeManagerName, nodeManager) + nodeManager.GetAliveNodes()[info.ID] = info + + // Initialize message center + mc := messaging.NewMessageCenter(ctx, info.ID, 0, config.NewDefaultMessageCenterConfig(), nil) + mc.Run(ctx) + defer mc.Close() + appcontext.SetService(appcontext.MessageCenter, mc) + + // Initialize backend + ctrl := gomock.NewController(t) + defer ctrl.Finish() + backend := mock_changefeed.NewMockBackend(ctrl) + backend.EXPECT().GetAllChangefeeds(gomock.Any()).Return(map[common.ChangeFeedID]*changefeed.ChangefeedMetaWrapper{}, nil).AnyTimes() + + // Create coordinator + cr := New(info, &mockPdClient{}, pdutil.NewClock4Test(), backend, "test-gc-service", 100, 10000, time.Millisecond*10) + co := cr.(*coordinator) + + // Number of goroutines for each operation + const ( + sendEventGoroutines = 10 + stopGoroutines = 5 + eventsPerGoroutine = 100 + ) + + var wg sync.WaitGroup + wg.Add(sendEventGoroutines + stopGoroutines) + + // Start the coordinator + ctxRun, cancelRun := context.WithCancel(ctx) + go func() { + err := cr.Run(ctxRun) + if err != nil && err != context.Canceled { + t.Errorf("Coordinator Run returned unexpected error: %v", err) + } + }() + + // Give coordinator some time to initialize + time.Sleep(100 * time.Millisecond) + + // Start goroutines to send events + for i := 0; i < sendEventGoroutines; i++ { + go func(id int) { + defer wg.Done() + defer func() { + // Recover from potential panics + if r := recover(); r != nil { + t.Errorf("Panic in send event goroutine %d: %v", id, r) + } + }() + + for j := 0; j < eventsPerGoroutine; j++ { + // Try to send an event + if co.closed.Load() { + // Coordinator is already closed, stop sending + return + } + + msg := &messaging.TargetMessage{ + Topic: messaging.CoordinatorTopic, + Type: messaging.TypeMaintainerHeartbeatRequest, + } + + // Use recvMessages to send event to channel + err := co.recvMessages(ctx, msg) + if err != nil && err != context.Canceled { + t.Logf("Failed to send event in goroutine %d: %v", id, err) + } + + // Small sleep to increase chance of race 
conditions + time.Sleep(time.Millisecond) + } + }(i) + } + + // Start goroutines to stop the coordinator + for i := 0; i < stopGoroutines; i++ { + go func(id int) { + defer wg.Done() + // Small delay to ensure some events are sent first + time.Sleep(time.Duration(10+id*5) * time.Millisecond) + co.Stop() + }(i) + } + + // Wait for all goroutines to complete + wg.Wait() + + // Cancel the context to ensure the coordinator stops + cancelRun() + + // Give some time for the coordinator to fully stop + time.Sleep(100 * time.Millisecond) + + // Verify that the coordinator is closed + require.True(t, co.closed.Load()) + + // Verify that event channel is closed + select { + case _, ok := <-co.eventCh.Out(): + require.False(t, ok, "Event channel should be closed") + default: + // Channel might be already drained, which is fine + } + + // Try sending another event - should not panic but may return error + msg := &messaging.TargetMessage{ + Topic: messaging.CoordinatorTopic, + Type: messaging.TypeMaintainerHeartbeatRequest, + } + + err := co.recvMessages(ctx, msg) + require.NoError(t, err) + require.True(t, co.closed.Load()) +} + type maintainNode struct { cancel context.CancelFunc mc messaging.MessageCenter diff --git a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 3a8355958..3af8b859b 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -212,6 +212,7 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ } func (c *HeartBeatCollector) Close() { + log.Info("heartbeat collector is closing") c.mc.DeRegisterHandler(messaging.HeartbeatCollectorTopic) c.cancel() c.wg.Wait() diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index d18d17912..6cec95681 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -273,7 +273,9 @@ func (m *DispatcherOrchestrator) sendResponse(to node.ID, topic string, msg mess } func (m *DispatcherOrchestrator) Close() { + log.Info("dispatcher orchestrator is closing") m.mc.DeRegisterHandler(messaging.DispatcherManagerManagerTopic) + log.Info("dispatcher orchestrator closed") } // handleDispatcherError creates and sends an error response for create dispatcher-related failures diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 780e75b9c..c9259bf04 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -161,9 +161,9 @@ func (c *EventCollector) Run(ctx context.Context) { } func (c *EventCollector) Close() { + log.Info("event collector is closing") c.cancel() c.ds.Close() - c.changefeedIDMap.Range(func(key, value any) bool { cfID := value.(common.ChangeFeedID) // Remove metrics for the changefeed. diff --git a/pkg/messaging/message_center.go b/pkg/messaging/message_center.go index db35d3dca..5793e3fd7 100644 --- a/pkg/messaging/message_center.go +++ b/pkg/messaging/message_center.go @@ -278,6 +278,7 @@ func (mc *messageCenter) ReceiveCmd() (*TargetMessage, error) { // Close stops the grpc server and stops all the connections to the remote targets. 
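TestConcurrentStopAndSendEvents above drives Stop from several goroutines at once; the property it depends on is that the teardown body runs exactly once. A minimal sketch of that CompareAndSwap-guarded idempotent stop, with illustrative names only:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type component struct {
	closed atomic.Bool
}

// Stop is safe to call concurrently and repeatedly: only the caller that flips
// closed from false to true runs the teardown; everyone else returns immediately.
func (c *component) Stop() {
	if c.closed.CompareAndSwap(false, true) {
		fmt.Println("teardown runs exactly once")
	}
}

func main() {
	c := &component{}
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Stop()
		}()
	}
	wg.Wait()
}
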
func (mc *messageCenter) Close() { + log.Info("message center is closing", zap.Stringer("id", mc.id)) mc.remoteTargets.RLock() defer mc.remoteTargets.RUnlock() diff --git a/pkg/messaging/proto/message.proto b/pkg/messaging/proto/message.proto index 3b5edcaba..43fdc85cc 100644 --- a/pkg/messaging/proto/message.proto +++ b/pkg/messaging/proto/message.proto @@ -14,7 +14,6 @@ message Message { int32 type = 5; // topic is the destination of the message, it is used to route the message to the correct handler. string topic = 6; - // TODO, change to real types repeated bytes payload = 7; } diff --git a/pkg/messaging/target.go b/pkg/messaging/target.go index f5b86daea..50c136a5d 100644 --- a/pkg/messaging/target.go +++ b/pkg/messaging/target.go @@ -116,36 +116,36 @@ func (s *remoteMessageTarget) Epoch() uint64 { func (s *remoteMessageTarget) sendEvent(msg ...*TargetMessage) error { if !s.eventSender.ready.Load() { s.connectionNotfoundErrorCounter.Inc() - return AppError{Type: ErrorTypeConnectionNotFound, Reason: "Stream has not been initialized"} + return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has not been initialized, target: %s", s.targetId)} } select { case <-s.ctx.Done(): s.connectionNotfoundErrorCounter.Inc() - return AppError{Type: ErrorTypeConnectionNotFound, Reason: "Stream has been closed"} + return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has been closed, target: %s", s.targetId)} case s.sendEventCh <- s.newMessage(msg...): s.sendEventCounter.Add(float64(len(msg))) return nil default: s.congestedEventErrorCounter.Inc() - return AppError{Type: ErrorTypeMessageCongested, Reason: "Send event message is congested"} + return AppError{Type: ErrorTypeMessageCongested, Reason: fmt.Sprintf("Send event message is congested, target: %s", s.targetId)} } } func (s *remoteMessageTarget) sendCommand(msg ...*TargetMessage) error { if !s.commandSender.ready.Load() { s.connectionNotfoundErrorCounter.Inc() - return AppError{Type: ErrorTypeConnectionNotFound, Reason: "Stream has not been initialized"} + return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has not been initialized, target: %s", s.targetId)} } select { case <-s.ctx.Done(): s.connectionNotfoundErrorCounter.Inc() - return AppError{Type: ErrorTypeConnectionNotFound, Reason: "Stream has been closed"} + return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has been closed, target: %s", s.targetId)} case s.sendCmdCh <- s.newMessage(msg...): s.sendCmdCounter.Add(float64(len(msg))) return nil default: s.congestedCmdErrorCounter.Inc() - return AppError{Type: ErrorTypeMessageCongested, Reason: "Send command message is congested"} + return AppError{Type: ErrorTypeMessageCongested, Reason: fmt.Sprintf("Send command message is congested, target: %s", s.targetId)} } } diff --git a/pkg/server/coordinator.go b/pkg/server/coordinator.go index fdb46b8a7..bd3b68a18 100644 --- a/pkg/server/coordinator.go +++ b/pkg/server/coordinator.go @@ -27,7 +27,7 @@ import ( // 4. manager gc safe point // 5. 
response for open API call type Coordinator interface { - AsyncStop() + Stop() // Run handles messages Run(ctx context.Context) error // ListChangefeeds returns all changefeeds diff --git a/server/module_election.go b/server/module_election.go index 0e759fbea..8f6a9d2f1 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -144,7 +144,7 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { e.svr.setCoordinator(co) err = co.Run(ctx) // When coordinator exits, we need to stop it. - e.svr.coordinator.AsyncStop() + e.svr.coordinator.Stop() e.svr.setCoordinator(nil) log.Info("coordinator stop", zap.String("captureID", string(e.svr.info.ID)), zap.Int64("coordinatorVersion", coordinatorVersion), zap.Error(err)) diff --git a/server/server.go b/server/server.go index 644788797..e4e71d08b 100644 --- a/server/server.go +++ b/server/server.go @@ -17,6 +17,7 @@ import ( "context" "strings" "sync" + "sync/atomic" "time" "github.com/pingcap/log" @@ -51,7 +52,8 @@ import ( ) const ( - cleanMetaDuration = 10 * time.Second + closeServiceTimeout = 15 * time.Second + cleanMetaDuration = 10 * time.Second ) type server struct { @@ -87,6 +89,8 @@ type server struct { // subModules is the modules will be start after PreServices are started // And will be closed when the server is closing subModules []common.SubModule + + closed atomic.Bool } // New returns a new Server instance @@ -275,23 +279,24 @@ func (c *server) GetCoordinator() (tiserver.Coordinator, error) { // it also closes the coordinator and processorManager // Note: this function should be reentrant func (c *server) Close(ctx context.Context) { + if !c.closed.CompareAndSwap(false, true) { + return + } log.Info("server closing", zap.Any("ServerInfo", c.info)) // Safety: Here we mainly want to stop the coordinator // and ignore it if the coordinator does not exist or is not set. o, _ := c.GetCoordinator() if o != nil { - o.AsyncStop() + o.Stop() log.Info("coordinator closed", zap.String("captureID", string(c.info.ID))) } - for _, service := range c.preServices { - service.Close() - } + closeGroup := c.closePreServices() for _, subModule := range c.subModules { if err := subModule.Close(ctx); err != nil { - log.Warn("failed to close sub watcher", - zap.String("watcher", subModule.Name()), + log.Warn("failed to close sub module", + zap.String("module", subModule.Name()), zap.Error(err)) } log.Info("sub module closed", zap.String("module", subModule.Name())) @@ -304,11 +309,39 @@ func (c *server) Close(ctx context.Context) { log.Warn("failed to delete server info when server exited", zap.String("captureID", string(c.info.ID)), zap.Error(err)) + } else { + log.Info("server info deleted from etcd", zap.String("captureID", string(c.info.ID))) } + closeGroup.Wait() log.Info("server closed", zap.Any("ServerInfo", c.info)) } +func (c *server) closePreServices() *errgroup.Group { + closeCtx, cancel := context.WithTimeout(context.Background(), closeServiceTimeout) + defer cancel() + closeGroup, closeCtx := errgroup.WithContext(closeCtx) + for _, service := range c.preServices { + s := service + closeGroup.Go(func() error { + done := make(chan struct{}) + go func() { + s.Close() + close(done) + }() + + select { + case <-done: + return nil + case <-closeCtx.Done(): + log.Warn("service close operation timed out", zap.Error(closeCtx.Err())) + return closeCtx.Err() + } + }) + } + return closeGroup +} + // Liveness returns liveness of the server. 
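A self-contained sketch of the bounded-time close pattern that closePreServices introduces above (service names and durations are illustrative): each Close runs in its own goroutine under an errgroup, and a shared timeout context turns a hung Close into a returned error instead of blocking server shutdown indefinitely.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

type service interface{ Close() }

type slowService struct{ d time.Duration }

func (s slowService) Close() { time.Sleep(s.d) }

// closeAll closes every service concurrently and gives the whole batch a single
// deadline; a service that never returns from Close is reported as a timeout.
func closeAll(services []service, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	g, ctx := errgroup.WithContext(ctx)
	for _, svc := range services {
		svc := svc
		g.Go(func() error {
			done := make(chan struct{})
			go func() {
				svc.Close()
				close(done)
			}()
			select {
			case <-done:
				return nil
			case <-ctx.Done():
				return fmt.Errorf("service close timed out: %w", ctx.Err())
			}
		})
	}
	return g.Wait()
}

func main() {
	services := []service{slowService{10 * time.Millisecond}, slowService{5 * time.Second}}
	if err := closeAll(services, 100*time.Millisecond); err != nil {
		fmt.Println("shutdown finished with error:", err)
	}
}
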
func (c *server) Liveness() model.Liveness { return c.liveness.Load() diff --git a/tests/integration_tests/_utils/cleanup_process b/tests/integration_tests/_utils/cleanup_process index f9dac3145..4d3d6c73e 100755 --- a/tests/integration_tests/_utils/cleanup_process +++ b/tests/integration_tests/_utils/cleanup_process @@ -2,7 +2,7 @@ # parameter 1: process name process=$1 -retry_count=20 +retry_count=60 killall $process || true diff --git a/tests/integration_tests/_utils/kill_cdc_pid b/tests/integration_tests/_utils/kill_cdc_pid index e5fd0f843..993c10108 100755 --- a/tests/integration_tests/_utils/kill_cdc_pid +++ b/tests/integration_tests/_utils/kill_cdc_pid @@ -2,7 +2,7 @@ # parameter 1: process pid=$1 -retry_count=20 +retry_count=60 kill $pid || true diff --git a/tests/integration_tests/_utils/random_kill_process b/tests/integration_tests/_utils/random_kill_process index c40784cd6..0e9fc2cae 100755 --- a/tests/integration_tests/_utils/random_kill_process +++ b/tests/integration_tests/_utils/random_kill_process @@ -2,7 +2,7 @@ # parameter 1: process name process=$1 -retry_count=20 +retry_count=60 pids=($(pidof $process)) echo "list pids " ${pids[@]} diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index a6943fd94..4bbe5008e 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -73,8 +73,8 @@ function run() { export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + # make sure old capture key and old owner key expire in etcd - ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture --prefix | grep -v "capture" ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/capture' 'capture'" ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'" echo "Pass case 1" diff --git a/tests/integration_tests/ddl_sequence/run.sh b/tests/integration_tests/ddl_sequence/run.sh index d9730e46e..aaf1cc47c 100644 --- a/tests/integration_tests/ddl_sequence/run.sh +++ b/tests/integration_tests/ddl_sequence/run.sh @@ -30,7 +30,7 @@ function run() { ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + do_retry 5 3 run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" case $SINK_TYPE in kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; diff --git a/tests/integration_tests/fail_over_ddl_A/run.sh b/tests/integration_tests/fail_over_ddl_A/run.sh index e51e8262f..e070349d8 100644 --- a/tests/integration_tests/fail_over_ddl_A/run.sh +++ b/tests/integration_tests/fail_over_ddl_A/run.sh @@ -179,6 +179,7 @@ function failOverCaseA-3() { # restart cdc server to enable failpoint cdc_pid_1=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') kill_cdc_pid $cdc_pid_1 + export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockOrWaitReportAfterWrite=pause' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-1" --addr "127.0.0.1:8300" cdc_pid_1=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') @@ -238,6 +239,7 @@ function failOverCaseA-4() { # restart cdc server to enable failpoint cdc_pid_1=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') kill_cdc_pid $cdc_pid_1 + export 
GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockOrWaitReportAfterWrite=pause' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-1" --addr "127.0.0.1:8300" cdc_pid_1=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') diff --git a/tests/integration_tests/fail_over_ddl_mix/conf/diff_config.toml b/tests/integration_tests/fail_over_ddl_mix/conf/diff_config.toml index 6b87f38de..816db3397 100644 --- a/tests/integration_tests/fail_over_ddl_mix/conf/diff_config.toml +++ b/tests/integration_tests/fail_over_ddl_mix/conf/diff_config.toml @@ -15,13 +15,13 @@ target-instance = "tidb0" target-check-tables = ["test.*"] [data-sources] -[data-sources.mysql1] +[data-sources.tidb0] host = "127.0.0.1" port = 4000 user = "root" password = "" -[data-sources.tidb0] +[data-sources.mysql1] host = "127.0.0.1" port = 3306 user = "root" diff --git a/tests/integration_tests/fail_over_ddl_mix/run.sh b/tests/integration_tests/fail_over_ddl_mix/run.sh index 57650f4db..d3c12b15a 100644 --- a/tests/integration_tests/fail_over_ddl_mix/run.sh +++ b/tests/integration_tests/fail_over_ddl_mix/run.sh @@ -148,9 +148,9 @@ function kill_server() { if [ -z "$cdc_pid_1" ]; then continue fi - kill -9 $cdc_pid_1 + kill_cdc_pid $cdc_pid_1 - sleep 5 + sleep 15 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-$count" --addr "127.0.0.1:8300" ;; 1) @@ -158,9 +158,9 @@ function kill_server() { if [ -z "$cdc_pid_2" ]; then continue fi - kill -9 $cdc_pid_2 + kill_cdc_pid $cdc_pid_2 - sleep 5 + sleep 15 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1-$count" --addr "127.0.0.1:8301" ;; esac diff --git a/tests/integration_tests/fail_over_ddl_mix_with_syncpoint/run.sh b/tests/integration_tests/fail_over_ddl_mix_with_syncpoint/run.sh index 92daa571d..de675d7a0 100644 --- a/tests/integration_tests/fail_over_ddl_mix_with_syncpoint/run.sh +++ b/tests/integration_tests/fail_over_ddl_mix_with_syncpoint/run.sh @@ -89,9 +89,9 @@ function kill_server() { if [ -z "$cdc_pid_1" ]; then continue fi - kill -9 $cdc_pid_1 + kill_cdc_pid $cdc_pid_1 - sleep 5 + sleep 10 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-$count" --addr "127.0.0.1:8300" ;; 1) @@ -99,9 +99,9 @@ function kill_server() { if [ -z "$cdc_pid_2" ]; then continue fi - kill -9 $cdc_pid_2 + kill_cdc_pid $cdc_pid_2 - sleep 5 + sleep 10 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1-$count" --addr "127.0.0.1:8301" ;; esac @@ -130,11 +130,11 @@ main() { kill_server - sleep 10 + sleep 15 kill -9 $DDL_PID $DML_PID_1 $DML_PID_2 $DML_PID_3 $DML_PID_4 $DML_PID_5 - sleep 10 + sleep 15 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 500 diff --git a/tests/integration_tests/move_table/run.sh b/tests/integration_tests/move_table/run.sh index 381b41531..862dbef3b 100644 --- a/tests/integration_tests/move_table/run.sh +++ b/tests/integration_tests/move_table/run.sh @@ -42,6 +42,7 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "2" --addr 127.0.0.1:8301 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "3" --addr 127.0.0.1:8302 + sleep 15 # Add a check table to reduce check time, or if we check data with sync diff # directly, there maybe a lot of diff data at first because of the incremental scan run_sql "CREATE table move_table.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/tiflash/run.sh 
b/tests/integration_tests/tiflash/run.sh index b344f731d..6a6bab1a0 100644 --- a/tests/integration_tests/tiflash/run.sh +++ b/tests/integration_tests/tiflash/run.sh @@ -38,9 +38,9 @@ function run() { auth-tls-private-key-path="${WORK_DIR}/broker_client.key-pk8.pem" auth-tls-certificate-path="${WORK_DIR}/broker_client.cert.pem" EOF - cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$WORK_DIR/pulsar_test.toml + do_retry 5 3 cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$WORK_DIR/pulsar_test.toml else - cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + do_retry 5 3 cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" fi case $SINK_TYPE in
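The changefeed-create calls above (and in ddl_sequence earlier) are now wrapped in the do_retry test helper; assuming its two leading arguments are a retry budget and a per-attempt delay in seconds, the behaviour it adds is roughly the generic retry loop sketched here (Go is used purely for illustration; the actual helper is presumably a shell script alongside the other _utils scripts):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retry runs fn up to attempts times, sleeping delay between failed attempts and
// returning the last error if every attempt fails.
func retry(attempts int, delay time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		fmt.Printf("attempt %d failed: %v\n", i+1, err)
		time.Sleep(delay)
	}
	return err
}

func main() {
	calls := 0
	err := retry(5, 3*time.Second, func() error {
		calls++
		if calls < 3 {
			// Simulate a changefeed create that succeeds only once the cluster is ready.
			return errors.New("changefeed create not ready yet")
		}
		return nil
	})
	fmt.Println("result:", err)
}
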