diff --git a/cmd/conformance/gcp/main.go b/cmd/conformance/gcp/main.go index 99e5015e..b170e6d8 100644 --- a/cmd/conformance/gcp/main.go +++ b/cmd/conformance/gcp/main.go @@ -24,7 +24,9 @@ import ( "net/http" "time" + "github.com/transparency-dev/merkle/rfc6962" tessera "github.com/transparency-dev/trillian-tessera" + "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/storage/gcp" "golang.org/x/mod/sumdb/note" "golang.org/x/net/http2" @@ -71,11 +73,17 @@ func main() { dedups = append(dedups, tessera.InMemoryDedupe(256)) // PersistentDedup is currently experimental, so there's no terraform or documentation yet! if *persistentDedup { - fn, err := gcp.NewDedupe(ctx, fmt.Sprintf("%s_dedup", *spanner)) + dd, err := gcp.NewDedup(ctx, fmt.Sprintf("%s_dedup", *spanner)) if err != nil { klog.Exitf("Failed to create new GCP dedupe: %v", err) } - dedups = append(dedups, fn) + dedups = append(dedups, dd.AppendDecorator()) + + go func() { + if err := tessera.Follow(ctx, driver, dd.Follower(BundleHasher)); err != nil { + klog.Exitf("Follow: %v", err) + } + }() } addFn, _, err := tessera.NewAppender(driver, dedups...) @@ -151,3 +159,18 @@ func signerFromFlags() (note.Signer, []note.Signer) { return s, a } + +// BundleHasher parses a C2SP tlog-tile bundle and returns the leaf hashes of each entry it contains. 
+// TODO: figure out where this should live/how it should work +func BundleHasher(bundle []byte) ([][]byte, error) { + eb := &api.EntryBundle{} + if err := eb.UnmarshalText(bundle); err != nil { + return nil, fmt.Errorf("unmarshal: %v", err) + } + r := make([][]byte, 0, len(eb.Entries)) + for _, e := range eb.Entries { + h := rfc6962.DefaultHasher.HashLeaf(e) + r = append(r, h[:]) + } + return r, nil +} diff --git a/cmd/experimental/migrate/gcp/main.go b/cmd/experimental/migrate/gcp/main.go index 04cc6fd3..fa6c3c29 100644 --- a/cmd/experimental/migrate/gcp/main.go +++ b/cmd/experimental/migrate/gcp/main.go @@ -24,6 +24,7 @@ import ( "strconv" "strings" + tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/client" "github.com/transparency-dev/trillian-tessera/cmd/experimental/migrate/internal" "github.com/transparency-dev/trillian-tessera/storage/gcp" @@ -66,28 +67,32 @@ func main() { } // Create our Tessera storage backend: - gcpCfg := storageConfigFromFlags() - driver, err := gcp.NewMigrationTarget(ctx, gcpCfg, internal.BundleHasher) - if err != nil { - klog.Exitf("Failed to create new GCP storage: %v", err) - } + st := storageFromFlags(ctx) - if err := internal.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, src.ReadEntryBundle, driver); err != nil { + if err := internal.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, src.ReadEntryBundle, st); err != nil { klog.Exitf("Migrate failed: %v", err) } } -// storageConfigFromFlags returns a gcp.Config struct populated with values -// provided via flags. -func storageConfigFromFlags() gcp.Config { +// storageFromFlags returns a MigrationTarget configured for GCP using the values provided via flags. 
+func storageFromFlags(ctx context.Context) tessera.MigrationTarget { if *bucket == "" { klog.Exit("--bucket must be set") } if *spanner == "" { klog.Exit("--spanner must be set") } - return gcp.Config{ + cfg := gcp.Config{ Bucket: *bucket, Spanner: *spanner, } + driver, err := gcp.NewMigrationTarget(ctx, cfg, internal.BundleHasher) + if err != nil { + klog.Exitf("Failed to create new GCP storage: %v", err) + } + st, err := tessera.NewMigrationTarget(driver) + if err != nil { + klog.Exitf("Failed to create new MigrationTarget lifecycle: %v", err) + } + return st } diff --git a/cmd/experimental/migrate/internal/migrate.go b/cmd/experimental/migrate/internal/migrate.go index 465bba12..141ccb67 100644 --- a/cmd/experimental/migrate/internal/migrate.go +++ b/cmd/experimental/migrate/internal/migrate.go @@ -23,6 +23,7 @@ import ( "github.com/avast/retry-go/v4" "github.com/transparency-dev/merkle/rfc6962" + tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" "github.com/transparency-dev/trillian-tessera/client" @@ -33,7 +34,7 @@ import ( // copier controls the migration work. type copier struct { // storage is the target we're migrating to. - storage MigrationStorage + storage tessera.MigrationTarget getEntries client.EntryBundleFetcherFunc // sourceSize is the size of the source log. @@ -54,27 +55,6 @@ type bundle struct { Partial uint8 } -// MigrationStorage describes the required functionality from the target storage driver. -// -// It's expected that the implementation of this interface will attempt to integrate the entry bundles -// being set as soon as is reasonably possible. It's up to the implementation whether that's done as a -// background task, or is in-line with the call to AwaitIntegration. -type MigrationStorage interface { - // SetEntryBundle is called to store the provided entry bundle bytes at the given coordinates. 
- // - // Implementations SHOULD treat calls to this function as being idempotent - i.e. attempts to set a previously - // set entry bundle should succeed if the bundle data are identical. - // - // This will be called as many times as necessary to set all entry bundles being migrated, quite likely in parallel. - SetEntryBundle(ctx context.Context, index uint64, partial uint8, bundle []byte) error - // AwaitIntegration should block until the storage driver has received and integrated all outstanding entry bundles implied by sourceSize, - // and return the locally calculated root hash. - // An error should be returned if there is a problem integrating. - AwaitIntegration(ctx context.Context, sourceSize uint64) ([]byte, error) - // State returns the current integrated size and root hash of the local tree. - State(ctx context.Context) (uint64, []byte, error) -} - // Migrate starts the work of copying sourceSize entries from the source to the target log. // // Only the entry bundles are copied as the target storage is expected to integrate them and recalculate the root. @@ -82,7 +62,7 @@ type MigrationStorage interface { // // A call to this function will block until either the copying is done, or an error has occurred. // It is an error if the resource copying completes ok but the resulting root hash does not match the provided sourceRoot. 
-func Migrate(ctx context.Context, numWorkers int, sourceSize uint64, sourceRoot []byte, getEntries client.EntryBundleFetcherFunc, storage MigrationStorage) error { +func Migrate(ctx context.Context, numWorkers int, sourceSize uint64, sourceRoot []byte, getEntries client.EntryBundleFetcherFunc, storage tessera.MigrationTarget) error { klog.Infof("Starting migration; source size %d root %x", sourceSize, sourceRoot) m := &copier{ @@ -133,13 +113,11 @@ func Migrate(ctx context.Context, numWorkers int, sourceSize uint64, sourceRoot return m.migrateWorker(ctx) }) } - var root []byte eg.Go(func() error { - r, err := m.storage.AwaitIntegration(ctx, sourceSize) + err := m.storage.AwaitIntegration(ctx, sourceSize) if err != nil { - return fmt.Errorf("migration failed: %v", err) + return fmt.Errorf("await integration: %v", err) } - root = r return nil }) @@ -147,6 +125,11 @@ func Migrate(ctx context.Context, numWorkers int, sourceSize uint64, sourceRoot return fmt.Errorf("migration failed: %v", err) } + _, root, err := m.storage.State(ctx) + if err != nil { + return fmt.Errorf("failed to get local tree state: %v", err) + } + if !bytes.Equal(root, sourceRoot) { return fmt.Errorf("migration completed, but local root hash %x != source root hash %x", targetRoot, sourceRoot) } diff --git a/cmd/experimental/migrate/posix/main.go b/cmd/experimental/migrate/posix/main.go index a414cb49..ef39fbc8 100644 --- a/cmd/experimental/migrate/posix/main.go +++ b/cmd/experimental/migrate/posix/main.go @@ -24,6 +24,7 @@ import ( "strconv" "strings" + tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/client" "github.com/transparency-dev/trillian-tessera/cmd/experimental/migrate/internal" "github.com/transparency-dev/trillian-tessera/storage/posix" @@ -64,13 +65,21 @@ func main() { klog.Exitf("invalid checkpoint roothash %q: %v", bits[2], err) } + if err := internal.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, 
src.ReadEntryBundle, storageFromFlags(ctx)); err != nil { + klog.Exitf("Migrate failed: %v", err) + } +} + +func storageFromFlags(ctx context.Context) tessera.MigrationTarget { // Create our Tessera storage backend: - st, err := posix.NewMigrationTarget(ctx, *storageDir, *initialise, internal.BundleHasher) + driver, err := posix.NewMigrationTarget(ctx, *storageDir, *initialise, internal.BundleHasher) if err != nil { klog.Exitf("Failed to create new POSIX storage: %v", err) } - if err := internal.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, src.ReadEntryBundle, st); err != nil { - klog.Exitf("Migrate failed: %v", err) + st, err := tessera.NewMigrationTarget(driver) + if err != nil { + klog.Exitf("Failed to create new MigrationTarget lifecycle: %v", err) } + return st } diff --git a/lifecycle.go b/lifecycle.go index 8a0a403d..85d2126f 100644 --- a/lifecycle.go +++ b/lifecycle.go @@ -44,6 +44,17 @@ type LogReader interface { ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) } +// LogStateReader is a LogReader that also provides access to the current integrated state, regardless +// of whether it's been published. +type LogStateReader interface { + LogReader + + // State returns the current size and root hash of the integrated tree. + // Note that this may not necessarily represent the same tree state as the checkpoint returned by + // LogReader.ReadCheckpoint since the current tree state may not have been published yet. + State(ctx context.Context) (uint64, []byte, error) +} + // NewAppender returns an Appender, which allows a personality to incrementally append new // leaves to the log and to read from it. // @@ -68,3 +79,59 @@ func NewAppender(d Driver, decorators ...func(AddFn) AddFn) (AddFn, LogReader, e } return add, reader, nil } + +// MigrationTarget describes the required functionality from the target storage driver to support the +// migration lifecycle mode. 
+// +// It's expected that the implementation of this interface will attempt to integrate the entry bundles +// being set as soon as is reasonably possible. It's up to the implementation whether that's done as a +// background task, or is in-line with the call to AwaitIntegration. +type MigrationTarget interface { + // SetEntryBundle is called to store the provided entry bundle bytes at the given coordinates. + // + // Implementations SHOULD treat calls to this function as being idempotent - i.e. attempts to set a previously + // set entry bundle should succeed if the bundle data are identical. + // + // This will be called as many times as necessary to set all entry bundles being migrated, quite likely in parallel. + SetEntryBundle(ctx context.Context, idx uint64, partial uint8, bundle []byte) error + // AwaitIntegration will block until SetEntryBundle has been called at least once for every + // entry bundle address implied by a tree of the provided size, and the storage implementation + // has successfully integrated all of the entries in those bundles into the local tree. + AwaitIntegration(ctx context.Context, size uint64) error + // State returns the current size and root hash of the target tree. + State(ctx context.Context) (uint64, []byte, error) +} + +// NewMigrationTarget returns a MigrationTarget for the provided driver, which applications can use +// to directly set entry bundles in the storage instance managed by the driver. +// +// This is intended to be used to migrate C2SP tlog-tiles compliant logs into/between Tessera storage +// implementations. +// +// Zero or more bundleProcessors can be provided to wrap the underlying functionality provided by +// the driver. 
+func NewMigrationTarget(d Driver, bundleProcessors ...func(MigrationTarget) MigrationTarget) (MigrationTarget, error) { + t, ok := d.(MigrationTarget) + if !ok { + return nil, fmt.Errorf("driver %T does not implement MigrationTarget", d) + } + for i := len(bundleProcessors) - 1; i >= 0; i-- { + t = bundleProcessors[i](t) + } + return t, nil +} + +// Follower is the signature of a function accepted by the Follow function below. +type Follower func(ctx context.Context, lsr LogStateReader) error + +// Follow registers a func which will be called and provided with read-only access to the current state of the log. +// The provided func should only return an error under fatal conditions. +// +// This is intended for use by applications which want to perform some sort of processing based on the contents of the integrated entries. +func Follow(ctx context.Context, d Driver, fn Follower) error { + lsr, ok := d.(LogStateReader) + if !ok { + return fmt.Errorf("driver %T does not implement LogStateReader", d) + } + return fn(ctx, lsr) +} diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index e73cd209..243ba7e9 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -47,7 +47,6 @@ import ( "cloud.google.com/go/spanner/apiv1/spannerpb" gcs "cloud.google.com/go/storage" - "github.com/globocom/go-buffer" "github.com/google/go-cmp/cmp" "github.com/transparency-dev/merkle/rfc6962" tessera "github.com/transparency-dev/trillian-tessera" @@ -234,6 +233,11 @@ func (s *Storage) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byt return s.get(ctx, s.entriesPath(i, p)) } +// For LogStateReader +func (s *Storage) State(ctx context.Context) (uint64, []byte, error) { + return s.sequencer.currentTree(ctx) +} + // get returns the requested object. // // This is indended to be used to proxy read requests through the personality for debug/testing purposes. 
@@ -852,7 +856,7 @@ func (s *gcsStorage) lastModified(ctx context.Context, obj string) (time.Time, e return r.Attrs.LastModified, r.Close() } -// NewDedupe returns wrapped Add func which will use Spanner to maintain a mapping of +// NewDedup returns a Dedup which will use Spanner to maintain a mapping of // previously seen entries and their assigned indices. Future calls with the same entry // will return the previously assigned index, as yet unseen entries will be passed to the provided // delegate function to have an index assigned. @@ -865,68 +869,112 @@ func (s *gcsStorage) lastModified(ctx context.Context, obj string) (time.Time, e // maintaining the Merkle tree. // // This functionality is experimental! -func NewDedupe(ctx context.Context, spannerDB string) (func(tessera.AddFn) tessera.AddFn, error) { +func NewDedup(ctx context.Context, spannerDB string) (*Dedup, error) { /* - Schema for reference: - - CREATE TABLE IDSeq ( - id INT64 NOT NULL, - h BYTES(MAX) NOT NULL, - idx INT64 NOT NULL, - ) PRIMARY KEY (id, h); + Schema for reference: + CREATE TABLE FollowCoord ( + id INT64 NOT NULL, + nextIdx INT64 NOT NULL, + ) PRIMARY KEY (id); + + CREATE TABLE IDSeq ( + h BYTES(64) NOT NULL, + idx INT64 NOT NULL, + ) PRIMARY KEY (h); */ dedupDB, err := spanner.NewClient(ctx, spannerDB) if err != nil { return nil, fmt.Errorf("failed to connect to Spanner: %v", err) } - r := &dedupStorage{ + if _, err := dedupDB.Apply(ctx, []*spanner.Mutation{spanner.Insert("FollowCoord", []string{"id", "nextIdx"}, []interface{}{0, 0})}); err != nil && spanner.ErrCode(err) != codes.AlreadyExists { + return nil, fmt.Errorf("failed to initialise dedupDB: %v", err) + } + + r := &Dedup{ ctx: ctx, dbPool: dedupDB, } - // TODO(al): Make these configurable - r.buf = buffer.New( - buffer.WithSize(64), - buffer.WithFlushInterval(200*time.Millisecond), - buffer.WithFlusher(buffer.FlusherFunc(r.flush)), - buffer.WithPushTimeout(15*time.Second), - ) - go func(ctx 
context.Context) { + go func() { t := time.NewTicker(time.Second) for { select { case <-ctx.Done(): return case <-t.C: - klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites.Load(), r.numLookups.Load(), r.numDBDedups.Load(), r.numPushErrs.Load()) } + hits := r.numDBDedups.Load() + lookups := r.numLookups.Load() + writes := r.numWrites.Load() + klog.Infof("DEDUP: hits %0.1f%% lookups: %d, populationWrites: %d", float64(hits)*100.0/float64(lookups), lookups, writes) } - }(ctx) - return func(af tessera.AddFn) tessera.AddFn { - r.delegate = af - return r.add - }, nil + }() + + return r, nil } -type dedupStorage struct { - ctx context.Context - dbPool *spanner.Client - delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture +func (d *Dedup) AppendDecorator() func(d tessera.AddFn) tessera.AddFn { + return func(delegate tessera.AddFn) tessera.AddFn { + return func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { + // Push back if anti-spam is trailing too far behind the current log. 
+ if d.underwater.Load() { + return func() (uint64, error) { return 0, tessera.ErrPushback } + } + + idx, err := d.index(ctx, e.Identity()) + if err != nil { + return func() (uint64, error) { return 0, err } + } + if idx != nil { + return func() (uint64, error) { return *idx, nil } + } + + return delegate(ctx, e) + } + } +} + +type Dedup struct { + ctx context.Context + dbPool *spanner.Client + + underwater atomic.Bool numLookups atomic.Uint64 numWrites atomic.Uint64 numDBDedups atomic.Uint64 numPushErrs atomic.Uint64 +} - buf *buffer.Buffer +func (d *Dedup) Follower(bh BundleHasherFunc) tessera.Follower { + return func(ctx context.Context, lsr tessera.LogStateReader) error { + klog.Infof("Dedup: following") + t := time.NewTicker(500 * time.Millisecond) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + } + var err error + klog.V(2).Infof("Dedup: follower looking for work") + for more := true; more; { + more, err = d.populate(ctx, bh, lsr) + if err != nil { + klog.Errorf("Dedup failed to populate: %v", err) + } + } + } + } } // index returns the index (if any) previously associated with the provided hash -func (d *dedupStorage) index(ctx context.Context, h []byte) (*uint64, error) { +func (d *Dedup) index(ctx context.Context, h []byte) (*uint64, error) { d.numLookups.Add(1) var idx int64 - if row, err := d.dbPool.Single().ReadRow(ctx, "IDSeq", spanner.Key{0, h}, []string{"idx"}); err != nil { + // TODO(al): timestamp bound - good? 
+ if row, err := d.dbPool.Single().WithTimestampBound(spanner.MaxStaleness(10*time.Second)).ReadRow(ctx, "IDSeq", spanner.Key{h}, []string{"idx"}); err != nil { if c := spanner.ErrCode(err); c == codes.NotFound { return nil, nil } @@ -941,15 +989,95 @@ func (d *dedupStorage) index(ctx context.Context, h []byte) (*uint64, error) { } } +func (d *Dedup) populate(ctx context.Context, bh BundleHasherFunc, lsr tessera.LogStateReader) (bool, error) { + workToDo := false + toSize, _, err := lsr.State(ctx) + if err != nil { + return false, fmt.Errorf("failed to read log state: %v", err) + } + _, err = d.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { + Tstart := time.Now() + // Grab the highest index we've populated so far. + row, err := txn.ReadRowWithOptions(ctx, "FollowCoord", spanner.Key{0}, []string{"nextIdx"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE}) + if err != nil { + return fmt.Errorf("read followcoord: %v", err) + } + var f int64 // Spanner doesn't support uint64 + if err := row.Columns(&f); err != nil { + return fmt.Errorf("failed to read dedup coordination info: %v", err) + } + fromIdx := uint64(f) + klog.V(1).Infof("Dedup: Populating from %d", fromIdx) + if fromIdx >= toSize { + klog.V(1).Infof("Dedup: nothing new to add") + return nil + } + + // TODO(al): make this configurable + const maxBehind = 256 * 20 + behind := toSize - fromIdx + if behind > maxBehind { + klog.Infof("Dedup pushing back (%d > %d)", behind, maxBehind) + d.underwater.Store(true) + } else if d.underwater.Load() { + klog.Infof("Dedup caught up, stopping pushback") + d.underwater.Store(false) + } + + workToDo = true + + // TODO(al): make this configurable. 
+ const maxBundles = 10 + + TfetchLeaves := time.Now() + lh, err := fetchLeafHashes(ctx, fromIdx, toSize, toSize, maxBundles, lsr.ReadEntryBundle, bh) + if err != nil { + return fmt.Errorf("fetchLeafHashes(%d, %d, %d): %v", fromIdx, toSize, toSize, err) + } + + m := make([]dedupeMapping, 0, len(lh)) + for i, h := range lh { + m = append(m, dedupeMapping{ + ID: h, + Idx: fromIdx + uint64(i), + }) + } + + TstoreMappings := time.Now() + if err := d.storeMappings(ctx, m); err != nil { + return fmt.Errorf("storeMappings(idx: %d, len: %d): %v", fromIdx, len(m), err) + } + nextIdx := fromIdx + uint64(len(m)) + + TupdateCoord := time.Now() + // Update our coordination row. + if err := txn.BufferWrite([]*spanner.Mutation{spanner.Update("FollowCoord", []string{"id", "nextIdx"}, []interface{}{0, int64(nextIdx)})}); err != nil { + return fmt.Errorf("update followcoord: %v", err) + } + + d.numWrites.Add(uint64(len(m))) + + klog.Infof("DEDUP: total %v (rC: %v, fetch: %v, storeM: %v, uC: %v)", time.Since(Tstart), TfetchLeaves.Sub(Tstart), TstoreMappings.Sub(TfetchLeaves), TupdateCoord.Sub(TstoreMappings), time.Since(TupdateCoord)) + + return nil + }) + if err != nil { + return false, err + } + + return workToDo, nil + +} + // storeMappings stores the associations between the keys and IDs in a non-atomic fashion // (i.e. it does not store all or none in a transactional sense). // // Returns an error if one or more mappings cannot be stored. 
-func (d *dedupStorage) storeMappings(ctx context.Context, entries []dedupeMapping) error { +func (d *Dedup) storeMappings(ctx context.Context, entries []dedupeMapping) error { m := make([]*spanner.MutationGroup, 0, len(entries)) for _, e := range entries { m = append(m, &spanner.MutationGroup{ - Mutations: []*spanner.Mutation{spanner.Insert("IDSeq", []string{"id", "h", "idx"}, []interface{}{0, e.ID, int64(e.Idx)})}, + Mutations: []*spanner.Mutation{spanner.Insert("IDSeq", []string{"h", "idx"}, []interface{}{e.ID, int64(e.Idx)})}, }) } @@ -969,59 +1097,7 @@ type dedupeMapping struct { Idx uint64 } -// add adds the entry to the underlying delegate only if e isn't already known. In either case, -// an IndexFuture will be returned that the client can use to get the sequence number of this entry. -func (d *dedupStorage) add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { - idx, err := d.index(ctx, e.Identity()) - if err != nil { - return func() (uint64, error) { return 0, err } - } - if idx != nil { - return func() (uint64, error) { return *idx, nil } - } - - i, err := d.delegate(ctx, e)() - if err != nil { - return func() (uint64, error) { return 0, err } - } - - err = d.enqueueMapping(ctx, e.Identity(), i) - return func() (uint64, error) { - return i, err - } -} - -// enqueueMapping buffers the provided ID -> index mapping ready to be flushed to storage. -func (d *dedupStorage) enqueueMapping(_ context.Context, h []byte, idx uint64) error { - err := d.buf.Push(dedupeMapping{ID: h, Idx: idx}) - if err != nil { - d.numPushErrs.Add(1) - // This means there's pressure flushing dedup writes out, so discard this write. - if err != buffer.ErrTimeout { - return err - } - } - return nil -} - -// flush writes enqueued mappings to storage. 
-func (d *dedupStorage) flush(items []interface{}) { - entries := make([]dedupeMapping, len(items)) - for i := range items { - entries[i] = items[i].(dedupeMapping) - } - - ctx, c := context.WithTimeout(d.ctx, 15*time.Second) - defer c() - - if err := d.storeMappings(ctx, entries); err != nil { - klog.Infof("Failed to flush dedup entries: %v", err) - return - } - d.numWrites.Add(uint64(len(entries))) -} - -// BundleHasherFunc is the signature of a function which knows how to parse an entry bundle and calculate leaf hashes for its entries. +// BundleHasherFunc is the signature of a function which knows how to parse an entry bundle and calculate leaf hashes for its entries. type BundleHasherFunc func(entryBundle []byte) (LeafHashes [][]byte, err error) // NewMigrationTarget creates a new GCP storage for the MigrationTarget lifecycle mode. @@ -1104,43 +1180,11 @@ func (m *MigrationStorage) fetchLeafHashes(ctx context.Context, from, to, source // TODO(al): Make this configurable. const maxBundles = 300 - toBeAdded := sync.Map{} - eg := errgroup.Group{} - n := 0 - for ri := range layout.Range(from, to, sourceSize) { - eg.Go(func() error { - b, err := m.s.ReadEntryBundle(ctx, ri.Index, ri.Partial) - if err != nil { - return fmt.Errorf("ReadEntryBundle(%d.%d): %v", ri.Index, ri.Partial, err) - } - - bh, err := m.bundleHasher(b) - if err != nil { - return fmt.Errorf("bundleHasherFunc for bundle index %d: %v", ri.Index, err) - } - toBeAdded.Store(ri.Index, bh[ri.First:ri.First+ri.N]) - return nil - }) - n++ - if n >= maxBundles { - break - } - } - if err := eg.Wait(); err != nil { - return nil, err - } - - lh := make([][]byte, 0, maxBundles) - for i := from / layout.EntryBundleWidth; ; i++ { - v, ok := toBeAdded.LoadAndDelete(i) - if !ok { - break - } - bh := v.([][]byte) - lh = append(lh, bh...) 
+ h, err := fetchLeafHashes(ctx, from, to, to, maxBundles, m.s.ReadEntryBundle, m.bundleHasher) + if err != nil { + return nil, fmt.Errorf("fetchLeafHashes(%d, %d): %v", from, to, err) } - - return lh, nil + return h, nil } func (m *MigrationStorage) buildTree(ctx context.Context, sourceSize uint64) (uint64, []byte, error) { @@ -1192,3 +1236,51 @@ func (m *MigrationStorage) buildTree(ctx context.Context, sourceSize uint64) (ui } return newSize, newRoot, nil } + +// readEntryBundleFunc is the signature of a function which knows how to fetch the specified entry bundle resource. +type readEntryBundleFunc func(ctx context.Context, idx uint64, p uint8) ([]byte, error) + +// fetchLeafHashes fetches the necessary entry bundles to cover the specified [from, to) range of entries, +// and uses the provided function to hash the entries they contain. +// If the required number of bundles is greater than maxBundles, the entry range fetched is truncated. +func fetchLeafHashes(ctx context.Context, from, to, sourceSize uint64, maxBundles int, readBundle readEntryBundleFunc, bundleHasher BundleHasherFunc) ([][]byte, error) { + toBeAdded := sync.Map{} + eg := errgroup.Group{} + n := 0 + + for ri := range layout.Range(from, to, sourceSize) { + // We'll fetch each bundle in parallel because each individual request is surprisingly slow. 
+ eg.Go(func() error { + b, err := readBundle(ctx, ri.Index, ri.Partial) + if err != nil { + return fmt.Errorf("ReadEntryBundle(%d.%d): %v", ri.Index, ri.Partial, err) + } + + bh, err := bundleHasher(b) + if err != nil { + return fmt.Errorf("bundleHasherFunc for bundle index %d: %v", ri.Index, err) + } + toBeAdded.Store(ri.Index, bh[ri.First:ri.First+ri.N]) + return nil + }) + n++ + if n >= maxBundles { + break + } + } + if err := eg.Wait(); err != nil { + return nil, err + } + + lh := make([][]byte, 0, maxBundles) + for i := from / layout.EntryBundleWidth; ; i++ { + v, ok := toBeAdded.LoadAndDelete(i) + if !ok { + break + } + bh := v.([][]byte) + lh = append(lh, bh...) + } + + return lh, nil +}