Commit b10c6cb

ffffwh committed

Workaround for #923: split big DumpEntries.

1 parent 6d647bb · commit b10c6cb
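This commit works around oversized dump entries during the initial full copy: instead of packing every row of a chunk into a single DumpEntry, the dumper now accumulates rows, tracks their raw byte size, and flushes an entry each time the total reaches a new, configurable DumpEntryLimit (default 134217728 bytes, i.e. 128 MiB).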

File tree: 4 files changed (+105 −82 lines)

driver/common/taskconfig.go (+1)

@@ -64,6 +64,7 @@ type DtleTaskConfig struct {
     DependencyHistorySize int  `codec:"DependencyHistorySize"`
     UseMySQLDependency    bool `codec:"UseMySQLDependency"`
     ForeignKeyChecks      bool `codec:"ForeignKeyChecks"`
+    DumpEntryLimit        int  `codec:"DumpEntryLimit"`
 
     SkipCreateDbTable  bool `codec:"SkipCreateDbTable"`
     SkipPrivilegeCheck bool `codec:"SkipPrivilegeCheck"`
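Like its neighbors, the new field is decoded via its codec tag, so a job's task config can presumably set DumpEntryLimit alongside the existing options; jobs that omit it fall back to the driver-level default registered below.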

driver/driver.go (+2)

@@ -168,6 +168,8 @@ var (
         hclspec.NewLiteral(`true`)),
     "ForeignKeyChecks": hclspec.NewDefault(hclspec.NewAttr("ForeignKeyChecks", "bool", false),
         hclspec.NewLiteral(`true`)),
+    "DumpEntryLimit": hclspec.NewDefault(hclspec.NewAttr("DumpEntryLimit", "number", false),
+        hclspec.NewLiteral(`134217728`)),
     "OracleConfig": hclspec.NewBlock("OracleConfig", false, hclspec.NewObject(map[string]*hclspec.Spec{
         "ServiceName": hclspec.NewAttr("ServiceName", "string", true),
         "Host":        hclspec.NewAttr("Host", "string", true),

driver/mysql/dumper.go (+100 −81)

@@ -34,11 +34,12 @@ type dumper struct {
     doChecksum int
     oldWayDump bool
 
-    sentTableDef bool
+    sentTableDef   bool
+    dumpEntryLimit int
 }
 
 func NewDumper(ctx context.Context, db usql.QueryAble, table *common.Table, chunkSize int64,
-    logger g.LoggerType, memory *int64) *dumper {
+    logger g.LoggerType, memory *int64, dumpEntryLimit int) *dumper {
     dumper := &dumper{
         common.NewDumper(ctx, table, chunkSize, logger, memory),
         umconf.EscapeName(table.TableSchema),
@@ -47,6 +48,7 @@ func NewDumper(ctx context.Context, db usql.QueryAble, table *common.Table, chun
         0,
         false,
         false,
+        dumpEntryLimit,
     }
     dumper.PrepareForDumping = dumper.prepareForDumping
     dumper.GetChunkData = dumper.getChunkData
@@ -154,42 +156,6 @@ func (d *dumper) buildQueryOnUniqueKey() string {
 
 // dumps a specific chunk, reading chunk info from the channel
 func (d *dumper) getChunkData() (nRows int64, err error) {
-    entry := &common.DumpEntry{
-        TableSchema: d.TableSchema,
-        TableName:   d.TableName,
-        ColumnMapTo: d.Table.ColumnMapTo,
-    }
-    defer func() {
-        if err != nil {
-            return
-        }
-        if err == nil && len(entry.ValuesX) == 0 {
-            return
-        }
-
-        keepGoing := true
-        timer := time.NewTicker(pingInterval)
-        defer timer.Stop()
-        for keepGoing {
-            select {
-            case <-d.ShutdownCh:
-                keepGoing = false
-            case d.ResultsChannel <- entry:
-                atomic.AddInt64(d.Memory, int64(entry.Size()))
-                //d.logger.Debug("*** memory", "memory", atomic.LoadInt64(d.memory))
-                keepGoing = false
-            case <-timer.C:
-                d.Logger.Debug("resultsChannel full. waiting and ping conn")
-                var dummy int
-                errPing := d.db.QueryRow("select 1").Scan(&dummy)
-                if errPing != nil {
-                    d.Logger.Debug("ping query row got error.", "err", errPing)
-                }
-            }
-        }
-        d.Logger.Debug("resultsChannel", "n", len(d.ResultsChannel))
-    }()
-
     query := ""
     if d.oldWayDump || d.Table.UseUniqueKey == nil {
         query = d.buildQueryOldWay()
@@ -227,9 +193,80 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
         return 0, err
     }
 
-    scanArgs := make([]interface{}, len(columns)) // tmp use, for casting `values` to `[]interface{}`
+    handleEntry := func(valuesX [][]*[]byte, last bool) error {
+        if len(valuesX) == 0 {
+            return nil
+        }
+
+        entry := &common.DumpEntry{
+            TableSchema: g.StringElse(d.Table.TableSchemaRename, d.TableSchema),
+            TableName:   g.StringElse(d.Table.TableRename, d.TableName),
+            ColumnMapTo: d.Table.ColumnMapTo,
+            ValuesX:     valuesX,
+        }
+
+        if last {
+            var lastVals []string
+            for _, col := range entry.ValuesX[len(entry.ValuesX)-1] {
+                lastVals = append(lastVals, usql.EscapeColRawToString(col))
+            }
+
+            if d.Table.UseUniqueKey != nil {
+                // lastVals must not be nil if len(data) > 0
+                for i, col := range d.Table.UseUniqueKey.Columns.Columns {
+                    // TODO save the idx
+                    idx := d.Table.OriginalTableColumns.Ordinals[col.RawName]
+                    if idx > len(lastVals) {
+                        return fmt.Errorf("getChunkData. GetLastMaxVal: column index %v > n_column %v", idx, len(lastVals))
+                    } else {
+                        d.Table.UseUniqueKey.LastMaxVals[i] = lastVals[idx]
+                    }
+                }
+                d.Logger.Debug("GetLastMaxVal", "val", d.Table.UseUniqueKey.LastMaxVals)
+            }
+        }
+        if len(d.Table.ColumnMap) > 0 {
+            for i, oldRow := range entry.ValuesX {
+                row := make([]*[]byte, len(d.Table.ColumnMap))
+                for i := range d.Table.ColumnMap {
+                    fromIdx := d.Table.ColumnMap[i]
+                    row[i] = oldRow[fromIdx]
+                }
+                entry.ValuesX[i] = row
+            }
+        }
+
+        keepGoing := true
+        timer := time.NewTicker(pingInterval)
+        defer timer.Stop()
+        for keepGoing {
+            select {
+            case <-d.ShutdownCh:
+                keepGoing = false
+            case d.ResultsChannel <- entry:
+                atomic.AddInt64(d.Memory, int64(entry.Size()))
+                //d.logger.Debug("*** memory", "memory", atomic.LoadInt64(d.memory))
+                keepGoing = false
+            case <-timer.C:
+                d.Logger.Debug("resultsChannel full. waiting and ping conn")
+                var dummy int
+                errPing := d.db.QueryRow("select 1").Scan(&dummy)
+                if errPing != nil {
+                    d.Logger.Debug("ping query row got error.", "err", errPing)
+                }
+            }
+        }
+        d.Logger.Debug("resultsChannel", "n", len(d.ResultsChannel))
 
-    for rows.Next() {
+        return nil
+    }
+
+    entrySize := 0
+    var valuesX [][]*[]byte
+    scanArgs := make([]interface{}, len(columns)) // tmp use, for casting `values` to `[]interface{}`
+    hasRow := rows.Next()
+    for hasRow {
+        nRows += 1
         rowValuesRaw := make([]*[]byte, len(columns))
         for i := range rowValuesRaw {
             scanArgs[i] = &rowValuesRaw[i]
@@ -240,53 +277,35 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
             return 0, err
         }
 
-        entry.ValuesX = append(entry.ValuesX, rowValuesRaw)
-    }
-
-    nRows = int64(len(entry.ValuesX))
-    d.Logger.Debug("getChunkData.", "n_row", nRows)
+        hasRow = rows.Next()
 
-    if nRows > 0 {
-        var lastVals []string
+        valuesX = append(valuesX, rowValuesRaw)
+        entrySize += getRowSize(rowValuesRaw)
 
-        for _, col := range entry.ValuesX[len(entry.ValuesX)-1] {
-            lastVals = append(lastVals, usql.EscapeColRawToString(col))
-        }
+        if !hasRow || entrySize >= d.dumpEntryLimit {
+            d.Logger.Debug("reach DUMP_ENTRY_LIMIT.", "size", entrySize, "n_row", len(valuesX))
 
-        if d.Table.UseUniqueKey != nil {
-            // lastVals must not be nil if len(data) > 0
-            for i, col := range d.Table.UseUniqueKey.Columns.Columns {
-                // TODO save the idx
-                idx := d.Table.OriginalTableColumns.Ordinals[col.RawName]
-                if idx > len(lastVals) {
-                    return nRows, fmt.Errorf("getChunkData. GetLastMaxVal: column index %v > n_column %v", idx, len(lastVals))
-                } else {
-                    d.Table.UseUniqueKey.LastMaxVals[i] = lastVals[idx]
-                }
-            }
-            d.Logger.Debug("GetLastMaxVal", "val", d.Table.UseUniqueKey.LastMaxVals)
-        }
-    }
-    if d.Table.TableRename != "" {
-        entry.TableName = d.Table.TableRename
-    }
-    if d.Table.TableSchemaRename != "" {
-        entry.TableSchema = d.Table.TableSchemaRename
-    }
-    if len(d.Table.ColumnMap) > 0 {
-        for i, oldRow := range entry.ValuesX {
-            row := make([]*[]byte, len(d.Table.ColumnMap))
-            for i := range d.Table.ColumnMap {
-                fromIdx := d.Table.ColumnMap[i]
-                row[i] = oldRow[fromIdx]
+            err = handleEntry(valuesX, !hasRow)
+            if err != nil {
+                return 0, err
             }
-            entry.ValuesX[i] = row
+            entrySize = 0
+            valuesX = nil
         }
     }
-    // ValuesX[i]: n-th row
-    // ValuesX[i][j]: j-th col of n-th row
-    // Values[i]: i-th chunk of rows
-    // Values[i][j]: j-th row (in paren-wrapped string)
+
+    d.Logger.Debug("getChunkData.", "n_row", nRows)
 
     return nRows, nil
 }
+
+func getRowSize(row []*[]byte) (size int) {
+    for i := range row {
+        if row[i] == nil {
+            size += 0
+        } else {
+            size += len(*row[i])
+        }
+    }
+    return size
+}
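The heart of the change is a size-capped batching loop: getChunkData now sums each row's byte size via getRowSize and hands the accumulated batch to the handleEntry closure whenever the total reaches dumpEntryLimit or the result set is exhausted (the final flush also records the unique-key position for the next chunk). Below is a minimal, self-contained sketch of the same pattern; the row type, splitEntries, and flush callback are hypothetical stand-ins for dtle's DumpEntry machinery, not the actual API:

package main

import "fmt"

// row stands in for dtle's []*[]byte row representation.
type row [][]byte

// rowSize mirrors getRowSize: a NULL column (nil slice) contributes 0 bytes.
func rowSize(r row) (size int) {
    for _, col := range r {
        size += len(col) // len of a nil slice is 0
    }
    return size
}

// splitEntries accumulates rows and flushes a batch whenever the summed
// byte size reaches limit, or when the input is exhausted. flush receives
// the batch and whether it is the final one (where the dumper would record
// the unique-key position).
func splitEntries(rows []row, limit int, flush func(batch []row, last bool)) {
    var batch []row
    size := 0
    for i, r := range rows {
        last := i == len(rows)-1
        batch = append(batch, r)
        size += rowSize(r)
        if last || size >= limit {
            flush(batch, last)
            batch, size = nil, 0
        }
    }
}

func main() {
    rows := []row{
        {[]byte("alice"), []byte("10")},
        {[]byte("bob"), nil}, // NULL column
        {[]byte("carol"), []byte("300000")},
    }
    splitEntries(rows, 8, func(batch []row, last bool) {
        fmt.Printf("flush: n_row=%d last=%v\n", len(batch), last)
    })
}

With a limit of 8 bytes this flushes a two-row batch first (7 + 3 bytes) and then the final row alone, mirroring how the rewritten dumper turns one oversized chunk into several smaller DumpEntries.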

driver/mysql/extractor.go (+2 −1)

@@ -1392,7 +1392,8 @@ func (e *Extractor) mysqlDump() error {
     e.logger.Info("Step n: - scanning table (i of N tables)",
         "n", step, "schema", t.TableSchema, "table", t.TableName, "i", counter, "N", e.tableCount)
 
-    d := NewDumper(e.ctx, tx, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1)
+    d := NewDumper(e.ctx, tx, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1,
+        e.mysqlContext.DumpEntryLimit)
     if err := d.Dump(); err != nil {
         e.onError(common.TaskStateDead, err)
     }
