Skip to content

Commit 6d647bb

Browse files
author
ffffwh
committed
remove DumpEntry.Err
1 parent 26f1e42 commit 6d647bb

File tree

7 files changed

+28
-89
lines changed

7 files changed

+28
-89
lines changed

driver/common/dumper.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ type Dumper struct {
2727
Table *Table
2828
Iteration int64
2929
Columns string
30+
// ResultsChannel should be closed after writing all entries.
3031
ResultsChannel chan *DumpEntry
32+
// Set Err (if there is) before closing ResultsChannel.
33+
Err error
3134
shutdown bool
3235
ShutdownCh chan struct{}
3336
shutdownLock sync.Mutex
@@ -71,6 +74,7 @@ func (d *Dumper) Dump() error {
7174

7275
nRows, err := d.GetChunkData()
7376
if err != nil {
77+
d.Err = err
7478
d.Logger.Error("error at dump", "err", err)
7579
break
7680
}

driver/common/type.schema

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ struct DumpEntry {
77
TbSQL []string
88
ValuesX [][]*[]byte
99
TotalCount int64
10-
Err string
1110
Table []byte
1211
ColumnMapTo []string
1312
}

driver/common/type.schema.gen.go

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ type DumpEntry struct {
2121
TbSQL []string
2222
ValuesX [][]*[]byte
2323
TotalCount int64
24-
Err string
2524
Table []byte
2625
ColumnMapTo []string
2726
}
@@ -228,21 +227,6 @@ func (d *DumpEntry) Size() (s uint64) {
228227
}
229228

230229
}
231-
{
232-
l := uint64(len(d.Err))
233-
234-
{
235-
236-
t := l
237-
for t >= 0x80 {
238-
t >>= 7
239-
s++
240-
}
241-
s++
242-
243-
}
244-
s += l
245-
}
246230
{
247231
l := uint64(len(d.Table))
248232

@@ -560,25 +544,6 @@ func (d *DumpEntry) Marshal(buf []byte) ([]byte, error) {
560544
buf[i+7+0] = byte(d.TotalCount >> 56)
561545

562546
}
563-
{
564-
l := uint64(len(d.Err))
565-
566-
{
567-
568-
t := uint64(l)
569-
570-
for t >= 0x80 {
571-
buf[i+8] = byte(t) | 0x80
572-
t >>= 7
573-
i++
574-
}
575-
buf[i+8] = byte(t)
576-
i++
577-
578-
}
579-
copy(buf[i+8:], d.Err)
580-
i += l
581-
}
582547
{
583548
l := uint64(len(d.Table))
584549

@@ -924,26 +889,6 @@ func (d *DumpEntry) Unmarshal(buf []byte) (uint64, error) {
924889
{
925890
l := uint64(0)
926891

927-
{
928-
929-
bs := uint8(7)
930-
t := uint64(buf[i+8] & 0x7F)
931-
for buf[i+8]&0x80 == 0x80 {
932-
i++
933-
t |= uint64(buf[i+8]&0x7F) << bs
934-
bs += 7
935-
}
936-
i++
937-
938-
l = t
939-
940-
}
941-
d.Err = string(buf[i+8 : i+8+l])
942-
i += l
943-
}
944-
{
945-
l := uint64(0)
946-
947892
{
948893

949894
bs := uint8(7)

driver/mysql/dumper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
161161
}
162162
defer func() {
163163
if err != nil {
164-
entry.Err = err.Error()
164+
return
165165
}
166166
if err == nil && len(entry.ValuesX) == 0 {
167167
return

driver/mysql/extractor.go

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,12 +1383,9 @@ func (e *Extractor) mysqlDump() error {
13831383
e.logger.Info("Step: scanning contents of x tables", "n", step, "x", e.tableCount)
13841384
startScan := g.CurrentTimeMillis()
13851385
counter := 0
1386-
//pool := models.NewPool(10)
13871386
for _, db := range e.replicateDoDb {
13881387
for _, tbCtx := range db.TableMap {
13891388
t := tbCtx.Table
1390-
//pool.Add(1)
1391-
//go func(t *config.Table) {
13921389
counter++
13931390
// Obtain a record maker for this table, which knows about the schema ...
13941391
// Choose how we create statements based on the # of rows ...
@@ -1402,34 +1399,29 @@ func (e *Extractor) mysqlDump() error {
14021399
e.dumpers = append(e.dumpers, d)
14031400
// Scan the rows in the table ...
14041401
for entry := range d.ResultsChannel {
1405-
if entry.Err != "" {
1406-
e.onError(common.TaskStateDead, fmt.Errorf(entry.Err))
1407-
} else {
1408-
memSize := int64(entry.Size())
1409-
if !d.sentTableDef {
1410-
tableBs, err := common.EncodeTable(d.Table)
1411-
if err != nil {
1412-
err = errors.Wrap(err, "full copy: EncodeTable")
1413-
e.onError(common.TaskStateDead, err)
1414-
return err
1415-
} else {
1416-
entry.Table = tableBs
1417-
d.sentTableDef = true
1418-
}
1419-
}
1420-
if err = e.encodeAndSendDumpEntry(entry); err != nil {
1402+
memSize := int64(entry.Size())
1403+
if !d.sentTableDef {
1404+
tableBs, err := common.EncodeTable(d.Table)
1405+
if err != nil {
1406+
err = errors.Wrap(err, "full copy: EncodeTable")
14211407
e.onError(common.TaskStateDead, err)
1408+
return err
1409+
} else {
1410+
entry.Table = tableBs
1411+
d.sentTableDef = true
14221412
}
1423-
atomic.AddInt64(&e.TotalRowsCopied, int64(len(entry.ValuesX)))
1424-
atomic.AddInt64(d.Memory, -memSize)
14251413
}
1414+
if err = e.encodeAndSendDumpEntry(entry); err != nil {
1415+
e.onError(common.TaskStateDead, err)
1416+
}
1417+
atomic.AddInt64(&e.TotalRowsCopied, int64(len(entry.ValuesX)))
1418+
atomic.AddInt64(d.Memory, -memSize)
1419+
}
1420+
if d.Err != nil {
1421+
e.onError(common.TaskStateDead, d.Err)
14261422
}
1427-
1428-
//pool.Done()
1429-
//}(tb)
14301423
}
14311424
}
1432-
//pool.Wait()
14331425
step++
14341426

14351427
// We've copied all of the tables, but our buffer holds onto the very last record.

driver/oracle/extractor/dumper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
5555
// SELECT * FROM %s.%s AS OF SCN %d where ROWNUM < %d
5656
defer func() {
5757
if err != nil {
58-
entry.Err = err.Error()
58+
return
5959
}
6060
if err == nil && len(entry.ValuesX) == 0 {
6161
return

driver/oracle/extractor/extractor_oracle.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -964,14 +964,13 @@ func (e *ExtractorOracle) oracleDump() error {
964964

965965
// Scan the rows in the table ...
966966
for entry := range d.ResultsChannel {
967-
if entry.Err != "" {
968-
e.onError(common.TaskStateDead, fmt.Errorf(entry.Err))
969-
} else {
970-
if err := e.encodeAndSendDumpEntry(entry); err != nil {
971-
e.onError(common.TaskStateDead, err)
972-
}
967+
if err := e.encodeAndSendDumpEntry(entry); err != nil {
968+
e.onError(common.TaskStateDead, err)
973969
}
974970
}
971+
if d.Err != nil {
972+
e.onError(common.TaskStateDead, d.Err)
973+
}
975974
}
976975
}
977976
e.fullCopyCoordinates = &common.OracleCoordinates{

0 commit comments

Comments
 (0)