@@ -753,20 +753,26 @@ func (a *Applier) initiateStreaming() error {
753753 a .mysqlContext .MarkRowCopyStartTime ()
754754 a .logger .Debugf ("mysql.applier: nats subscribe" )
755755 _ , err := a .natsConn .Subscribe (fmt .Sprintf ("%s_full" , a .subject ), func (m * gonats.Msg ) {
756- a .logger .Debugf ("mysql.applier: recv a msg" )
757- dumpData := & DumpEntry {}
758- if err := Decode (m .Data , dumpData ); err != nil {
759- a .onError (TaskStateDead , err )
760- }
761- atomic .AddInt64 (& a .nDumpEntry , 1 )
762- a .copyRowsQueue <- dumpData
763- a .logger .Debugf ("mysql.applier: copyRowsQueue: %v" , len (a .copyRowsQueue ))
764- a .mysqlContext .Stage = models .StageSlaveWaitingForWorkersToProcessQueue
765- if err := a .natsConn .Publish (m .Reply , nil ); err != nil {
766- a .onError (TaskStateDead , err )
756+ a .logger .Debugf ("mysql.applier: recv a msg. copyRowsQueue: %v" , len (a .copyRowsQueue ))
757+ // TODO possible optimization: if the queue has a vacant before extractor timeout, ack
758+ // the msg to avoid extractor resending.
759+ if cap (a .copyRowsQueue ) - len (a .copyRowsQueue ) < 1 {
760+ a .logger .Debugf ("applier. full. discarding entries" )
761+ a .mysqlContext .Stage = models .StageSlaveWaitingForWorkersToProcessQueue
762+ } else {
763+ dumpData := & DumpEntry {}
764+ if err := Decode (m .Data , dumpData ); err != nil {
765+ a .onError (TaskStateDead , err )
766+ }
767+ atomic .AddInt64 (& a .nDumpEntry , 1 )
768+ a .copyRowsQueue <- dumpData
769+ a .mysqlContext .Stage = models .StageSlaveWaitingForWorkersToProcessQueue
770+ if err := a .natsConn .Publish (m .Reply , nil ); err != nil {
771+ a .onError (TaskStateDead , err )
772+ }
773+ a .logger .Debugf ("mysql.applier: after publish nats reply" )
774+ atomic .AddInt64 (& a .mysqlContext .RowsEstimate , dumpData .TotalCount )
767775 }
768- a .logger .Debugf ("mysql.applier: after publish nats reply" )
769- atomic .AddInt64 (& a .mysqlContext .RowsEstimate , dumpData .TotalCount )
770776 })
771777 /*if err := sub.SetPendingLimits(a.mysqlContext.MsgsLimit, a.mysqlContext.BytesLimit); err != nil {
772778 return err
0 commit comments