Skip to content

Commit

Permalink
[bugfix] delete & close safely
Browse files Browse the repository at this point in the history
  • Loading branch information
waldoweng committed Aug 13, 2019
1 parent 4505e5d commit 66e8207
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 15 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# exclude data file
*.dat
*.dat
.DS_Store
12 changes: 7 additions & 5 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package errors
import "errors"

var (
// ErrorSystemInternal for
// ErrorSystemInternal system internal error, something should not happend happens
ErrorSystemInternal = errors.New("system internal error")
// ErrorDataNotFound for
// ErrorDataNotFound data not found
ErrorDataNotFound = errors.New("data not found")
// ErrorReadFileData for
// ErrorReadFileData read wal file for record error
ErrorReadFileData = errors.New("read file data error")
// ErrorParseFileData for
// ErrorParseFileData parse wal file to record error
ErrorParseFileData = errors.New("parse file data error")
// ErrorIterateWalFile for
// ErrorIterateWalFile iterate throught wal file error
ErrorIterateWalFile = errors.New("iterate over wal file error")
// ErrorSystemShuttingDown system is shutting down, not write allow now
ErrorSystemShuttingDown = errors.New("system is shutting down")
)
54 changes: 45 additions & 9 deletions storage/bitcask.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ func (b *Bitcask) Get(key string) (string, error) {
b.mutex.RLock()
defer func() { b.mutex.RUnlock() }()

if b.wchan == nil {
return "", beancaskError.ErrorSystemShuttingDown
}

if item, err := b.activeInstance.hashTable.Get(key); err == nil {
var r Record
err = b.activeInstance.walFile.ReadRecord(item.Offset, item.Len, &r)
Expand Down Expand Up @@ -356,14 +360,30 @@ func (b *Bitcask) Set(key string, value string) error {

resultChan := make(chan error, 1)
defer func() { close(resultChan) }()
b.wchan <- struct {
key string
record Record
result chan error
}{
key: key,
record: r,
result: resultChan,

err := func() error {
b.mutex.Lock()
defer func() { b.mutex.Unlock() }()

if b.wchan == nil {
return beancaskError.ErrorSystemShuttingDown
}

b.wchan <- struct {
key string
record Record
result chan error
}{
key: key,
record: r,
result: resultChan,
}

return nil
}()

if err != nil {
return err
}

return <-resultChan
Expand All @@ -379,6 +399,10 @@ func (b *Bitcask) Delete(key string) error {
b.mutex.Lock()
defer func() { b.mutex.Unlock() }()

if b.wchan == nil {
return beancaskError.ErrorSystemShuttingDown
}

if !b.exists(key) {
return beancaskError.ErrorDataNotFound
}
Expand Down Expand Up @@ -427,7 +451,11 @@ func (b *Bitcask) realSet() {
var errorQ []error
for {
select {
case witem := <-b.wchan:
case witem, ok := <-b.wchan:
if !ok {
timer.Stop()
break
}
itemQ = append(itemQ, witem)
case <-timer.C:
if len(itemQ) != 0 {
Expand Down Expand Up @@ -551,6 +579,10 @@ func (b *Bitcask) compact(ndataFile int) error {
b.mutex.Lock()
defer func() { b.mutex.Unlock() }()

if b.wchan == nil {
return
}

for i := 0; i < ndataFile; i++ {
b.instances[i].walFile.RemoveFile()
}
Expand Down Expand Up @@ -592,6 +624,8 @@ func (b *Bitcask) Destory() {
for _, ins := range b.instances {
ins.walFile.RemoveFile()
}

close(b.wchan)
})
}

Expand All @@ -602,5 +636,7 @@ func (b *Bitcask) Close() {
for _, ins := range b.instances {
ins.walFile.CloseFile(false)
}

close(b.wchan)
})
}

0 comments on commit 66e8207

Please sign in to comment.