Skip to content

Commit 3901b78

Browse files
author
ffffwh
committed
add log for big tx throttling
1 parent 1a21768 commit 3901b78

File tree

1 file changed

+16
-8
lines changed

1 file changed

+16
-8
lines changed

driver/mysql/binlog/binlog_reader.go

+16-8
Original file line numberDiff line numberDiff line change
@@ -927,25 +927,33 @@ func (b *BinlogReader) loadMapping(sql, currentSchema string, schemasRenameMap m
927927
}
928928

929929
func (b *BinlogReader) DataStreamEvents(entriesChannel chan<- *common.EntryContext) error {
930+
bigTxThrottlingCount := 0
930931
for {
931-
932-
// Check for shutdown
933932
if b.shutdown {
934933
break
935934
}
936935

937936
b.logger.Trace("b.HasBigTx.Wait. before")
938-
// Wait if this job has un-acked big tx or
937+
// Throttle if this job has un-acked big tx, or
939938
// there are too much global jobs with big tx.
940-
for i := 0; atomic.LoadInt32(&b.BigTxCount) > 0 || g.BigTxReachMax(); i++ {
941-
if b.shutdown {
939+
for !b.shutdown {
940+
localLimit, globalLimit := atomic.LoadInt32(&b.BigTxCount) > 0, g.BigTxReachMax()
941+
if !localLimit && !globalLimit {
942+
bigTxThrottlingCount = 0
942943
break
943944
}
944-
maxWaitMs := 1000
945-
if i >= maxWaitMs/10 {
945+
946+
bigTxThrottlingCount += 1
947+
sleepMs := 10
948+
if bigTxThrottlingCount%(1000/sleepMs) == 0 {
949+
// Force to read an event every 1000ms.
946950
break
947951
}
948-
time.Sleep(10 * time.Millisecond)
952+
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
953+
if bigTxThrottlingCount * sleepMs >= 15 * 1000 {
954+
b.logger.Warn("reader big tx throttling for 15s", "local", localLimit, "global", globalLimit)
955+
bigTxThrottlingCount = 0
956+
}
949957
}
950958
b.logger.Trace("b.HasBigTx.Wait. after")
951959

0 commit comments

Comments
 (0)