Skip to content

Commit

Permalink
add page iterator (LPI) for local pagination
Browse files Browse the repository at this point in the history
* lpi package
* part three, prev. commit: 5089413

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 21, 2025
1 parent 5089413 commit 693ff5c
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 37 deletions.
51 changes: 29 additions & 22 deletions fs/lpi.go → fs/lpi/lpi.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Package fs provides mountpath and FQN abstractions and methods to resolve/map stored content
// Package lpi: local page iterator
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*/
package fs
package lpi

import (
"errors"
Expand All @@ -18,25 +18,30 @@ import (
)

const (
allPages = ""
// to fill `Page` with the entire remaining content,
// with no limits that may otherwise be imposed by
// page size (`Msg.Size`) and/or end-of-page (`Msg.EOP`)
AllPages = ""
)

type (
lpiPage map[string]struct{}
Page map[string]struct{}

lpiMsg struct {
eop string
size int
Msg struct {
EOP string // until end-of-page marker
Size int // so-many entries
}

LocalPageIt struct {
page lpiPage
// local page iterator (LPI)
// NOTE: it is caller's responsibility to serialize access _or_ take locks.
Iter struct {
page Page
root string

// runtime
current string
next string
msg lpiMsg
msg Msg
lr int
}
)
Expand All @@ -45,7 +50,7 @@ var (
errStop = errors.New("stop")
)

func newLocalPageIt(root string) (*LocalPageIt, error) {
func New(root string) (*Iter, error) {
finfo, err := os.Stat(root)
if err != nil {
return nil, fmt.Errorf("root fstat: %v", err)
Expand All @@ -54,7 +59,7 @@ func newLocalPageIt(root string) (*LocalPageIt, error) {
return nil, fmt.Errorf("root is not a directory: %s", root)
}

lpi := &LocalPageIt{root: root}
lpi := &Iter{root: root}
{
lpi.next = lpi.root
lpi.lr = len(lpi.root)
Expand All @@ -64,7 +69,9 @@ func newLocalPageIt(root string) (*LocalPageIt, error) {
return lpi, nil
}

func (lpi *LocalPageIt) do(msg lpiMsg, out lpiPage) error {
func (lpi *Iter) Pos() string { return lpi.next }

func (lpi *Iter) Next(msg Msg, out Page) error {
{
clear(out)
lpi.page = out
Expand All @@ -73,11 +80,11 @@ func (lpi *LocalPageIt) do(msg lpiMsg, out lpiPage) error {
}
debug.Assert(strings.HasPrefix(lpi.current, lpi.root), lpi.current, " vs ", lpi.root)

if lpi.msg.eop != allPages {
if lpi.current > lpi.msg.eop {
return fmt.Errorf("expected (end-of-page) %q > %q (current)", lpi.msg.eop, lpi.current)
if lpi.msg.EOP != AllPages {
if lpi.current > lpi.msg.EOP {
return fmt.Errorf("expected (end-of-page) %q > %q (current)", lpi.msg.EOP, lpi.current)
}
if _, err := os.Stat(lpi.msg.eop); err != nil {
if _, err := os.Stat(lpi.msg.EOP); err != nil {
return fmt.Errorf("end-of-page fstat: %v", err)
}
}
Expand All @@ -95,7 +102,7 @@ func (lpi *LocalPageIt) do(msg lpiMsg, out lpiPage) error {
return nil
}

func (lpi *LocalPageIt) Callback(pathname string, de *godirwalk.Dirent) (err error) {
func (lpi *Iter) Callback(pathname string, de *godirwalk.Dirent) (err error) {
switch {
case de.IsDir():
// skip or SkipDir
Expand All @@ -116,8 +123,8 @@ func (lpi *LocalPageIt) Callback(pathname string, de *godirwalk.Dirent) (err err
}
case pathname < lpi.current:
// skip
case pathname >= lpi.current && (pathname <= lpi.msg.eop || lpi.msg.eop == allPages):
if lpi.msg.size != 0 && len(lpi.page) >= lpi.msg.size {
case pathname >= lpi.current && (pathname <= lpi.msg.EOP || lpi.msg.EOP == AllPages):
if lpi.msg.Size != 0 && len(lpi.page) >= lpi.msg.Size {
lpi.next = pathname
err = errStop
break
Expand All @@ -132,15 +139,15 @@ func (lpi *LocalPageIt) Callback(pathname string, de *godirwalk.Dirent) (err err
lpi.page[rel] = struct{}{}
default:
// next
debug.Assert(pathname > lpi.msg.eop && lpi.msg.eop != allPages)
debug.Assert(pathname > lpi.msg.EOP && lpi.msg.EOP != AllPages)
lpi.next = pathname
err = errStop
}

return err
}

func (*LocalPageIt) ErrorCallback(pathname string, err error) godirwalk.ErrorAction {
func (*Iter) ErrorCallback(pathname string, err error) godirwalk.ErrorAction {
if err != errStop {
nlog.Warningf("Error accessing %s: %v", pathname, err)
}
Expand Down
43 changes: 43 additions & 0 deletions fs/lpi/map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Package lpi: local page iterator
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*/
package lpi_test

import (
"testing"
)

// [TODO] consider and bench lpi.Page alternatives:
// - sorted (reusable) slice with binary search
// - what else?

func clearMap(m map[int]struct{}) {
for k := range m {
delete(m, k)
}
}

func BenchmarkClearFunction(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
m := make(map[int]struct{})
for pb.Next() {
for i := range 1000 {
m[i] = struct{}{}
}
clear(m)
}
})
}

func BenchmarkManualClear(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
m := make(map[int]struct{})
for pb.Next() {
for i := range 1000 {
m[i] = struct{}{}
}
clearMap(m)
}
})
}
31 changes: 16 additions & 15 deletions fs/lpi_internal_test.go → fs/lpi_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Package fs provides mountpath and FQN abstractions and methods eop resolve/map stored content
// Package fs provides mountpath and FQN abstractions and methods to resolve/map stored content
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*/

package fs
package fs_test

import (
"fmt"
Expand All @@ -14,6 +14,7 @@ import (
"strings"
"testing"

"github.com/NVIDIA/aistore/fs/lpi"
"github.com/NVIDIA/aistore/tools/tassert"
)

Expand Down Expand Up @@ -66,15 +67,15 @@ func TestLocalPageIt(t *testing.T) {

func lpiPageSize(t *testing.T, root string, eops []string, lpiTestPageSize, total int) []string {
var (
page = make(lpiPage, 100)
msg = lpiMsg{size: lpiTestPageSize}
page = make(lpi.Page, 100)
msg = lpi.Msg{Size: lpiTestPageSize}
num int
)
lpi, err := newLocalPageIt(root)
it, err := lpi.New(root)
tassert.CheckFatal(t, err)

for {
if err := lpi.do(msg, page); err != nil {
if err := it.Next(msg, page); err != nil {
t.Fatal(err)
}
num += len(page)
Expand All @@ -86,28 +87,28 @@ func lpiPageSize(t *testing.T, root string, eops []string, lpiTestPageSize, tota
fmt.Println(strings.Repeat("---", 10))
fmt.Println()
}
if lpi.next == "" {
eops = append(eops, allPages)
if it.Pos() == "" {
eops = append(eops, lpi.AllPages)
break
}
eops = append(eops, lpi.next)
eops = append(eops, it.Pos())
}
tassert.Errorf(t, num == total, "(num) %d != %d (total)", num, total)
return eops
}

func lpiEndOfPage(t *testing.T, root string, eops []string, total int) {
page := make(lpiPage, 100)
lpi, err := newLocalPageIt(root)
tassert.CheckFatal(t, err)

var (
previous string
num int
page = make(lpi.Page, 100)
it, err = lpi.New(root)
)
tassert.CheckFatal(t, err)

for _, eop := range eops {
msg := lpiMsg{eop: eop}
if err := lpi.do(msg, page); err != nil {
msg := lpi.Msg{EOP: eop}
if err := it.Next(msg, page); err != nil {
t.Fatal(err) // TODO: check vs divisibility by page size
}
num += len(page)
Expand Down

0 comments on commit 693ff5c

Please sign in to comment.