Skip to content

Commit 80123fb

Browse files
author
ffffwh
committed
dump: getChunkData: fetch all rows and split
misc: logs
1 parent b10c6cb commit 80123fb

File tree

2 files changed

+21
-17
lines changed

2 files changed

+21
-17
lines changed

driver/mysql/dumper.go

+17-12
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package mysql
99
import (
1010
"context"
1111
"fmt"
12+
"github.com/pkg/errors"
1213
"os"
1314
"strings"
1415
"sync/atomic"
@@ -182,10 +183,8 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
182183
d.Iteration += 1
183184
rows, err := d.db.Query(query)
184185
if err != nil {
185-
d.Logger.Debug("error at select chunk. query: ", query)
186-
newErr := fmt.Errorf("error at select chunk. err: %v", err)
187-
d.Logger.Error(newErr.Error())
188-
return 0, err
186+
d.Logger.Error("error at select chunk", "query", query)
187+
return 0, errors.Wrap(err, "select chunk")
189188
}
190189

191190
columns, err := rows.Columns()
@@ -264,9 +263,9 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
264263
entrySize := 0
265264
var valuesX [][]*[]byte
266265
scanArgs := make([]interface{}, len(columns)) // tmp use, for casting `values` to `[]interface{}`
266+
splitPoints := []int{0}
267267
hasRow := rows.Next()
268268
for hasRow {
269-
nRows += 1
270269
rowValuesRaw := make([]*[]byte, len(columns))
271270
for i := range rowValuesRaw {
272271
scanArgs[i] = &rowValuesRaw[i]
@@ -283,14 +282,20 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
283282
entrySize += getRowSize(rowValuesRaw)
284283

285284
if !hasRow || entrySize >= d.dumpEntryLimit {
286-
d.Logger.Debug("reach DUMP_ENTRY_LIMIT.", "size", entrySize, "n_row", len(valuesX))
287-
288-
err = handleEntry(valuesX, !hasRow)
289-
if err != nil {
290-
return 0, err
291-
}
285+
d.Logger.Debug("reach dumpEntryLimit.", "size", entrySize, "point", len(valuesX))
286+
splitPoints = append(splitPoints, len(valuesX))
292287
entrySize = 0
293-
valuesX = nil
288+
}
289+
}
290+
if err = rows.Err(); err != nil {
291+
return 0, err
292+
}
293+
nRows = int64(len(valuesX))
294+
295+
for i := 1; i < len(splitPoints); i++ {
296+
err = handleEntry(valuesX[splitPoints[i-1]:splitPoints[i]], i == len(splitPoints)-1)
297+
if err != nil {
298+
return 0, err
294299
}
295300
}
296301

driver/mysql/extractor.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -1060,17 +1060,14 @@ func (e *Extractor) StreamEvents() error {
10601060
// retryOperation attempts up to `count` attempts at running given function,
10611061
// exiting as soon as it returns with non-error.
10621062
// gno: only for logging
1063-
func (e *Extractor) publish(subject string, txMsg []byte, gno int64) (err error) {
1064-
msgLen := len(txMsg)
1065-
1066-
data := txMsg
1063+
func (e *Extractor) publish(subject string, data []byte, gno int64) (err error) {
10671064
lenData := len(data)
10681065

10691066
// lenData < NatsMaxMsg: 1 msg
10701067
// lenData = k * NatsMaxMsg + b, where k >= 1 && b >= 0: (k+1) msg
10711068
// b could be 0. we send a zero-len msg as a sign of termination.
10721069
nSeg := lenData/g.NatsMaxMsg + 1
1073-
e.logger.Debug("publish. msg", "subject", subject, "gno", gno, "nSeg", nSeg, "spanLen", lenData, "msgLen", msgLen)
1070+
e.logger.Debug("publish. msg", "subject", subject, "gno", gno, "nSeg", nSeg, "spanLen", lenData)
10741071
bak := make([]byte, 4)
10751072
if nSeg > 1 {
10761073
// ensure there are 4 bytes to save iSeg
@@ -1439,10 +1436,12 @@ func (e *Extractor) encodeAndSendDumpEntry(entry *common.DumpEntry) error {
14391436
if err != nil {
14401437
return err
14411438
}
1439+
e.logger.Debug("encodeAndSendDumpEntry. after Marshal", "size", len(bs))
14421440
txMsg, err := common.Compress(bs)
14431441
if err != nil {
14441442
return errors.Wrap(err, "common.Compress")
14451443
}
1444+
e.logger.Debug("encodeAndSendDumpEntry. after Compress", "size", len(txMsg))
14461445
if err := e.publish(fmt.Sprintf("%s_full", e.subject), txMsg, 0); err != nil {
14471446
return err
14481447
}

0 commit comments

Comments
 (0)