From 7914bd2c94c23abd1a60f10826965e3906319673 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Fri, 10 Jan 2025 12:11:41 +0000 Subject: [PATCH] Extract tighter-scoped integrate funcs in storage impls (#440) This PR pulls the act of updating the merkle tree resources out into separate funcs. This helps with readability by reducing the size/tightening the scope of some of the funcs in storage implementations (MySQL in particular was getting quite large), but also aids in supporting other lifecycle modes (e.g. for #414). No functional changes. --- storage/aws/README.md | 2 +- storage/aws/aws.go | 62 ++++++++++-------- storage/gcp/README.md | 2 +- storage/gcp/gcp.go | 81 ++++++++++++++---------- storage/gcp/gcp_test.go | 6 +- storage/mysql/mysql.go | 135 ++++++++++++++++++++++------------------ storage/posix/README.md | 2 +- 7 files changed, 166 insertions(+), 124 deletions(-) diff --git a/storage/aws/README.md b/storage/aws/README.md index a08bff66..bfd6a3db 100644 --- a/storage/aws/README.md +++ b/storage/aws/README.md @@ -41,7 +41,7 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t 1. selects next from `SeqCoord` with for update ← this blocks other FE from writing their pools, but only for a short duration. 1. Inserts batch of entries into `Seq` with key `SeqCoord.next` 1. Update `SeqCoord` with `next+=len(batch)` -1. Integrators periodically integrate new sequenced entries into the tree: +1. Newly sequenced entries are periodically appended to the tree: In a transaction: 1. select `seq` from `IntCoord` with for update ← this blocks other integrators from proceeding. 1. Select one or more consecutive batches from `Seq` for update, starting at `IntCoord.seq` diff --git a/storage/aws/aws.go b/storage/aws/aws.go index 49b2d7a6..2f58e597 100644 --- a/storage/aws/aws.go +++ b/storage/aws/aws.go @@ -53,7 +53,7 @@ import ( "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" "github.com/transparency-dev/trillian-tessera/internal/options" - "github.com/transparency-dev/trillian-tessera/storage/internal" + storage "github.com/transparency-dev/trillian-tessera/storage/internal" "golang.org/x/sync/errgroup" "k8s.io/klog/v2" @@ -209,7 +209,7 @@ func (s *Storage) consumeEntriesTask(ctx context.Context) { cctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.integrate, false); err != nil { + if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.appendEntries, false); err != nil { klog.Errorf("integrate: %v", err) return } @@ -278,7 +278,7 @@ func (s *Storage) init(ctx context.Context) error { // framework which prevents the tree from rolling backwards or otherwise forking). cctx, c := context.WithTimeout(ctx, 10*time.Second) defer c() - if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.integrate, true); err != nil { + if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.appendEntries, true); err != nil { return fmt.Errorf("forced integrate: %v", err) } select { @@ -404,20 +404,12 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, p uint return nil } -// integrate incorporates the provided entries into the log starting at fromSeq. +// appendEntries incorporates the provided entries into the log starting at fromSeq. // // Returns the new root hash of the log with the entries added. 
-func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) ([]byte, error) { +func (s *Storage) appendEntries(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) ([]byte, error) { var newRoot []byte - getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { - n, err := s.getTiles(ctx, tileIDs, treeSize) - if err != nil { - return nil, fmt.Errorf("getTiles: %w", err) - } - return n, nil - } - errG := errgroup.Group{} errG.Go(func() error { @@ -432,20 +424,11 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []stora for i, e := range entries { lh[i] = e.LeafHash } - newSize, root, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, lh) + r, err := s.integrate(ctx, fromSeq, lh) if err != nil { - return fmt.Errorf("Integrate: %v", err) - } - newRoot = root - for k, v := range tiles { - func(ctx context.Context, k storage.TileID, v *api.HashTile) { - errG.Go(func() error { - return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v) - }) - }(ctx, k, v) + return fmt.Errorf("integrate: %v", err) } - klog.Infof("New tree: %d, %x", newSize, newRoot) - + newRoot = r return nil }) @@ -453,6 +436,35 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []stora return newRoot, err } +// integrate adds the provided leaf hashes to the merkle tree, starting at the provided location. +func (s *Storage) integrate(ctx context.Context, fromSeq uint64, lh [][]byte) ([]byte, error) { + getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { + n, err := s.getTiles(ctx, tileIDs, treeSize) + if err != nil { + return nil, fmt.Errorf("getTiles: %w", err) + } + return n, nil + } + + newSize, newRoot, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, lh) + if err != nil { + return nil, fmt.Errorf("Integrate: %v", err) + } + errG := errgroup.Group{} + for k, v := range tiles { + func(ctx context.Context, k storage.TileID, v *api.HashTile) { + errG.Go(func() error { + return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v) + }) + }(ctx, k, v) + } + if err := errG.Wait(); err != nil { + return nil, err + } + klog.Infof("New tree: %d, %x", newSize, newRoot) + return newRoot, nil +} + // updateEntryBundles adds the entries being integrated into the entry bundles. // // The right-most bundle will be grown, if it's partial, and/or new bundles will be created as required. diff --git a/storage/gcp/README.md b/storage/gcp/README.md index 07fdb002..52430cdb 100644 --- a/storage/gcp/README.md +++ b/storage/gcp/README.md @@ -41,7 +41,7 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t 1. selects next from `SeqCoord` with for update ← this blocks other FE from writing their pools, but only for a short duration. 1. Inserts batch of entries into `Seq` with key `SeqCoord.next` 1. Update `SeqCoord` with `next+=len(batch)` -1. Integrators periodically integrate new sequenced entries into the tree: +1. Newly sequenced entries are periodically appended to the tree: In a transaction: 1. select `seq` from `IntCoord` with for update ← this blocks other integrators from proceeding. 1. 
Select one or more consecutive batches from `Seq` for update, starting at `IntCoord.seq` diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 7bf1287a..b2073e97 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -50,7 +50,7 @@ import ( "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" "github.com/transparency-dev/trillian-tessera/internal/options" - "github.com/transparency-dev/trillian-tessera/storage/internal" + storage "github.com/transparency-dev/trillian-tessera/storage/internal" "golang.org/x/sync/errgroup" "google.golang.org/api/googleapi" "google.golang.org/api/iterator" @@ -173,7 +173,7 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions)) cctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - if _, err := r.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, r.integrate, false); err != nil { + if _, err := r.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, r.appendEntries, false); err != nil { klog.Errorf("integrate: %v", err) return } @@ -239,7 +239,7 @@ func (s *Storage) init(ctx context.Context) error { // framework which prevents the tree from rolling backwards or otherwise forking). cctx, c := context.WithTimeout(ctx, 10*time.Second) defer c() - if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.integrate, true); err != nil { + if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.appendEntries, true); err != nil { return fmt.Errorf("forced integrate: %v", err) } select { @@ -282,14 +282,8 @@ func (s *Storage) publishCheckpoint(ctx context.Context, minStaleness time.Durat // setTile idempotently stores the provided tile at the location implied by the given level, index, and treeSize. // // The location to which the tile is written is defined by the tile layout spec. -func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, tile *api.HashTile) error { - data, err := tile.MarshalText() - if err != nil { - return err - } - tPath := layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize)) - klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes)) - +func (s *Storage) setTile(ctx context.Context, level, index uint64, partial uint8, data []byte) error { + tPath := layout.TilePath(level, index, partial) return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl) } @@ -360,8 +354,8 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, p uint return nil } -// integrate incorporates the provided entries into the log starting at fromSeq. -func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) ([]byte, error) { +// appendEntries incorporates the provided entries into the log starting at fromSeq. 
+func (s *Storage) appendEntries(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) ([]byte, error) { var newRoot []byte errG := errgroup.Group{} @@ -374,36 +368,55 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []stora }) errG.Go(func() error { - getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { - n, err := s.getTiles(ctx, tileIDs, treeSize) - if err != nil { - return nil, fmt.Errorf("getTiles: %w", err) - } - return n, nil - } - lh := make([][]byte, len(entries)) for i, e := range entries { lh[i] = e.LeafHash } - newSize, root, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, lh) + r, err := s.integrate(ctx, fromSeq, lh) if err != nil { - return fmt.Errorf("Integrate: %v", err) - } - newRoot = root - for k, v := range tiles { - func(ctx context.Context, k storage.TileID, v *api.HashTile) { - errG.Go(func() error { - return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v) - }) - }(ctx, k, v) + return fmt.Errorf("integrate: %v", err) } - klog.Infof("New tree: %d, %x", newSize, newRoot) - + newRoot = r return nil }) + if err := errG.Wait(); err != nil { + return nil, err + } + return newRoot, nil +} + +// integrate adds the provided leaf hashes to the merkle tree, starting at the provided location. +func (s *Storage) integrate(ctx context.Context, fromSeq uint64, lh [][]byte) ([]byte, error) { + errG := errgroup.Group{} + getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { + n, err := s.getTiles(ctx, tileIDs, treeSize) + if err != nil { + return nil, fmt.Errorf("getTiles: %w", err) + } + return n, nil + } + + newSize, newRoot, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, lh) + if err != nil { + return nil, fmt.Errorf("Integrate: %v", err) + } + for k, v := range tiles { + func(ctx context.Context, k storage.TileID, v *api.HashTile) { + errG.Go(func() error { + data, err := v.MarshalText() + if err != nil { + return err + } + return s.setTile(ctx, k.Level, k.Index, layout.PartialTileSize(k.Level, k.Index, newSize), data) + }) + }(ctx, k, v) + } + if err := errG.Wait(); err != nil { + return nil, err + } + klog.Infof("New tree: %d, %x", newSize, newRoot) - return newRoot, errG.Wait() + return newRoot, nil } // updateEntryBundles adds the entries being integrated into the entry bundles. 
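To make the GCP-side change above concrete: `setTile` now receives pre-marshalled bytes and the partial-tile size, so the caller does the marshalling and size computation itself. The following is only an illustrative sketch of that call shape; `storeTile` and `writeObject` are hypothetical stand-ins for the real caller and the conditional GCS object write, while `api.HashTile`, `layout.PartialTileSize`, and `layout.TilePath` are used as in the diff.

```go
package sketch

import (
	"context"
	"fmt"

	"github.com/transparency-dev/trillian-tessera/api"
	"github.com/transparency-dev/trillian-tessera/api/layout"
)

// storeTile sketches the narrowed setTile call shape: the caller marshals the
// HashTile and derives the partial-tile size, so the write path only sees a
// tile path and raw bytes.
func storeTile(ctx context.Context, level, index, treeSize uint64, tile *api.HashTile,
	writeObject func(ctx context.Context, path string, data []byte) error) error {
	data, err := tile.MarshalText()
	if err != nil {
		return fmt.Errorf("MarshalText: %w", err)
	}
	// Derive the partial size from the new tree size, as the refactored caller does.
	partial := layout.PartialTileSize(level, index, treeSize)
	return writeObject(ctx, layout.TilePath(level, index, partial), data)
}
```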
diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index fcda9a3e..e1556c61 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -228,7 +228,11 @@ func TestTileRoundtrip(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { wantTile := makeTile(t, test.tileSize) - if err := s.setTile(ctx, test.level, test.index, test.logSize, wantTile); err != nil { + tRaw, err := wantTile.MarshalText() + if err != nil { + t.Fatalf("Failed to marshal tile: %v", err) + } + if err := s.setTile(ctx, test.level, test.index, layout.PartialTileSize(test.level, test.index, test.logSize), tRaw); err != nil { t.Fatalf("setTile: %v", err) } diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index 6096d4a8..0ebbe85f 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -33,7 +33,7 @@ import ( "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" options "github.com/transparency-dev/trillian-tessera/internal/options" - "github.com/transparency-dev/trillian-tessera/storage/internal" + storage "github.com/transparency-dev/trillian-tessera/storage/internal" "k8s.io/klog/v2" ) @@ -371,7 +371,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e } // Integrate the new entries into the entry bundle (TiledLeaves table) and tile (Subtree table). - if err := s.integrate(ctx, tx, state.size, entries); err != nil { + if err := s.appendEntries(ctx, tx, state.size, entries); err != nil { return fmt.Errorf("failed to integrate: %w", err) } @@ -386,57 +386,8 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e return err } -// integrate incorporates the provided entries into the log starting at fromSeq. -func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, entries []*tessera.Entry) error { - getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { - hashTiles := make([]*api.HashTile, len(tileIDs)) - if len(tileIDs) == 0 { - return hashTiles, nil - } - - // Build the SQL and args to fetch the hash tiles. - var sql strings.Builder - args := make([]any, 0, len(tileIDs)*2) - for i, id := range tileIDs { - if i != 0 { - sql.WriteString(" UNION ALL ") - } - _, err := sql.WriteString(selectSubtreeByLevelAndIndexSQL) - if err != nil { - return nil, err - } - args = append(args, id.Level, id.Index) - } - - rows, err := tx.QueryContext(ctx, sql.String(), args...) - if err != nil { - return nil, fmt.Errorf("failed to query the hash tiles with SQL (%s): %w", sql.String(), err) - } - defer func() { - if err := rows.Close(); err != nil { - klog.Warningf("Failed to close the rows: %v", err) - } - }() - - i := 0 - for rows.Next() { - var tile []byte - if err := rows.Scan(&tile); err != nil { - return nil, fmt.Errorf("scan subtree tile: %w", err) - } - t := &api.HashTile{} - if err := t.UnmarshalText(tile); err != nil { - return nil, fmt.Errorf("unmarshal tile: %w", err) - } - hashTiles[i] = t - i++ - } - if err = rows.Err(); err != nil { - return nil, fmt.Errorf("rows error while fetching subtrees: %w", err) - } - - return hashTiles, nil - } +// appendEntries incorporates the provided entries into the log starting at fromSeq. +func (s *Storage) appendEntries(ctx context.Context, tx *sql.Tx, fromSeq uint64, entries []*tessera.Entry) error { sequencedEntries := make([]storage.SequencedEntry, len(entries)) // Assign provisional sequence numbers to entries. 
@@ -505,24 +456,86 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent for i, e := range sequencedEntries { lh[i] = e.LeafHash } + newSize, newRoot, err := s.integrate(ctx, tx, fromSeq, lh) + if err != nil { + return fmt.Errorf("integrate: %v", err) + } + + // Write new tree state. + if err := s.writeTreeState(ctx, tx, newSize, newRoot); err != nil { + return fmt.Errorf("writeCheckpoint: %w", err) + } + + klog.Infof("New tree: %d, %x", newSize, newRoot) + return nil +} + +// integrate adds the provided leaf hashes to the merkle tree, starting at the provided location. +func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, lh [][]byte) (uint64, []byte, error) { + getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { + hashTiles := make([]*api.HashTile, len(tileIDs)) + if len(tileIDs) == 0 { + return hashTiles, nil + } + + // Build the SQL and args to fetch the hash tiles. + var sql strings.Builder + args := make([]any, 0, len(tileIDs)*2) + for i, id := range tileIDs { + if i != 0 { + sql.WriteString(" UNION ALL ") + } + _, err := sql.WriteString(selectSubtreeByLevelAndIndexSQL) + if err != nil { + return nil, err + } + args = append(args, id.Level, id.Index) + } + + rows, err := tx.QueryContext(ctx, sql.String(), args...) + if err != nil { + return nil, fmt.Errorf("failed to query the hash tiles with SQL (%s): %w", sql.String(), err) + } + defer func() { + if err := rows.Close(); err != nil { + klog.Warningf("Failed to close the rows: %v", err) + } + }() + + i := 0 + for rows.Next() { + var tile []byte + if err := rows.Scan(&tile); err != nil { + return nil, fmt.Errorf("scan subtree tile: %w", err) + } + t := &api.HashTile{} + if err := t.UnmarshalText(tile); err != nil { + return nil, fmt.Errorf("unmarshal tile: %w", err) + } + hashTiles[i] = t + i++ + } + if err = rows.Err(); err != nil { + return nil, fmt.Errorf("rows error while fetching subtrees: %w", err) + } + + return hashTiles, nil + } + newSize, newRoot, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, lh) if err != nil { - return fmt.Errorf("tb.Integrate: %v", err) + return 0, nil, fmt.Errorf("storage.Integrate: %v", err) } for k, v := range tiles { nodes, err := v.MarshalText() if err != nil { - return err + return 0, nil, err } if err := s.writeTile(ctx, tx, uint64(k.Level), k.Index, nodes); err != nil { - return fmt.Errorf("failed to set tile(%v): %w", k, err) + return 0, nil, fmt.Errorf("failed to set tile(%v): %w", k, err) } } - // Write new tree state. - if err := s.writeTreeState(ctx, tx, newSize, newRoot); err != nil { - return fmt.Errorf("writeCheckpoint: %w", err) - } - return nil + return newSize, newRoot, nil } diff --git a/storage/posix/README.md b/storage/posix/README.md index a2f92957..7355344c 100644 --- a/storage/posix/README.md +++ b/storage/posix/README.md @@ -33,7 +33,7 @@ of actions we can avoid corrupt or partially written files being part of the tre 1. Leaves are submitted by the binary built using Tessera via a call the storage's `Add` func. 1. The storage library batches these entries up, and, after a configurable period of time has elapsed - or the batch reaches a configurable size threshold, the batch is sequenced and integrated into the tree: + or the batch reaches a configurable size threshold, the batch is sequenced and appended to the tree: 1. An advisory lock is taken on `.state/treeState.lock` file. 
This helps prevent multiple frontends from stepping on each other, but isn't necessary for safety. 1. Flushed entries are assigned contiguous sequence numbers, and written out into entry bundle files.
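For readers who want the shape of the refactor without wading through the per-backend diffs, here is a condensed sketch. It is illustrative only: `store`, `tileID`, `hashTile`, and the function-valued fields are hypothetical stand-ins for the real AWS/GCP/MySQL code and for `storage.Integrate` in `storage/internal`; only the shape of the split is intended to match the patch.

```go
// Condensed sketch of the appendEntries/integrate split this PR introduces.
package sketch

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type tileID struct{ Level, Index uint64 }

type hashTile struct{ Nodes [][]byte }

type store struct {
	// Hypothetical stand-ins for the backend-specific tile/bundle writers and
	// for storage.Integrate.
	setTile            func(ctx context.Context, id tileID, treeSize uint64, t *hashTile) error
	updateEntryBundles func(ctx context.Context, fromSeq uint64, leafHashes [][]byte) error
	merkleIntegrate    func(ctx context.Context, fromSeq uint64, leafHashes [][]byte) (newSize uint64, newRoot []byte, tiles map[tileID]*hashTile, err error)
}

// appendEntries is the outer, lifecycle-facing func: it grows the entry
// bundles and updates the merkle tree concurrently, then reports the new root.
func (s *store) appendEntries(ctx context.Context, fromSeq uint64, leafHashes [][]byte) ([]byte, error) {
	var newRoot []byte
	g := errgroup.Group{}
	g.Go(func() error {
		return s.updateEntryBundles(ctx, fromSeq, leafHashes)
	})
	g.Go(func() error {
		r, err := s.integrate(ctx, fromSeq, leafHashes)
		if err != nil {
			return fmt.Errorf("integrate: %w", err)
		}
		newRoot = r
		return nil
	})
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return newRoot, nil
}

// integrate is the tighter-scoped helper: it only touches merkle tree
// resources, writing the updated tiles back in parallel.
func (s *store) integrate(ctx context.Context, fromSeq uint64, leafHashes [][]byte) ([]byte, error) {
	newSize, newRoot, tiles, err := s.merkleIntegrate(ctx, fromSeq, leafHashes)
	if err != nil {
		return nil, fmt.Errorf("Integrate: %w", err)
	}
	g := errgroup.Group{}
	for id, t := range tiles {
		id, t := id, t // capture range variables for the goroutines below
		g.Go(func() error {
			return s.setTile(ctx, id, newSize, t)
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	fmt.Printf("New tree: %d, %x\n", newSize, newRoot)
	return newRoot, nil
}
```

The AWS and GCP implementations follow this shape directly; the MySQL version does the same work inside the sequencing transaction, with its `integrate` returning `(newSize, newRoot)` so that `appendEntries` can write the new tree state before the transaction commits.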