Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,12 @@ fmt.Println(string(value)) // "world"
// Persist the KVS to storage
reference, err := kvs.Save(ctx)

// Release resources
_ = kvs.Close()

// Later, load the KVS from its reference
loadedKvs, err := pot.NewSwarmKvsReference(persister, reference)
...
```

### Index
Expand Down Expand Up @@ -144,6 +148,9 @@ err := index.Iterate(prefix, targetKey, func(entry pot.Entry) (bool, error) {

// Persist the index
ref, err := index.Save(context.Background())

// Release resources
_ = index.Close()
```

## Proof System & Blockchain Integration
Expand Down
72 changes: 51 additions & 21 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,31 @@ package pot

import (
"context"
"errors"
"fmt"

"github.com/ethersphere/proximity-order-trie/pkg/elements"
)

// Index represents a mutable pot
type Index struct {
mode elements.Mode // mode
read chan elements.Node // hands out current root for reads
write chan elements.Node // hands out current root for writes and locks
root chan elements.Node // channel for new roots
quit chan struct{} // closing this channel signals quit
mode elements.Mode // mode
read chan elements.Node // hands out current root for reads
write chan elements.Node // hands out current root for writes and locks
root chan elements.Node // channel for new roots
quit chan struct{} // closing this channel signals quit
closed bool
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok so this is not needed, in fact checking this field creates a race condition if Close is called concurrently with any other function that checks it.

}

// New constructs a new mutable pot
func New(mode elements.Mode) (*Index, error) {
idx := &Index{
mode: mode,
read: make(chan elements.Node),
write: make(chan elements.Node),
root: make(chan elements.Node),
quit: make(chan struct{}),
mode: mode,
read: make(chan elements.Node),
write: make(chan elements.Node),
root: make(chan elements.Node),
quit: make(chan struct{}),
closed: false,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assignment is unnecessary since the zero value of boolean is false

}

root := idx.mode.New()
Expand All @@ -34,11 +37,12 @@ func New(mode elements.Mode) (*Index, error) {
// NewReference constructs a new mutable pot from a reference
func NewReference(ctx context.Context, mode elements.Mode, ref []byte) (*Index, error) {
idx := &Index{
mode: mode,
read: make(chan elements.Node),
write: make(chan elements.Node),
root: make(chan elements.Node),
quit: make(chan struct{}),
mode: mode,
read: make(chan elements.Node),
write: make(chan elements.Node),
root: make(chan elements.Node),
quit: make(chan struct{}),
closed: false,
}

root, loaded, err := idx.mode.Load(ctx, ref)
Expand All @@ -60,6 +64,7 @@ func (idx *Index) muxProcess(root elements.Node) {
for {
select {
case <-quit:
idx.closed = true
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a write write race with close

return
case idx.read <- root: //
case write <- root: // write locks the pot for writes
Expand All @@ -74,7 +79,7 @@ func (idx *Index) muxProcess(root elements.Node) {

// Add inserts an entry to the mutable pot
func (idx *Index) Add(ctx context.Context, e elements.Entry) error {
return idx.Update(ctx, e.Key(), &e )
return idx.Update(ctx, e.Key(), &e)
}

// Delete removes the entry at the given key from the mutable pot
Expand All @@ -86,6 +91,10 @@ func (idx *Index) Delete(ctx context.Context, k []byte) error {
func (idx *Index) Update(ctx context.Context, k []byte, e *elements.Entry) error {
var root elements.Node

if idx.closed {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is read/write race on the boolean closed

return errors.New("trie closed")
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but it is even worse cos if Close is called when we are here concurrently then the following select will hang on line 114 until the context cancels :))

// get the pot root and capture the write lock
select {
case <-ctx.Done():
Expand All @@ -112,6 +121,11 @@ func (idx *Index) Update(ctx context.Context, k []byte, e *elements.Entry) error

// Find retrieves the entry at the given key from the mutable pot or gives elements.ErrNotFound
func (idx *Index) Find(ctx context.Context, k []byte) (elements.Entry, error) {

if idx.closed {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exact same as update func

return nil, errors.New("trie closed")
}

select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -122,20 +136,29 @@ func (idx *Index) Find(ctx context.Context, k []byte) (elements.Entry, error) {

// Iterate wraps the underlying pot's iterator
func (idx *Index) Iterate(ctx context.Context, p, k []byte, f func(elements.Entry) (stop bool, err error)) error {
if idx.closed {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

return errors.New("trie closed")
}
return elements.Iterate(ctx, elements.NewAt(0, <-idx.read), p, k, idx.mode, f)
}

// Size returns the size (number of entries) of the pot
func (idx *Index) Size() int {
func (idx *Index) Size() (int, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so here you dont need to return an error at all and this is where you should see that the pattern is wrong. So there is no reason one cannot call Size on a closed pot

if idx.closed {
return 0, errors.New("trie closed")
}
root := <-idx.read
if root == nil {
return 0
return 0, nil
}
return root.Size()
return root.Size(), nil
}

// Save calls the mode specific save method for the root node
func (idx *Index) Save(ctx context.Context) ([]byte, error) {
if idx.closed {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like Size, no reason not to allow Save on a closed

return nil, errors.New("trie closed")
}
root := <-idx.read
if root.Empty() {
return nil, fmt.Errorf("root node is nil")
Expand All @@ -145,12 +168,19 @@ func (idx *Index) Save(ctx context.Context) ([]byte, error) {

// Close quits the process loop and closes the mode
func (idx *Index) Close() error {
if idx.closed {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you are protecting here from a concurrent call to Close then it is not gonna work and you can still get a close on closed channel error

return errors.New("trie closed")
}
close(idx.quit)
idx.closed = true
return nil
}

// String pretty prints the current state of the pot
func (idx *Index) String() string {
func (idx *Index) String() (string, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as Size, Save : no reason why not

if idx.closed {
return "", errors.New("trie closed")
}
root := <-idx.read
return elements.NewAt(0, root).String()
return elements.NewAt(0, root).String(), nil
}
6 changes: 3 additions & 3 deletions index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestSize(t *testing.T) {
defer cancel()
t.Run("add", func(t *testing.T) {
for i := 0; i < count; i++ {
size := idx.Size()
size, _ := idx.Size()
if size != i {
t.Fatalf("incorrect number of items. want %d, got %d", i, size)
}
Expand All @@ -280,7 +280,7 @@ func TestSize(t *testing.T) {
t.Run("update", func(t *testing.T) {
for i := 0; i < count; i++ {
idx.Add(ctx, &mockEntry{newDetMockEntry(t, i).key, 10000})
size := idx.Size()
size, _ := idx.Size()
if size != count {
t.Fatalf("incorrect number of items. want %d, got %d", count, size)
}
Expand All @@ -289,7 +289,7 @@ func TestSize(t *testing.T) {
t.Run("delete", func(t *testing.T) {
for i := 0; i < count; i++ {
idx.Delete(ctx, newDetMockEntry(t, i).key)
size := idx.Size()
size, _ := idx.Size()
if size != count-i-1 {
t.Fatalf("incorrect number of items. want %d, got %d", count-i-1, size)
}
Expand Down
6 changes: 5 additions & 1 deletion kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func NewSwarmKvsReference(ctx context.Context, ls persister.LoadSaver, ref []byt
}, nil
}

// Close stops the mutex
func (ps *SwarmKvs) Close() error {
return ps.idx.Close()
}

// Get retrieves the value associated with the given key.
func (ps *SwarmKvs) Get(ctx context.Context, key []byte) ([]byte, error) {
entry, err := ps.idx.Find(ctx, key)
Expand Down Expand Up @@ -99,4 +104,3 @@ func (ps *SwarmKvs) Delete(ctx context.Context, key []byte) error {
}
return nil
}

29 changes: 29 additions & 0 deletions kvs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,33 @@ func TestPotKvs_Save(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, val2, val)
})
t.Run("Create KVS, write to it, close it", func(t *testing.T) {
ls := createLs()
kvs1, _ := pot.NewSwarmKvs(ls)

err := kvs1.Put(ctx, key1, val1)
assert.NoError(t, err)

_, err = kvs1.Save(ctx)
assert.NoError(t, err)

err = kvs1.Close()
assert.NoError(t, err)
})
t.Run("Create KVS, write to it, close it, error when writing to closed KVS", func(t *testing.T) {
ls := createLs()
kvs1, _ := pot.NewSwarmKvs(ls)

err := kvs1.Put(ctx, key1, val1)
assert.NoError(t, err)

_, err = kvs1.Save(ctx)
assert.NoError(t, err)

err = kvs1.Close()
assert.NoError(t, err)

err = kvs1.Put(ctx, key2, val2)
assert.Error(t, err, "trie closed")
})
}
Loading