Skip to content

Commit 6b91d3f

Browse files
LordofAvernusffffwh
authored and
ffffwh
committed
Fix bug deleting binlogrelay file
1 parent cd29576 commit 6b91d3f

File tree

2 files changed

+16
-27
lines changed

2 files changed

+16
-27
lines changed

driver/common/store.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,17 @@ func (sm *StoreManager) PutSourceType(jobName, sourceType string) error {
142142
return nil
143143
}
144144

145+
func (sm *StoreManager) GetNatsIfExist(jobName string) (string, bool, error) {
146+
natsKey := fmt.Sprintf("dtle/%v/NatsAddr", jobName)
147+
kv, err := sm.consulStore.Get(natsKey)
148+
if err == store.ErrKeyNotFound {
149+
return "", false, nil
150+
} else if err != nil {
151+
return "", false, err
152+
}
153+
return string(kv.Value), true, nil
154+
}
155+
145156
func (sm *StoreManager) DstPutNats(jobName string, natsAddr string, stopCh chan struct{}, onWatchError func(error)) error {
146157
sm.logger.Debug("DstPutNats")
147158

driver/driver.go

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -390,30 +390,16 @@ func (d *Driver) SetConfig(c *base.Config) (err error) {
390390
}
391391

392392
func (d *Driver) loopCleanRelayDir() {
393-
stopCh := make(chan struct{})
394-
defer close(stopCh)
395-
396393
cleanDataDir := func() {
397394
files, err := ioutil.ReadDir(path.Join(d.config.DataDir, "binlog"))
398395
if err != nil {
399-
d.logger.Info("read dir failed", "dataDir", d.config.DataDir, "err", err)
400-
return
401-
}
402-
403-
jobs, err := d.storeManager.FindJobList()
404-
if err != nil {
405-
d.logger.Error("list jobs failed", "err", err)
396+
d.logger.Error("read dir failed", "dataDir", d.config.DataDir, "err", err)
406397
return
407398
}
408399

409400
for _, file := range files {
410-
existUnuseDir := true
411-
for i := range jobs {
412-
if jobs[i].JobId == file.Name() {
413-
existUnuseDir = false
414-
}
415-
}
416-
if !existUnuseDir {
401+
_, exist, err := d.storeManager.GetNatsIfExist(file.Name())
402+
if exist || err != nil {
417403
continue
418404
}
419405
if err := os.RemoveAll(path.Join(d.config.DataDir, "binlog", file.Name())); err != nil {
@@ -422,21 +408,13 @@ func (d *Driver) loopCleanRelayDir() {
422408
}
423409
}
424410

425-
jobKeysCh, err := d.storeManager.WatchTree("/dtleJobList/", stopCh)
426-
if err != nil {
427-
d.logger.Error("watch job tree error", "err", err)
428-
}
429411
cleanDuration := 12 * time.Hour
430412
cleanDelay := time.NewTimer(cleanDuration)
431413
defer cleanDelay.Stop()
432414
for {
433-
select {
434-
case <-jobKeysCh:
435-
cleanDataDir()
436-
case <-cleanDelay.C:
437-
cleanDataDir()
438-
}
439415
cleanDelay.Reset(cleanDuration)
416+
<-cleanDelay.C
417+
cleanDataDir()
440418
}
441419
}
442420

0 commit comments

Comments
 (0)