Skip to content

Commit 9060a56

Browse files
author
ffffwh
committed
bulk insert: merge multiple RowsEvents #829
1 parent af77db2 commit 9060a56

File tree

2 files changed

+37
-9
lines changed

2 files changed

+37
-9
lines changed

driver/common/binlog.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ type SchemaTable struct {
118118
Table string
119119
}
120120

121-
func NewDataEvent(databaseName, tableName string, dml int8, columnCount uint64, timestamp uint32) DataEvent {
122-
event := DataEvent{
121+
func NewDataEvent(databaseName, tableName string, dml int8, columnCount uint64, timestamp uint32) *DataEvent {
122+
event := &DataEvent{
123123
DatabaseName: databaseName,
124124
TableName: tableName,
125125
DML: dml,

driver/mysql/binlog/binlog_reader.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -937,8 +937,9 @@ func (b *BinlogReader) DataStreamEvents(entriesChannel chan<- *common.EntryConte
937937
// Throttle if this job has un-acked big tx, or
938938
// there are too much global jobs with big tx.
939939
for !b.shutdown {
940-
localLimit, globalLimit := atomic.LoadInt32(&b.BigTxCount) > b.mysqlContext.BigTxSrcQueue, g.BigTxReachMax()
941-
if !localLimit && !globalLimit {
940+
localCount := atomic.LoadInt32(&b.BigTxCount)
941+
globalLimit := g.BigTxReachMax()
942+
if localCount <= b.mysqlContext.BigTxSrcQueue && !globalLimit {
942943
bigTxThrottlingCount = 0
943944
break
944945
}
@@ -951,7 +952,7 @@ func (b *BinlogReader) DataStreamEvents(entriesChannel chan<- *common.EntryConte
951952
}
952953
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
953954
if bigTxThrottlingCount * sleepMs >= 15 * 1000 {
954-
b.logger.Warn("reader big tx throttling for 15s", "local", localLimit, "global", globalLimit)
955+
b.logger.Warn("reader big tx throttling for 15s", "local", localCount, "global", globalLimit)
955956
bigTxThrottlingCount = 0
956957
}
957958
}
@@ -1798,9 +1799,9 @@ func (b *BinlogReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *r
17981799
schemaName := string(rowsEvent.Table.Schema)
17991800
tableName := string(rowsEvent.Table.Table)
18001801
coordinate := b.entryContext.Entry.Coordinates.(*common.MySQLCoordinateTx)
1801-
b.logger.Debug("got rowsEvent", "schema", schemaName, "table", tableName,
1802-
"gno", coordinate.GNO,
1803-
"flags", rowsEvent.Flags, "tableFlags", rowsEvent.Table.Flags)
1802+
b.logger.Trace("got rowsEvent", "schema", schemaName, "table", tableName,
1803+
"gno", coordinate.GNO, "flags", rowsEvent.Flags, "tableFlags", rowsEvent.Table.Flags,
1804+
"nRows", len(rowsEvent.Rows))
18041805

18051806
dml := common.ToEventDML(ev.Header.EventType)
18061807
skip, table := b.skipRowEvent(rowsEvent, dml)
@@ -1943,7 +1944,34 @@ func (b *BinlogReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *r
19431944
}
19441945

19451946
if len(dmlEvent.Rows) > 0 {
1946-
b.entryContext.Entry.Events = append(b.entryContext.Entry.Events, dmlEvent)
1947+
// return 0 if the last event could be reused.
1948+
reuseLast := func() int {
1949+
if len(b.entryContext.Entry.Events) == 0 {
1950+
return 1
1951+
}
1952+
lastEvent := &b.entryContext.Entry.Events[len(b.entryContext.Entry.Events)-1]
1953+
if dml != common.InsertDML || lastEvent.DML != common.InsertDML {
1954+
return 2
1955+
}
1956+
if lastEvent.DatabaseName != dmlEvent.DatabaseName || lastEvent.TableName != dmlEvent.TableName {
1957+
return 3
1958+
}
1959+
if bytes.Compare(lastEvent.Flags, dmlEvent.Flags) != 0 {
1960+
return 4
1961+
}
1962+
if dmlEvent.FKParent != lastEvent.FKParent {
1963+
return 5
1964+
}
1965+
1966+
lastEvent.Rows = append(lastEvent.Rows, dmlEvent.Rows...)
1967+
b.logger.Debug("reuseLast. reusing", "nRows", len(lastEvent.Rows))
1968+
return 0
1969+
}()
1970+
1971+
if reuseLast != 0 {
1972+
b.logger.Debug("reuseLast. not reusing", "step", reuseLast)
1973+
b.entryContext.Entry.Events = append(b.entryContext.Entry.Events, *dmlEvent)
1974+
}
19471975

19481976
if b.entryContext.OriginalSize >= bigTxSplittingSize {
19491977
b.logger.Debug("splitting big tx", "index", b.entryContext.Entry.Index)

0 commit comments

Comments
 (0)