Extract tighter-scoped integrate funcs in storage impls (#440)
This PR pulls the act of updating the merkle tree resources out into separate funcs.

This helps with readability by reducing the size/tightening the scope of some of the funcs in storage implementations (MySQL in particular was getting quite large), but also aids in supporting other lifecycle modes (e.g. for #414).

No functional changes.
AlCutter authored Jan 10, 2025
1 parent ddac6ea commit 7914bd2
Showing 7 changed files with 166 additions and 124 deletions.
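The net effect of the extraction is that each backend gains an `integrate` helper with a narrow contract: leaf hashes in, new root hash out. A caller that already has leaf hashes, such as the other lifecycle modes mentioned in the description (e.g. #414), could then drive tree growth without going through `appendEntries` at all. A minimal sketch of such a caller, assuming only the helper signature from the diffs below; the package, type, and function names here are invented for illustration:

package lifecycle // hypothetical package, not part of this PR

import (
	"context"
	"fmt"
)

// integrateFn matches the signature of the integrate helpers extracted in the diffs below.
type integrateFn func(ctx context.Context, fromSeq uint64, lh [][]byte) ([]byte, error)

// replayLeaves grows the tree directly from pre-computed leaf hashes, one
// batch at a time; the kind of caller the tighter-scoped helper enables.
func replayLeaves(ctx context.Context, integrate integrateFn, fromSeq uint64, batches [][][]byte) ([]byte, error) {
	var root []byte
	for _, lh := range batches {
		r, err := integrate(ctx, fromSeq, lh)
		if err != nil {
			return nil, fmt.Errorf("integrate batch at %d: %v", fromSeq, err)
		}
		root = r
		fromSeq += uint64(len(lh))
	}
	return root, nil
}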
storage/aws/README.md (2 changes: 1 addition & 1 deletion)

@@ -41,7 +41,7 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t
 1. selects next from `SeqCoord` with for update ← this blocks other FE from writing their pools, but only for a short duration.
 1. Inserts batch of entries into `Seq` with key `SeqCoord.next`
 1. Update `SeqCoord` with `next+=len(batch)`
-1. Integrators periodically integrate new sequenced entries into the tree:
+1. Newly sequenced entries are periodically appended to the tree:
    In a transaction:
 1. select `seq` from `IntCoord` with for update ← this blocks other integrators from proceeding.
 1. Select one or more consecutive batches from `Seq` for update, starting at `IntCoord.seq`
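For concreteness, the integration coordination described in the hunk above could look roughly like the following with database/sql against MySQL-compatible storage. This is a sketch only: the `IntCoord` and `Seq` table names and the `seq` column come from the README, while the remaining column names, the batch limit, and the single-row `IntCoord` layout are assumptions, not code from this repository.

package coord // illustrative sketch, not code from this repository

import (
	"context"
	"database/sql"
	"fmt"
)

// integrateNext claims the next run of sequenced batches inside a transaction.
func integrateNext(ctx context.Context, db *sql.DB) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() { _ = tx.Rollback() }() // no-op once Commit succeeds

	// Lock the coordination row; other integrators block here.
	var seq uint64
	if err := tx.QueryRowContext(ctx, "SELECT seq FROM IntCoord FOR UPDATE").Scan(&seq); err != nil {
		return err
	}

	// Select one or more consecutive batches from Seq, starting at IntCoord.seq.
	rows, err := tx.QueryContext(ctx, "SELECT id, entries FROM Seq WHERE id >= ? ORDER BY id LIMIT 8 FOR UPDATE", seq)
	if err != nil {
		return err
	}
	for rows.Next() {
		var id uint64
		var entries []byte
		if err := rows.Scan(&id, &entries); err != nil {
			return err
		}
		fmt.Printf("claimed batch %d (%d bytes)\n", id, len(entries))
	}
	if err := rows.Err(); err != nil {
		return err
	}
	if err := rows.Close(); err != nil {
		return err
	}

	// (A real integrator would update the tree and advance IntCoord.seq here.)
	return tx.Commit()
}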
storage/aws/aws.go (62 changes: 37 additions & 25 deletions)

@@ -53,7 +53,7 @@ import (
 	"github.com/transparency-dev/trillian-tessera/api"
 	"github.com/transparency-dev/trillian-tessera/api/layout"
 	"github.com/transparency-dev/trillian-tessera/internal/options"
-	"github.com/transparency-dev/trillian-tessera/storage/internal"
+	storage "github.com/transparency-dev/trillian-tessera/storage/internal"
 	"golang.org/x/sync/errgroup"
 	"k8s.io/klog/v2"

@@ -209,7 +209,7 @@ func (s *Storage) consumeEntriesTask(ctx context.Context) {
 		cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 		defer cancel()

-		if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.integrate, false); err != nil {
+		if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.appendEntries, false); err != nil {
 			klog.Errorf("integrate: %v", err)
 			return
 		}

@@ -278,7 +278,7 @@ func (s *Storage) init(ctx context.Context) error {
 	// framework which prevents the tree from rolling backwards or otherwise forking).
 	cctx, c := context.WithTimeout(ctx, 10*time.Second)
 	defer c()
-	if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.integrate, true); err != nil {
+	if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.appendEntries, true); err != nil {
 		return fmt.Errorf("forced integrate: %v", err)
 	}
 	select {

@@ -404,20 +404,12 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, p uint
 	return nil
 }

-// integrate incorporates the provided entries into the log starting at fromSeq.
+// appendEntries incorporates the provided entries into the log starting at fromSeq.
 //
 // Returns the new root hash of the log with the entries added.
-func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) ([]byte, error) {
+func (s *Storage) appendEntries(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) ([]byte, error) {
 	var newRoot []byte

-	getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
-		n, err := s.getTiles(ctx, tileIDs, treeSize)
-		if err != nil {
-			return nil, fmt.Errorf("getTiles: %w", err)
-		}
-		return n, nil
-	}
-
 	errG := errgroup.Group{}

 	errG.Go(func() error {
@@ -432,27 +424,47 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []stora
 		for i, e := range entries {
 			lh[i] = e.LeafHash
 		}
-		newSize, root, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, lh)
+		r, err := s.integrate(ctx, fromSeq, lh)
 		if err != nil {
-			return fmt.Errorf("Integrate: %v", err)
-		}
-		newRoot = root
-		for k, v := range tiles {
-			func(ctx context.Context, k storage.TileID, v *api.HashTile) {
-				errG.Go(func() error {
-					return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v)
-				})
-			}(ctx, k, v)
+			return fmt.Errorf("integrate: %v", err)
 		}
-		klog.Infof("New tree: %d, %x", newSize, newRoot)
+
+		newRoot = r
 		return nil
 	})

 	err := errG.Wait()
 	return newRoot, err
 }

+// integrate adds the provided leaf hashes to the merkle tree, starting at the provided location.
+func (s *Storage) integrate(ctx context.Context, fromSeq uint64, lh [][]byte) ([]byte, error) {
+	getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
+		n, err := s.getTiles(ctx, tileIDs, treeSize)
+		if err != nil {
+			return nil, fmt.Errorf("getTiles: %w", err)
+		}
+		return n, nil
+	}
+
+	newSize, newRoot, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, lh)
+	if err != nil {
+		return nil, fmt.Errorf("Integrate: %v", err)
+	}
+	errG := errgroup.Group{}
+	for k, v := range tiles {
+		func(ctx context.Context, k storage.TileID, v *api.HashTile) {
+			errG.Go(func() error {
+				return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v)
+			})
+		}(ctx, k, v)
+	}
+	if err := errG.Wait(); err != nil {
+		return nil, err
+	}
+	klog.Infof("New tree: %d, %x", newSize, newRoot)
+	return newRoot, nil
+}

 // updateEntryBundles adds the entries being integrated into the entry bundles.
 //
 // The right-most bundle will be grown, if it's partial, and/or new bundles will be created as required.
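One detail worth noting in the tile-writing loop retained above: the immediately-invoked func re-binds `k` and `v` before `errG.Go` starts a goroutine, guarding against the pre-Go-1.22 behaviour where range variables were reused across iterations. An equivalent shadow-copy form of the same guard, shown as an illustration of the idiom rather than code from the PR:

// Equivalent to the func(ctx, k, v){...}(ctx, k, v) wrapper above: copy the
// loop variables so each goroutine sees its own pair (needed before Go 1.22).
for k, v := range tiles {
	k, v := k, v
	errG.Go(func() error {
		return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v)
	})
}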
storage/gcp/README.md (2 changes: 1 addition & 1 deletion)

@@ -41,7 +41,7 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t
 1. selects next from `SeqCoord` with for update ← this blocks other FE from writing their pools, but only for a short duration.
 1. Inserts batch of entries into `Seq` with key `SeqCoord.next`
 1. Update `SeqCoord` with `next+=len(batch)`
-1. Integrators periodically integrate new sequenced entries into the tree:
+1. Newly sequenced entries are periodically appended to the tree:
    In a transaction:
 1. select `seq` from `IntCoord` with for update ← this blocks other integrators from proceeding.
 1. Select one or more consecutive batches from `Seq` for update, starting at `IntCoord.seq`
storage/gcp/gcp.go (81 changes: 47 additions & 34 deletions)

@@ -50,7 +50,7 @@ import (
 	"github.com/transparency-dev/trillian-tessera/api"
 	"github.com/transparency-dev/trillian-tessera/api/layout"
 	"github.com/transparency-dev/trillian-tessera/internal/options"
-	"github.com/transparency-dev/trillian-tessera/storage/internal"
+	storage "github.com/transparency-dev/trillian-tessera/storage/internal"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/api/googleapi"
 	"google.golang.org/api/iterator"

@@ -173,7 +173,7 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions))
 		cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 		defer cancel()

-		if _, err := r.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, r.integrate, false); err != nil {
+		if _, err := r.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, r.appendEntries, false); err != nil {
 			klog.Errorf("integrate: %v", err)
 			return
 		}

@@ -239,7 +239,7 @@ func (s *Storage) init(ctx context.Context) error {
 	// framework which prevents the tree from rolling backwards or otherwise forking).
 	cctx, c := context.WithTimeout(ctx, 10*time.Second)
 	defer c()
-	if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.integrate, true); err != nil {
+	if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.appendEntries, true); err != nil {
 		return fmt.Errorf("forced integrate: %v", err)
 	}
 	select {

@@ -282,14 +282,8 @@ func (s *Storage) publishCheckpoint(ctx context.Context, minStaleness time.Durat
 // setTile idempotently stores the provided tile at the location implied by the given level, index, and treeSize.
 //
 // The location to which the tile is written is defined by the tile layout spec.
-func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, tile *api.HashTile) error {
-	data, err := tile.MarshalText()
-	if err != nil {
-		return err
-	}
-	tPath := layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize))
-	klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes))
-
+func (s *Storage) setTile(ctx context.Context, level, index uint64, partial uint8, data []byte) error {
+	tPath := layout.TilePath(level, index, partial)
 	return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl)
 }

@@ -360,8 +354,8 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, p uint
 	return nil
 }

-// integrate incorporates the provided entries into the log starting at fromSeq.
-func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) ([]byte, error) {
+// appendEntries incorporates the provided entries into the log starting at fromSeq.
+func (s *Storage) appendEntries(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) ([]byte, error) {
 	var newRoot []byte

 	errG := errgroup.Group{}
@@ -374,36 +368,55 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []stora
 	})

 	errG.Go(func() error {
-		getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
-			n, err := s.getTiles(ctx, tileIDs, treeSize)
-			if err != nil {
-				return nil, fmt.Errorf("getTiles: %w", err)
-			}
-			return n, nil
-		}
-
 		lh := make([][]byte, len(entries))
 		for i, e := range entries {
 			lh[i] = e.LeafHash
 		}
-		newSize, root, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, lh)
+		r, err := s.integrate(ctx, fromSeq, lh)
 		if err != nil {
-			return fmt.Errorf("Integrate: %v", err)
-		}
-		newRoot = root
-		for k, v := range tiles {
-			func(ctx context.Context, k storage.TileID, v *api.HashTile) {
-				errG.Go(func() error {
-					return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v)
-				})
-			}(ctx, k, v)
+			return fmt.Errorf("integrate: %v", err)
 		}
-		klog.Infof("New tree: %d, %x", newSize, newRoot)
+
+		newRoot = r
 		return nil
 	})
-
-	return newRoot, errG.Wait()
-}
+	if err := errG.Wait(); err != nil {
+		return nil, err
+	}
+	return newRoot, nil
+}
+
+// integrate adds the provided leaf hashes to the merkle tree, starting at the provided location.
+func (s *Storage) integrate(ctx context.Context, fromSeq uint64, lh [][]byte) ([]byte, error) {
+	errG := errgroup.Group{}
+	getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
+		n, err := s.getTiles(ctx, tileIDs, treeSize)
+		if err != nil {
+			return nil, fmt.Errorf("getTiles: %w", err)
+		}
+		return n, nil
+	}
+
+	newSize, newRoot, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, lh)
+	if err != nil {
+		return nil, fmt.Errorf("Integrate: %v", err)
+	}
+	for k, v := range tiles {
+		func(ctx context.Context, k storage.TileID, v *api.HashTile) {
+			errG.Go(func() error {
+				data, err := v.MarshalText()
+				if err != nil {
+					return err
+				}
+				return s.setTile(ctx, k.Level, k.Index, layout.PartialTileSize(k.Level, k.Index, newSize), data)
+			})
+		}(ctx, k, v)
+	}
+	if err := errG.Wait(); err != nil {
+		return nil, err
+	}
+	klog.Infof("New tree: %d, %x", newSize, newRoot)
+
+	return newRoot, nil
+}

 // updateEntryBundles adds the entries being integrated into the entry bundles.
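The GCP `setTile` signature change above moves marshalling and partial-size computation out to the caller, so `setTile` now only writes opaque bytes. A hypothetical helper showing the new calling convention, mirroring the call sites in the diff above; the helper itself is not in this PR:

// setMarshalledTile marshals a tile and stores it via the new setTile
// signature. Hypothetical convenience wrapper, assuming the *Storage,
// api.HashTile, and layout types used in the diff above.
func setMarshalledTile(ctx context.Context, s *Storage, level, index, logSize uint64, tile *api.HashTile) error {
	data, err := tile.MarshalText()
	if err != nil {
		return err
	}
	return s.setTile(ctx, level, index, layout.PartialTileSize(level, index, logSize), data)
}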
storage/gcp/gcp_test.go (6 changes: 5 additions & 1 deletion)

@@ -228,7 +228,11 @@ func TestTileRoundtrip(t *testing.T) {
 	} {
 		t.Run(test.name, func(t *testing.T) {
 			wantTile := makeTile(t, test.tileSize)
-			if err := s.setTile(ctx, test.level, test.index, test.logSize, wantTile); err != nil {
+			tRaw, err := wantTile.MarshalText()
+			if err != nil {
+				t.Fatalf("Failed to marshal tile: %v", err)
+			}
+			if err := s.setTile(ctx, test.level, test.index, layout.PartialTileSize(test.level, test.index, test.logSize), tRaw); err != nil {
 				t.Fatalf("setTile: %v", err)
 			}
