diff --git a/README.md b/README.md index 7110f81..8f11749 100644 --- a/README.md +++ b/README.md @@ -108,8 +108,12 @@ fmt.Println(string(value)) // "world" // Persist the KVS to storage reference, err := kvs.Save(ctx) +// Release resources +_ = kvs.Close() + // Later, load the KVS from its reference loadedKvs, err := pot.NewSwarmKvsReference(persister, reference) +... ``` ### Index @@ -144,6 +148,9 @@ err := index.Iterate(prefix, targetKey, func(entry pot.Entry) (bool, error) { // Persist the index ref, err := index.Save(context.Background()) + +// Release resources +_ = index.Close() ``` ## Proof System & Blockchain Integration diff --git a/index.go b/index.go index b56d625..e108ac0 100644 --- a/index.go +++ b/index.go @@ -2,6 +2,7 @@ package pot import ( "context" + "errors" "fmt" "github.com/ethersphere/proximity-order-trie/pkg/elements" @@ -9,21 +10,23 @@ import ( // Index represents a mutable pot type Index struct { - mode elements.Mode // mode - read chan elements.Node // hands out current root for reads - write chan elements.Node // hands out current root for writes and locks - root chan elements.Node // channel for new roots - quit chan struct{} // closing this channel signals quit + mode elements.Mode // mode + read chan elements.Node // hands out current root for reads + write chan elements.Node // hands out current root for writes and locks + root chan elements.Node // channel for new roots + quit chan struct{} // closing this channel signals quit + closed bool } // New constructs a new mutable pot func New(mode elements.Mode) (*Index, error) { idx := &Index{ - mode: mode, - read: make(chan elements.Node), - write: make(chan elements.Node), - root: make(chan elements.Node), - quit: make(chan struct{}), + mode: mode, + read: make(chan elements.Node), + write: make(chan elements.Node), + root: make(chan elements.Node), + quit: make(chan struct{}), + closed: false, } root := idx.mode.New() @@ -34,11 +37,12 @@ func New(mode elements.Mode) (*Index, error) { // NewReference constructs a new mutable pot from a reference func NewReference(ctx context.Context, mode elements.Mode, ref []byte) (*Index, error) { idx := &Index{ - mode: mode, - read: make(chan elements.Node), - write: make(chan elements.Node), - root: make(chan elements.Node), - quit: make(chan struct{}), + mode: mode, + read: make(chan elements.Node), + write: make(chan elements.Node), + root: make(chan elements.Node), + quit: make(chan struct{}), + closed: false, } root, loaded, err := idx.mode.Load(ctx, ref) @@ -60,6 +64,7 @@ func (idx *Index) muxProcess(root elements.Node) { for { select { case <-quit: + idx.closed = true return case idx.read <- root: // case write <- root: // write locks the pot for writes @@ -74,7 +79,7 @@ func (idx *Index) muxProcess(root elements.Node) { // Add inserts an entry to the mutable pot func (idx *Index) Add(ctx context.Context, e elements.Entry) error { - return idx.Update(ctx, e.Key(), &e ) + return idx.Update(ctx, e.Key(), &e) } // Delete removes the entry at the given key from the mutable pot @@ -86,6 +91,10 @@ func (idx *Index) Delete(ctx context.Context, k []byte) error { func (idx *Index) Update(ctx context.Context, k []byte, e *elements.Entry) error { var root elements.Node + if idx.closed { + return errors.New("trie closed") + } + // get the pot root and capture the write lock select { case <-ctx.Done(): @@ -112,6 +121,11 @@ func (idx *Index) Update(ctx context.Context, k []byte, e *elements.Entry) error // Find retrieves the entry at the given key from the mutable pot or gives elements.ErrNotFound func (idx *Index) Find(ctx context.Context, k []byte) (elements.Entry, error) { + + if idx.closed { + return nil, errors.New("trie closed") + } + select { case <-ctx.Done(): return nil, ctx.Err() @@ -122,20 +136,29 @@ func (idx *Index) Find(ctx context.Context, k []byte) (elements.Entry, error) { // Iterate wraps the underlying pot's iterator func (idx *Index) Iterate(ctx context.Context, p, k []byte, f func(elements.Entry) (stop bool, err error)) error { + if idx.closed { + return errors.New("trie closed") + } return elements.Iterate(ctx, elements.NewAt(0, <-idx.read), p, k, idx.mode, f) } // Size returns the size (number of entries) of the pot -func (idx *Index) Size() int { +func (idx *Index) Size() (int, error) { + if idx.closed { + return 0, errors.New("trie closed") + } root := <-idx.read if root == nil { - return 0 + return 0, nil } - return root.Size() + return root.Size(), nil } // Save calls the mode specific save method for the root node func (idx *Index) Save(ctx context.Context) ([]byte, error) { + if idx.closed { + return nil, errors.New("trie closed") + } root := <-idx.read if root.Empty() { return nil, fmt.Errorf("root node is nil") @@ -145,12 +168,19 @@ func (idx *Index) Save(ctx context.Context) ([]byte, error) { // Close quits the process loop and closes the mode func (idx *Index) Close() error { + if idx.closed { + return errors.New("trie closed") + } close(idx.quit) + idx.closed = true return nil } // String pretty prints the current state of the pot -func (idx *Index) String() string { +func (idx *Index) String() (string, error) { + if idx.closed { + return "", errors.New("trie closed") + } root := <-idx.read - return elements.NewAt(0, root).String() + return elements.NewAt(0, root).String(), nil } diff --git a/index_test.go b/index_test.go index 477b92f..447a058 100644 --- a/index_test.go +++ b/index_test.go @@ -270,7 +270,7 @@ func TestSize(t *testing.T) { defer cancel() t.Run("add", func(t *testing.T) { for i := 0; i < count; i++ { - size := idx.Size() + size, _ := idx.Size() if size != i { t.Fatalf("incorrect number of items. want %d, got %d", i, size) } @@ -280,7 +280,7 @@ func TestSize(t *testing.T) { t.Run("update", func(t *testing.T) { for i := 0; i < count; i++ { idx.Add(ctx, &mockEntry{newDetMockEntry(t, i).key, 10000}) - size := idx.Size() + size, _ := idx.Size() if size != count { t.Fatalf("incorrect number of items. want %d, got %d", count, size) } @@ -289,7 +289,7 @@ func TestSize(t *testing.T) { t.Run("delete", func(t *testing.T) { for i := 0; i < count; i++ { idx.Delete(ctx, newDetMockEntry(t, i).key) - size := idx.Size() + size, _ := idx.Size() if size != count-i-1 { t.Fatalf("incorrect number of items. want %d, got %d", count-i-1, size) } diff --git a/kvs.go b/kvs.go index ca52491..0328189 100644 --- a/kvs.go +++ b/kvs.go @@ -59,6 +59,11 @@ func NewSwarmKvsReference(ctx context.Context, ls persister.LoadSaver, ref []byt }, nil } +// Close stops the mutex +func (ps *SwarmKvs) Close() error { + return ps.idx.Close() +} + // Get retrieves the value associated with the given key. func (ps *SwarmKvs) Get(ctx context.Context, key []byte) ([]byte, error) { entry, err := ps.idx.Find(ctx, key) @@ -99,4 +104,3 @@ func (ps *SwarmKvs) Delete(ctx context.Context, key []byte) error { } return nil } - diff --git a/kvs_test.go b/kvs_test.go index a537f71..7f91682 100644 --- a/kvs_test.go +++ b/kvs_test.go @@ -134,4 +134,33 @@ func TestPotKvs_Save(t *testing.T) { assert.NoError(t, err) assert.Equal(t, val2, val) }) + t.Run("Create KVS, write to it, close it", func(t *testing.T) { + ls := createLs() + kvs1, _ := pot.NewSwarmKvs(ls) + + err := kvs1.Put(ctx, key1, val1) + assert.NoError(t, err) + + _, err = kvs1.Save(ctx) + assert.NoError(t, err) + + err = kvs1.Close() + assert.NoError(t, err) + }) + t.Run("Create KVS, write to it, close it, error when writing to closed KVS", func(t *testing.T) { + ls := createLs() + kvs1, _ := pot.NewSwarmKvs(ls) + + err := kvs1.Put(ctx, key1, val1) + assert.NoError(t, err) + + _, err = kvs1.Save(ctx) + assert.NoError(t, err) + + err = kvs1.Close() + assert.NoError(t, err) + + err = kvs1.Put(ctx, key2, val2) + assert.Error(t, err, "trie closed") + }) }