1
1
package mysql
2
2
3
3
import (
4
- "context"
5
4
"fmt"
6
5
"sync"
7
6
"time"
@@ -33,15 +32,14 @@ type taskHandle struct {
33
32
34
33
runner DriverHandle
35
34
36
- ctx context.Context
37
- cancelFunc context.CancelFunc
38
- waitCh chan * drivers.ExitResult
39
- stats * common.TaskStatistics
35
+ waitCh chan * drivers.ExitResult
36
+ doneCh chan struct {}
37
+ stats * common.TaskStatistics
40
38
41
39
driverConfig * common.MySQLDriverConfig
42
40
}
43
41
44
- func newDtleTaskHandle (ctx context. Context , logger g.LoggerType , cfg * drivers.TaskConfig , state drivers.TaskState , started time.Time ) * taskHandle {
42
+ func newDtleTaskHandle (logger g.LoggerType , cfg * drivers.TaskConfig , state drivers.TaskState , started time.Time ) * taskHandle {
45
43
h := & taskHandle {
46
44
logger : logger ,
47
45
stateLock : sync.RWMutex {},
@@ -50,9 +48,9 @@ func newDtleTaskHandle(ctx context.Context, logger g.LoggerType, cfg *drivers.Ta
50
48
startedAt : started ,
51
49
completedAt : time.Time {},
52
50
exitResult : nil ,
53
- waitCh : make (chan * drivers.ExitResult , 1 ),
51
+ waitCh : make (chan * drivers.ExitResult ),
52
+ doneCh : make (chan struct {}),
54
53
}
55
- h .ctx , h .cancelFunc = context .WithCancel (ctx )
56
54
return h
57
55
}
58
56
@@ -137,7 +135,8 @@ func (h *taskHandle) run(d *Driver) {
137
135
t := time .NewTimer (0 )
138
136
for {
139
137
select {
140
- case <- h .ctx .Done ():
138
+ case <- h .doneCh :
139
+ t .Stop ()
141
140
return
142
141
case <- t .C :
143
142
if h .runner != nil {
@@ -168,12 +167,12 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
168
167
case common .TaskTypeSrc :
169
168
if h .driverConfig .OracleConfig != nil {
170
169
h .logger .Debug ("found oracle src" , "OracleConfig" , h .driverConfig .OracleConfig )
171
- runner , err = extractor .NewExtractorOracle (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , h .ctx )
170
+ runner , err = extractor .NewExtractorOracle (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , d .ctx )
172
171
if err != nil {
173
172
return nil , errors .Wrap (err , "NewExtractor" )
174
173
}
175
174
} else {
176
- runner , err = mysql .NewExtractor (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , h .ctx )
175
+ runner , err = mysql .NewExtractor (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , d .ctx )
177
176
if err != nil {
178
177
return nil , errors .Wrap (err , "NewOracleExtractor" )
179
178
}
@@ -183,13 +182,13 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
183
182
if h .driverConfig .KafkaConfig != nil {
184
183
h .logger .Debug ("found kafka" , "KafkaConfig" , h .driverConfig .KafkaConfig )
185
184
runner , err = kafka .NewKafkaRunner (ctx , h .driverConfig .KafkaConfig , h .logger ,
186
- d .storeManager , d .config .NatsAdvertise , h .waitCh , h .ctx )
185
+ d .storeManager , d .config .NatsAdvertise , h .waitCh , d .ctx )
187
186
if err != nil {
188
187
return nil , errors .Wrap (err , "NewKafkaRunner" )
189
188
}
190
189
} else {
191
190
runner , err = mysql .NewApplier (ctx , h .driverConfig , h .logger , d .storeManager ,
192
- d .config .NatsAdvertise , h .waitCh , d .eventer , h .taskConfig , h .ctx )
191
+ d .config .NatsAdvertise , h .waitCh , d .eventer , h .taskConfig , d .ctx )
193
192
if err != nil {
194
193
return nil , errors .Wrap (err , "NewApplier" )
195
194
}
@@ -267,14 +266,17 @@ func (h *taskHandle) emitStats(ru *common.TaskStatistics) {
267
266
268
267
func (h * taskHandle ) Destroy () bool {
269
268
h .stateLock .RLock ()
270
- //driver.des
271
- h .cancelFunc ()
269
+ defer h .stateLock .RUnlock ()
270
+
271
+ close (h .doneCh )
272
+
272
273
if h .runner != nil {
273
274
err := h .runner .Shutdown ()
274
275
if err != nil {
275
276
h .logger .Error ("error in h.runner.Shutdown" , "err" , err )
276
277
}
277
278
}
279
+
278
280
return h .procState == drivers .TaskStateExited
279
281
}
280
282
0 commit comments