Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XXX [GCP] Dedup support for migrate #463

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions cmd/conformance/gcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"net/http"
"time"

"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/storage/gcp"
"golang.org/x/mod/sumdb/note"
"golang.org/x/net/http2"
Expand Down Expand Up @@ -71,11 +73,17 @@ func main() {
dedups = append(dedups, tessera.InMemoryDedupe(256))
// PersistentDedup is currently experimental, so there's no terraform or documentation yet!
if *persistentDedup {
fn, err := gcp.NewDedupe(ctx, fmt.Sprintf("%s_dedup", *spanner))
dd, err := gcp.NewDedup(ctx, fmt.Sprintf("%s_dedup", *spanner))
if err != nil {
klog.Exitf("Failed to create new GCP dedupe: %v", err)
}
dedups = append(dedups, fn)
dedups = append(dedups, dd.AppendDecorator())

go func() {
if err := tessera.Follow(ctx, driver, dd.Follower(BundleHasher)); err != nil {
klog.Exitf("Follow: %v", err)
}
}()
}

addFn, _, err := tessera.NewAppender(driver, dedups...)
Expand Down Expand Up @@ -151,3 +159,18 @@ func signerFromFlags() (note.Signer, []note.Signer) {

return s, a
}

// BundleHasher parses a C2SP tlog-tile bundle and returns the leaf hashes of each entry it contains.
// TODO: figure out where this should live/how it should work
func BundleHasher(bundle []byte) ([][]byte, error) {
eb := &api.EntryBundle{}
if err := eb.UnmarshalText(bundle); err != nil {
return nil, fmt.Errorf("unmarshal: %v", err)
}
r := make([][]byte, 0, len(eb.Entries))
for _, e := range eb.Entries {
h := rfc6962.DefaultHasher.HashLeaf(e)
r = append(r, h[:])
}
return r, nil
}
25 changes: 15 additions & 10 deletions cmd/experimental/migrate/gcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strconv"
"strings"

tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/client"
"github.com/transparency-dev/trillian-tessera/cmd/experimental/migrate/internal"
"github.com/transparency-dev/trillian-tessera/storage/gcp"
Expand Down Expand Up @@ -66,28 +67,32 @@ func main() {
}

// Create our Tessera storage backend:
gcpCfg := storageConfigFromFlags()
driver, err := gcp.NewMigrationTarget(ctx, gcpCfg, internal.BundleHasher)
if err != nil {
klog.Exitf("Failed to create new GCP storage: %v", err)
}
st := storageFromFlags(ctx)

if err := internal.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, src.ReadEntryBundle, driver); err != nil {
if err := internal.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, src.ReadEntryBundle, st); err != nil {
klog.Exitf("Migrate failed: %v", err)
}
}

// storageConfigFromFlags returns a gcp.Config struct populated with values
// provided via flags.
func storageConfigFromFlags() gcp.Config {
// storageFromFlags returns a MigrationTarget copnfigured for GCP using the values provided via flags.
func storageFromFlags(ctx context.Context) tessera.MigrationTarget {
if *bucket == "" {
klog.Exit("--bucket must be set")
}
if *spanner == "" {
klog.Exit("--spanner must be set")
}
return gcp.Config{
cfg := gcp.Config{
Bucket: *bucket,
Spanner: *spanner,
}
driver, err := gcp.NewMigrationTarget(ctx, cfg, internal.BundleHasher)
if err != nil {
klog.Exitf("Failed to create new GCP storage: %v", err)
}
st, err := tessera.NewMigrationTarget(driver)
if err != nil {
klog.Exitf("Failed to create new MigrationTarget lifecycle: %v", err)
}
return st
}
37 changes: 10 additions & 27 deletions cmd/experimental/migrate/internal/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/avast/retry-go/v4"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
"github.com/transparency-dev/trillian-tessera/client"
Expand All @@ -33,7 +34,7 @@ import (
// copier controls the migration work.
type copier struct {
// storage is the target we're migrating to.
storage MigrationStorage
storage tessera.MigrationTarget
getEntries client.EntryBundleFetcherFunc

// sourceSize is the size of the source log.
Expand All @@ -54,35 +55,14 @@ type bundle struct {
Partial uint8
}

// MigrationStorage describes the required functionality from the target storage driver.
//
// It's expected that the implementation of this interface will attempt to integrate the entry bundles
// being set as soon as is reasonably possible. It's up to the implementation whether that's done as a
// background task, or is in-line with the call to AwaitIntegration.
type MigrationStorage interface {
// SetEntryBundle is called to store the provided entry bundle bytes at the given coordinates.
//
// Implementations SHOULD treat calls to this function as being idempotent - i.e. attempts to set a previously
// set entry bundle should succeed if the bundle data are identical.
//
// This will be called as many times as necessary to set all entry bundles being migrated, quite likely in parallel.
SetEntryBundle(ctx context.Context, index uint64, partial uint8, bundle []byte) error
// AwaitIntegration should block until the storage driver has received and integrated all outstanding entry bundles implied by sourceSize,
// and return the locally calculated root hash.
// An error should be returned if there is a problem integrating.
AwaitIntegration(ctx context.Context, sourceSize uint64) ([]byte, error)
// State returns the current integrated size and root hash of the local tree.
State(ctx context.Context) (uint64, []byte, error)
}

// Migrate starts the work of copying sourceSize entries from the source to the target log.
//
// Only the entry bundles are copied as the target storage is expected to integrate them and recalculate the root.
// This is done to ensure the correctness of both the source log as well as the copy process itself.
//
// A call to this function will block until either the copying is done, or an error has occurred.
// It is an error if the resource copying completes ok but the resulting root hash does not match the provided sourceRoot.
func Migrate(ctx context.Context, numWorkers int, sourceSize uint64, sourceRoot []byte, getEntries client.EntryBundleFetcherFunc, storage MigrationStorage) error {
func Migrate(ctx context.Context, numWorkers int, sourceSize uint64, sourceRoot []byte, getEntries client.EntryBundleFetcherFunc, storage tessera.MigrationTarget) error {
klog.Infof("Starting migration; source size %d root %x", sourceSize, sourceRoot)

m := &copier{
Expand Down Expand Up @@ -133,20 +113,23 @@ func Migrate(ctx context.Context, numWorkers int, sourceSize uint64, sourceRoot
return m.migrateWorker(ctx)
})
}
var root []byte
eg.Go(func() error {
r, err := m.storage.AwaitIntegration(ctx, sourceSize)
err := m.storage.AwaitIntegration(ctx, sourceSize)
if err != nil {
return fmt.Errorf("migration failed: %v", err)
return fmt.Errorf("await integration: %v", err)
}
root = r
return nil
})

if err := eg.Wait(); err != nil {
return fmt.Errorf("migration failed: %v", err)
}

_, root, err := m.storage.State(ctx)
if err != nil {
return fmt.Errorf("failed to get local tree state: %v", err)
}

if !bytes.Equal(root, sourceRoot) {
return fmt.Errorf("migration completed, but local root hash %x != source root hash %x", targetRoot, sourceRoot)
}
Expand Down
15 changes: 12 additions & 3 deletions cmd/experimental/migrate/posix/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strconv"
"strings"

tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/client"
"github.com/transparency-dev/trillian-tessera/cmd/experimental/migrate/internal"
"github.com/transparency-dev/trillian-tessera/storage/posix"
Expand Down Expand Up @@ -64,13 +65,21 @@ func main() {
klog.Exitf("invalid checkpoint roothash %q: %v", bits[2], err)
}

if err := internal.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, src.ReadEntryBundle, storageFromFlags(ctx)); err != nil {
klog.Exitf("Migrate failed: %v", err)
}
}

func storageFromFlags(ctx context.Context) tessera.MigrationTarget {
// Create our Tessera storage backend:
st, err := posix.NewMigrationTarget(ctx, *storageDir, *initialise, internal.BundleHasher)
driver, err := posix.NewMigrationTarget(ctx, *storageDir, *initialise, internal.BundleHasher)
if err != nil {
klog.Exitf("Failed to create new POSIX storage: %v", err)
}

if err := internal.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, src.ReadEntryBundle, st); err != nil {
klog.Exitf("Migrate failed: %v", err)
st, err := tessera.NewMigrationTarget(driver)
if err != nil {
klog.Exitf("Failed to create new MigrationTarget lifecycle: %v", err)
}
return st
}
67 changes: 67 additions & 0 deletions lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ type LogReader interface {
ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error)
}

// LogStateReader is a LogReader that also provides access to the current integrated state, regardless
// of whether it's been published.
type LogStateReader interface {
LogReader

// State returns the current size and root hash of the integrated tree.
// Note that this may not necessarily represent the same tree state as the checkpoint returned by
// LogReader.ReadCheckpoint since the current tree state may not have been published yet.
State(ctx context.Context) (uint64, []byte, error)
}

// NewAppender returns an Appender, which allows a personality to incrementally append new
// leaves to the log and to read from it.
//
Expand All @@ -68,3 +79,59 @@ func NewAppender(d Driver, decorators ...func(AddFn) AddFn) (AddFn, LogReader, e
}
return add, reader, nil
}

// MigrationStorage describes the required functionality from the target storage driver to support the
// migration lifecycle mode.
//
// It's expected that the implementation of this interface will attempt to integrate the entry bundles
// being set as soon as is reasonably possible. It's up to the implementation whether that's done as a
// background task, or is in-line with the call to AwaitIntegration.
type MigrationTarget interface {
// SetEntryBundle is called to store the provided entry bundle bytes at the given coordinates.
//
// Implementations SHOULD treat calls to this function as being idempotent - i.e. attempts to set a previously
// set entry bundle should succeed if the bundle data are identical.
//
// This will be called as many times as necessary to set all entry bundles being migrated, quite likely in parallel.
SetEntryBundle(ctx context.Context, idx uint64, partial uint8, bundle []byte) error
// AwaitIntegration will block until SetEntryBundle has been called at least once for every
// entry bundle address implied by a tree of the provided size, and the storage implementation
// has successfully integrated all of the entries in those bundles into the local tree.
AwaitIntegration(ctx context.Context, size uint64) error
// State returns the current size and root hash of the target tree.
State(ctx context.Context) (uint64, []byte, error)
}

// NewMigrationTarget returns a MigrationTarget for the provided driver, which applications can use
// to directly set entry bundles in the storage instance managed by the driver.
//
// This is intended to be used to migrate C2SP tlog-tiles compliant logs into/between Tessera storage
// implementations.
//
// Zero or more bundleProcessors can be provided to wrap the underlying functionality provided by
// the driver.
func NewMigrationTarget(d Driver, bundleProcessors ...func(MigrationTarget) MigrationTarget) (MigrationTarget, error) {
t, ok := d.(MigrationTarget)
if !ok {
return nil, fmt.Errorf("driver %T does not implement MigrationTarget", d)
}
for i := len(bundleProcessors) - 1; i > 0; i++ {
t = bundleProcessors[i](t)
}
return t, nil
}

// Follower is the signature of a function accepted by the Follow function below.
type Follower func(ctx context.Context, lsr LogStateReader) error

// Follow registers a func which will be called and provided with read-only access to the current state of the log.
// The provided func should only return an error under fatal conditions.
//
// This is intended for use by applications which want to perform some sort of processing based on the contents of the integrated entries.
func Follow(ctx context.Context, d Driver, fn Follower) error {
lsr, ok := d.(LogStateReader)
if !ok {
return fmt.Errorf("driver %T does not implement LogStateReader", d)
}
return fn(ctx, lsr)
}
Loading
Loading