From 66e8207003ba858972ed4eca34e6c115f7d49d40 Mon Sep 17 00:00:00 2001 From: waldoweng Date: Wed, 14 Aug 2019 00:09:58 +0800 Subject: [PATCH] [bugfix] delete & close safely --- .gitignore | 3 ++- errors/errors.go | 12 ++++++----- storage/bitcask.go | 54 ++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index 71a7493..906c6a2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ # exclude data file -*.dat \ No newline at end of file +*.dat +.DS_Store \ No newline at end of file diff --git a/errors/errors.go b/errors/errors.go index 4940809..7cd5314 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -3,14 +3,16 @@ package errors import "errors" var ( - // ErrorSystemInternal for + // ErrorSystemInternal system internal error, something should not happend happens ErrorSystemInternal = errors.New("system internal error") - // ErrorDataNotFound for + // ErrorDataNotFound data not found ErrorDataNotFound = errors.New("data not found") - // ErrorReadFileData for + // ErrorReadFileData read wal file for record error ErrorReadFileData = errors.New("read file data error") - // ErrorParseFileData for + // ErrorParseFileData parse wal file to record error ErrorParseFileData = errors.New("parse file data error") - // ErrorIterateWalFile for + // ErrorIterateWalFile iterate throught wal file error ErrorIterateWalFile = errors.New("iterate over wal file error") + // ErrorSystemShuttingDown system is shutting down, not write allow now + ErrorSystemShuttingDown = errors.New("system is shutting down") ) diff --git a/storage/bitcask.go b/storage/bitcask.go index 5da2bbb..9ee2a89 100644 --- a/storage/bitcask.go +++ b/storage/bitcask.go @@ -317,6 +317,10 @@ func (b *Bitcask) Get(key string) (string, error) { b.mutex.RLock() defer func() { b.mutex.RUnlock() }() + if b.wchan == nil { + return "", beancaskError.ErrorSystemShuttingDown + } + if item, err := b.activeInstance.hashTable.Get(key); err == nil { var r Record err = b.activeInstance.walFile.ReadRecord(item.Offset, item.Len, &r) @@ -356,14 +360,30 @@ func (b *Bitcask) Set(key string, value string) error { resultChan := make(chan error, 1) defer func() { close(resultChan) }() - b.wchan <- struct { - key string - record Record - result chan error - }{ - key: key, - record: r, - result: resultChan, + + err := func() error { + b.mutex.Lock() + defer func() { b.mutex.Unlock() }() + + if b.wchan == nil { + return beancaskError.ErrorSystemShuttingDown + } + + b.wchan <- struct { + key string + record Record + result chan error + }{ + key: key, + record: r, + result: resultChan, + } + + return nil + }() + + if err != nil { + return err } return <-resultChan @@ -379,6 +399,10 @@ func (b *Bitcask) Delete(key string) error { b.mutex.Lock() defer func() { b.mutex.Unlock() }() + if b.wchan == nil { + return beancaskError.ErrorSystemShuttingDown + } + if !b.exists(key) { return beancaskError.ErrorDataNotFound } @@ -427,7 +451,11 @@ func (b *Bitcask) realSet() { var errorQ []error for { select { - case witem := <-b.wchan: + case witem, ok := <-b.wchan: + if !ok { + timer.Stop() + break + } itemQ = append(itemQ, witem) case <-timer.C: if len(itemQ) != 0 { @@ -551,6 +579,10 @@ func (b *Bitcask) compact(ndataFile int) error { b.mutex.Lock() defer func() { b.mutex.Unlock() }() + if b.wchan == nil { + return + } + for i := 0; i < ndataFile; i++ { b.instances[i].walFile.RemoveFile() } @@ -592,6 +624,8 @@ func (b *Bitcask) Destory() { for _, ins := range b.instances { ins.walFile.RemoveFile() } + + close(b.wchan) }) } @@ -602,5 +636,7 @@ func (b *Bitcask) Close() { for _, ins := range b.instances { ins.walFile.CloseFile(false) } + + close(b.wchan) }) }