From b8149af9a30dda53e5a81bf1143f8ec29d4230a2 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 11 Dec 2024 11:50:42 +0000 Subject: [PATCH 1/6] [POSIX] Add MigrationTarget support --- storage/posix/files.go | 84 +++++++++++++++++++++++++++++++++++------- 1 file changed, 71 insertions(+), 13 deletions(-) diff --git a/storage/posix/files.go b/storage/posix/files.go index c744b7f3..21afc969 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -207,16 +207,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e } } writeBundle := func(bundleIndex uint64, partialSize uint8) error { - bf := filepath.Join(s.path, s.entriesPath(bundleIndex, partialSize)) - if err := os.MkdirAll(filepath.Dir(bf), dirPerm); err != nil { - return fmt.Errorf("failed to make entries directory structure: %w", err) - } - if err := createExclusive(bf, currTile.Bytes()); err != nil { - if !errors.Is(err, os.ErrExist) { - return err - } - } - return nil + return s.writeBundle(ctx, bundleIndex, partialSize, currTile.Bytes()) } seqEntries := make([]storage.SequencedEntry, 0, len(entries)) @@ -330,7 +321,7 @@ func (s *Storage) readTile(ctx context.Context, level, index uint64, p uint8) (* // Fully populated tiles are stored at the path corresponding to the level & // index parameters, partially populated (i.e. right-hand edge) tiles are // stored with a .xx suffix where xx is the number of "tile leaves" in hex. 
-func (s *Storage) storeTile(_ context.Context, level, index, logSize uint64, tile *api.HashTile) error { +func (s *Storage) storeTile(ctx context.Context, level, index, logSize uint64, tile *api.HashTile) error { tileSize := uint64(len(tile.Nodes)) klog.V(2).Infof("StoreTile: level %d index %x ts: %x", level, index, tileSize) if tileSize == 0 || tileSize > layout.TileWidth { @@ -341,7 +332,11 @@ func (s *Storage) storeTile(_ context.Context, level, index, logSize uint64, til return fmt.Errorf("failed to marshal tile: %w", err) } - tPath := filepath.Join(s.path, layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize))) + return s.writeTile(ctx, level, index, layout.PartialTileSize(level, index, logSize), t) +} + +func (s *Storage) writeTile(_ context.Context, level, index uint64, partial uint8, t []byte) error { + tPath := filepath.Join(s.path, layout.TilePath(level, index, partial)) tDir := filepath.Dir(tPath) if err := os.MkdirAll(tDir, dirPerm); err != nil { return fmt.Errorf("failed to create directory %q: %w", tDir, err) @@ -351,7 +346,7 @@ func (s *Storage) storeTile(_ context.Context, level, index, logSize uint64, til return err } - if tileSize == layout.TileWidth { + if partial == 0 { partials, err := filepath.Glob(fmt.Sprintf("%s.p/*", tPath)) if err != nil { return fmt.Errorf("failed to list partial tiles for clean up; %w", err) @@ -376,6 +371,20 @@ func (s *Storage) storeTile(_ context.Context, level, index, logSize uint64, til return nil } +// writeBundle takes care of writing out the serialised entry bundle file. 
+func (s *Storage) writeBundle(_ context.Context, index uint64, partial uint8, bundle []byte) error { + bf := filepath.Join(s.path, s.entriesPath(index, partial)) + if err := os.MkdirAll(filepath.Dir(bf), dirPerm); err != nil { + return fmt.Errorf("failed to make entries directory structure: %w", err) + } + if err := createExclusive(bf, bundle); err != nil { + if !errors.Is(err, os.ErrExist) { + return err + } + } + return nil +} + // initialise ensures that the storage location is valid by loading the checkpoint from this location. // If `create` is set to true, then this will first ensure that the directory path is created, and // an empty checkpoint is created in this directory. @@ -497,3 +506,52 @@ func createExclusive(f string, d []byte) error { } return nil } + +// NewMigrationTarget creates a new POSIX storage for the MigrationTarget lifecycle mode. +// - path is a directory in which the log should be stored +// - opts are optional storage options; the state directory under path is created if it does not already exist +func NewMigrationTarget(ctx context.Context, path string, opts ...func(*options.StorageOptions)) (*MigrationStorage, error) { + opt := storage.ResolveStorageOptions(opts...) 
+ + r := &MigrationStorage{ + s: &Storage{ + path: path, + entriesPath: opt.EntriesPath, + }, + } + klog.Infof("Initializing directory for POSIX log at %q", r.s.path) + if err := os.MkdirAll(filepath.Join(r.s.path, stateDir), dirPerm); err != nil { + return nil, fmt.Errorf("failed to create log directory: %w", err) + } + + return r, nil +} + +type MigrationStorage struct { + s *Storage +} + +func (m *MigrationStorage) SetTile(ctx context.Context, level, index uint64, partial uint8, tile []byte) error { + return m.s.writeTile(ctx, level, index, partial, tile) +} +func (m *MigrationStorage) SetEntryBundle(ctx context.Context, index uint64, partial uint8, bundle []byte) error { + return m.s.writeBundle(ctx, index, partial, bundle) +} +func (m *MigrationStorage) SetState(ctx context.Context, treeSize uint64, rootHash []byte) error { + // Double locking: + // - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised. + // - The POSIX `lockForTreeUpdate()` ensures that distinct tasks are serialised. 
+ m.s.mu.Lock() + unlock, err := lockFile(filepath.Join(m.s.path, stateDir, "treeState.lock")) + if err != nil { + panic(err) + } + defer func() { + if err := unlock(); err != nil { + panic(err) + } + m.s.mu.Unlock() + }() + + return m.s.writeTreeState(treeSize, rootHash) +} From 0579e566d4235367e7a85ab02a9b9b52b8d5abb9 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 11 Dec 2024 16:31:27 +0000 Subject: [PATCH 2/6] Add experimental migrate command --- .../migrate/internal/migrate/migrate.go | 184 ++++++++++++++++++ cmd/experimental/migrate/posix/main.go | 59 ++++++ 2 files changed, 243 insertions(+) create mode 100644 cmd/experimental/migrate/internal/migrate/migrate.go create mode 100644 cmd/experimental/migrate/posix/main.go diff --git a/cmd/experimental/migrate/internal/migrate/migrate.go b/cmd/experimental/migrate/internal/migrate/migrate.go new file mode 100644 index 00000000..41734612 --- /dev/null +++ b/cmd/experimental/migrate/internal/migrate/migrate.go @@ -0,0 +1,184 @@ +// Copyright 2024 The Tessera authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package migrate + +import ( + "context" + "encoding/base64" + "fmt" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/transparency-dev/trillian-tessera/api/layout" + "github.com/transparency-dev/trillian-tessera/client" + "golang.org/x/sync/errgroup" + "k8s.io/klog/v2" +) + +type migrate struct { + storage MigrationStorage + getCP client.CheckpointFetcherFunc + getTile client.TileFetcherFunc + getEntries client.EntryBundleFetcherFunc + + todo chan span + + tilesToMigrate uint64 + bundlesToMigrate uint64 + tilesMigrated atomic.Uint64 + bundlesMigrated atomic.Uint64 +} + +// span represents the number of tiles at a given tile-level. +type span struct { + level int + start uint64 + N uint64 +} + +type MigrationStorage interface { + SetTile(ctx context.Context, level, index uint64, partial uint8, tile []byte) error + SetEntryBundle(ctx context.Context, index uint64, partial uint8, bundle []byte) error + SetState(ctx context.Context, treeSize uint64, rootHash []byte) error +} + +func Migrate(ctx context.Context, stateDB string, getCP client.CheckpointFetcherFunc, getTile client.TileFetcherFunc, getEntries client.EntryBundleFetcherFunc, storage MigrationStorage) error { + // TODO store state & resume + m := &migrate{ + storage: storage, + getCP: getCP, + getTile: getTile, + getEntries: getEntries, + todo: make(chan span, 100), + } + + // init + cp, err := getCP(ctx) + if err != nil { + return fmt.Errorf("fetch initial source checkpoint: %v", err) + } + bits := strings.Split(string(cp), "\n") + size, err := strconv.ParseUint(bits[1], 10, 64) + if err != nil { + return fmt.Errorf("invalid CP size %q: %v", bits[1], err) + } + rootHash, err := base64.StdEncoding.DecodeString(bits[2]) + if err != nil { + return fmt.Errorf("invalid checkpoint roothash %q: %v", bits[2], err) + } + + // figure out what needs copying + go m.populateSpans(size) + + // Print stats + go func() { + for { + time.Sleep(time.Second) + tn := m.tilesMigrated.Load() + tnp := 
float64(tn*100) / float64(m.tilesToMigrate) + bn := m.bundlesMigrated.Load() + bnp := float64(bn*100) / float64(m.bundlesToMigrate) + klog.Infof("tiles: %d (%.2f%%) bundles: %d (%.2f%%)", tn, tnp, bn, bnp) + } + }() + + // Do the copying + eg := errgroup.Group{} + for i := 0; i < 1000; i++ { + eg.Go(func() error { + return m.migrateRange(ctx) + + }) + } + if err := eg.Wait(); err != nil { + return fmt.Errorf("migrate failed to copy resources: %v", err) + } + return storage.SetState(ctx, size, rootHash) +} + +func calcExpectedCounts(treeSize uint64) (uint64, uint64) { + tiles := uint64(0) + bundles := uint64(0) + levelSize := treeSize + for level := 0; levelSize > 0; level++ { + numFull, partial := levelSize/layout.TileWidth, levelSize%layout.TileWidth + n := numFull + if partial > 0 { + n++ + } + tiles += n + if level == 0 { + bundles = n + } + levelSize >>= layout.TileHeight + } + return tiles, bundles +} + +func (m *migrate) populateSpans(treeSize uint64) { + m.tilesToMigrate, m.bundlesToMigrate = calcExpectedCounts(treeSize) + klog.Infof("Spans for treeSize %d", treeSize) + klog.Infof("total resources to fetch %d tiles + %d bundles = %d", m.tilesToMigrate, m.bundlesToMigrate, m.tilesToMigrate+m.bundlesToMigrate) + + levelSize := treeSize + for level := 0; levelSize > 0; level++ { + numFull, partial := levelSize/layout.TileWidth, levelSize%layout.TileWidth + for j := uint64(0); j < numFull; j++ { + m.todo <- span{level: level, start: j, N: layout.TileWidth} + if level == 0 { + m.todo <- span{level: -1, start: j, N: layout.TileWidth} + } + } + if partial > 0 { + m.todo <- span{level: level, start: numFull, N: partial} + if level == 0 { + m.todo <- span{level: -1, start: numFull, N: partial} + } + + } + levelSize >>= layout.TileHeight + } + close(m.todo) +} + +func (m *migrate) migrateRange(ctx context.Context) error { + for s := range m.todo { + if s.N == layout.TileWidth { + s.N = 0 + } + if s.level == -1 { + d, err := m.getEntries(ctx, s.start, uint8(s.N)) + if 
err != nil { + return fmt.Errorf("failed to fetch entrybundle %d (p=%d): %v", s.start, s.N, err) + } + if err := m.storage.SetEntryBundle(ctx, s.start, uint8(s.N), d); err != nil { + return fmt.Errorf("failed to store entrybundle %d (p=%d): %v", s.start, s.N, err) + } + m.bundlesMigrated.Add(1) + } else { + d, err := m.getTile(ctx, uint64(s.level), s.start, uint8(s.N)) + if err != nil { + return fmt.Errorf("failed to fetch tile level %d index %d (p=%d): %v", s.level, s.start, s.N, err) + } + if err := m.storage.SetTile(ctx, uint64(s.level), s.start, uint8(s.N), d); err != nil { + return fmt.Errorf("failed to store tile level %d index %d (p=%d): %v", s.level, s.start, s.N, err) + } + m.tilesMigrated.Add(1) + } + } + return nil +} diff --git a/cmd/experimental/migrate/posix/main.go b/cmd/experimental/migrate/posix/main.go new file mode 100644 index 00000000..f1d53ef0 --- /dev/null +++ b/cmd/experimental/migrate/posix/main.go @@ -0,0 +1,59 @@ +// Copyright 2024 The Tessera authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// posix-migrate is a command-line tool for migrating data from a tlog-tiles +// compliant log, into a Tessera log instance. 
+package main + +import ( + "context" + "flag" + "net/url" + + "github.com/transparency-dev/trillian-tessera/client" + "github.com/transparency-dev/trillian-tessera/cmd/experimental/migrate/internal/migrate" + "github.com/transparency-dev/trillian-tessera/storage/posix" + "k8s.io/klog/v2" +) + +var ( + storageDir = flag.String("storage_dir", "", "Root directory to store log data.") + sourceURL = flag.String("source_url", "", "Base URL for the source log.") + stateDB = flag.String("state_database", "migrate.sttate", "File to use for the temporary file used to track migration state.") +) + +func main() { + klog.InitFlags(nil) + flag.Parse() + ctx := context.Background() + + srcURL, err := url.Parse(*sourceURL) + if err != nil { + klog.Exitf("Invalid --source_url %q: %v", *sourceURL, err) + } + src, err := client.NewHTTPFetcher(srcURL, nil) + if err != nil { + klog.Exitf("Failed to create HTTP fetcher: %v", err) + } + + // Construct a new Tessera POSIX MigrationTarget log storage. + st, err := posix.NewMigrationTarget(ctx, *storageDir) + if err != nil { + klog.Exitf("Failed to construct storage: %v", err) + } + + if err := migrate.Migrate(context.Background(), *stateDB, src.ReadCheckpoint, src.ReadTile, src.ReadEntryBundle, st); err != nil { + klog.Exitf("Migrate failed: %v", err) + } +} From d6eb97506c93b4b53ae91a260a3276deb301ed3d Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 11 Dec 2024 16:34:57 +0000 Subject: [PATCH 3/6] Hack in CT compat so we can copy rome --- cmd/experimental/migrate/posix/main.go | 31 ++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/cmd/experimental/migrate/posix/main.go b/cmd/experimental/migrate/posix/main.go index f1d53ef0..0b13382a 100644 --- a/cmd/experimental/migrate/posix/main.go +++ b/cmd/experimental/migrate/posix/main.go @@ -19,8 +19,14 @@ package main import ( "context" "flag" + "fmt" + "io" + "net/http" "net/url" + "strings" + tessera "github.com/transparency-dev/trillian-tessera" + 
"github.com/transparency-dev/trillian-tessera/api/layout" "github.com/transparency-dev/trillian-tessera/client" "github.com/transparency-dev/trillian-tessera/cmd/experimental/migrate/internal/migrate" "github.com/transparency-dev/trillian-tessera/storage/posix" @@ -46,14 +52,35 @@ func main() { if err != nil { klog.Exitf("Failed to create HTTP fetcher: %v", err) } + // HACK CT: + readEntryBundle := func(ctx context.Context, i uint64, p uint8) ([]byte, error) { + up := strings.Replace(layout.EntriesPath(i, p), "entries", "data", 1) + reqURL, err := url.JoinPath(*sourceURL, up) + if err != nil { + return nil, err + } + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + return nil, err + } + rsp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("GET %q: %v", req.URL.Path, rsp.Status) + } + return io.ReadAll(rsp.Body) + } // Construct a new Tessera POSIX MigrationTarget log storage. 
- st, err := posix.NewMigrationTarget(ctx, *storageDir) + st, err := posix.NewMigrationTarget(ctx, *storageDir, tessera.WithCTLayout()) if err != nil { klog.Exitf("Failed to construct storage: %v", err) } - if err := migrate.Migrate(context.Background(), *stateDB, src.ReadCheckpoint, src.ReadTile, src.ReadEntryBundle, st); err != nil { + if err := migrate.Migrate(context.Background(), *stateDB, src.ReadCheckpoint, src.ReadTile, readEntryBundle, st); err != nil { klog.Exitf("Migrate failed: %v", err) } } From fe746625f060ac6c7c53d4988a48c80ebeba94d3 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 11 Dec 2024 16:58:47 +0000 Subject: [PATCH 4/6] [GCP] Add MigrationTarget support --- storage/gcp/gcp.go | 80 ++++++++++++++++++++++++++++++++++++----- storage/gcp/gcp_test.go | 6 +++- 2 files changed, 76 insertions(+), 10 deletions(-) diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index bf4e95a7..69424019 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -35,6 +35,7 @@ import ( "errors" "fmt" "io" + "math" "net/http" "os" "sync/atomic" @@ -282,14 +283,8 @@ func (s *Storage) publishCheckpoint(ctx context.Context, minStaleness time.Durat // setTile idempotently stores the provided tile at the location implied by the given level, index, and treeSize. // // The location to which the tile is written is defined by the tile layout spec. 
-func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, tile *api.HashTile) error { - data, err := tile.MarshalText() - if err != nil { - return err - } - tPath := layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize)) - klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes)) - +func (s *Storage) setTile(ctx context.Context, level, index uint64, partial uint8, data []byte) error { + tPath := layout.TilePath(level, index, partial) return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl) } @@ -390,7 +385,11 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []stora for k, v := range tiles { func(ctx context.Context, k storage.TileID, v *api.HashTile) { errG.Go(func() error { - return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v) + data, err := v.MarshalText() + if err != nil { + return err + } + return s.setTile(ctx, k.Level, k.Index, layout.PartialTileSize(k.Level, k.Index, newSize), data) }) }(ctx, k, v) } @@ -964,3 +963,66 @@ func (d *dedupStorage) flush(items []interface{}) { } d.numWrites.Add(uint64(len(entries))) } + +// NewMigrationTarget creates a new GCP storage for the MigrationTarget lifecycle mode. +// - cfg identifies the GCS bucket and Spanner resource in which the log should be stored +// - opts are optional storage options, resolved via storage.ResolveStorageOptions +func NewMigrationTarget(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions)) (*MigrationStorage, error) { + opt := storage.ResolveStorageOptions(opts...) 
+ if opt.PushbackMaxOutstanding == 0 { + opt.PushbackMaxOutstanding = DefaultPushbackMaxOutstanding + } + + c, err := gcs.NewClient(ctx, gcs.WithJSONReads()) + if err != nil { + return nil, fmt.Errorf("failed to create GCS client: %v", err) + } + + seq, err := newSpannerSequencer(ctx, cfg.Spanner, uint64(opt.PushbackMaxOutstanding)) + if err != nil { + return nil, fmt.Errorf("failed to create Spanner sequencer: %v", err) + } + + r := &Storage{ + objStore: &gcsStorage{ + gcsClient: c, + bucket: cfg.Bucket, + }, + sequencer: seq, + newCP: opt.NewCP, + entriesPath: opt.EntriesPath, + } + + if err := r.init(ctx); err != nil { + return nil, fmt.Errorf("failed to initialise log storage: %v", err) + } + + m := &MigrationStorage{ + s: r, + dbPool: seq.dbPool, + } + return m, nil +} + +type MigrationStorage struct { + s *Storage + dbPool *spanner.Client +} + +func (m *MigrationStorage) SetTile(ctx context.Context, level, index uint64, partial uint8, tile []byte) error { + return m.s.setTile(ctx, level, index, partial, tile) +} +func (m *MigrationStorage) SetEntryBundle(ctx context.Context, index uint64, partial uint8, bundle []byte) error { + return m.s.setEntryBundle(ctx, index, partial, bundle) +} +func (m *MigrationStorage) SetState(ctx context.Context, treeSize uint64, rootHash []byte) error { + // Spanner doesn't support UINT64 types, so need to check for overflow here. 
+ if treeSize > math.MaxInt64 { + return fmt.Errorf("treeSize %d is larger than int64 can contain", treeSize) + } + _, err := m.dbPool.Apply(ctx, []*spanner.Mutation{ + spanner.Update("IntCoord", []string{"id", "seq", "rootHash"}, []interface{}{0, int64(treeSize), rootHash}), + spanner.Update("SeqCoord", []string{"id", "next"}, []interface{}{0, int64(treeSize)}), + }) + return err +} diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index fcda9a3e..e1556c61 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -228,7 +228,11 @@ func TestTileRoundtrip(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { wantTile := makeTile(t, test.tileSize) - if err := s.setTile(ctx, test.level, test.index, test.logSize, wantTile); err != nil { + tRaw, err := wantTile.MarshalText() + if err != nil { + t.Fatalf("Failed to marshal tile: %v", err) + } + if err := s.setTile(ctx, test.level, test.index, layout.PartialTileSize(test.level, test.index, test.logSize), tRaw); err != nil { t.Fatalf("setTile: %v", err) } From 2914886957afd3c5be7fa3e90b94b8ea14364cc8 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 11 Dec 2024 17:02:59 +0000 Subject: [PATCH 5/6] Add GCP migrate tool --- cmd/experimental/migrate/gcp/main.go | 76 ++++++++++++++++++++++++++++ storage/gcp/gcp.go | 4 -- 2 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 cmd/experimental/migrate/gcp/main.go diff --git a/cmd/experimental/migrate/gcp/main.go b/cmd/experimental/migrate/gcp/main.go new file mode 100644 index 00000000..d9d02b25 --- /dev/null +++ b/cmd/experimental/migrate/gcp/main.go @@ -0,0 +1,76 @@ +// Copyright 2024 The Tessera authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// gcp-migrate is a command-line tool for migrating data from a tlog-tiles +// compliant log, into a Tessera log instance. +package main + +import ( + "context" + "flag" + "net/url" + + "github.com/transparency-dev/trillian-tessera/client" + "github.com/transparency-dev/trillian-tessera/cmd/experimental/migrate/internal/migrate" + "github.com/transparency-dev/trillian-tessera/storage/gcp" + "k8s.io/klog/v2" +) + +var ( + bucket = flag.String("bucket", "", "Bucket to use for storing log") + spanner = flag.String("spanner", "", "Spanner resource URI ('projects/.../...')") + sourceURL = flag.String("source_url", "", "Base URL for the source log.") + stateDB = flag.String("state_database", "migrate.sttate", "File to use for the temporary file used to track migration state.") +) + +func main() { + klog.InitFlags(nil) + flag.Parse() + ctx := context.Background() + + srcURL, err := url.Parse(*sourceURL) + if err != nil { + klog.Exitf("Invalid --source_url %q: %v", *sourceURL, err) + } + src, err := client.NewHTTPFetcher(srcURL, nil) + if err != nil { + klog.Exitf("Failed to create HTTP fetcher: %v", err) + } + + // Create our Tessera storage backend: + gcpCfg := storageConfigFromFlags() + st, err := gcp.NewMigrationTarget(ctx, gcpCfg) + if err != nil { + klog.Exitf("Failed to create new GCP storage: %v", err) + } + + if err := migrate.Migrate(context.Background(), *stateDB, src.ReadCheckpoint, src.ReadTile, src.ReadEntryBundle, st); err != nil { + klog.Exitf("Migrate failed: %v", err) + } +} + +// storageConfigFromFlags returns a gcp.Config struct 
populated with values +// provided via flags. +func storageConfigFromFlags() gcp.Config { + if *bucket == "" { + klog.Exit("--bucket must be set") + } + if *spanner == "" { + klog.Exit("--spanner must be set") + } + return gcp.Config{ + Bucket: *bucket, + Spanner: *spanner, + } +} diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 69424019..7e05a04a 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -993,10 +993,6 @@ func NewMigrationTarget(ctx context.Context, cfg Config, opts ...func(*options.S entriesPath: opt.EntriesPath, } - if err := r.init(ctx); err != nil { - return nil, fmt.Errorf("failed to initialise log storage: %v", err) - } - m := &MigrationStorage{ s: r, dbPool: seq.dbPool, From c82463102ecc065d29c6303e10dfd8f0d1329603 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 11 Dec 2024 17:03:29 +0000 Subject: [PATCH 6/6] Hack in CT support to GCP so we can copy rome --- cmd/experimental/migrate/gcp/main.go | 31 ++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/cmd/experimental/migrate/gcp/main.go b/cmd/experimental/migrate/gcp/main.go index d9d02b25..375aabd7 100644 --- a/cmd/experimental/migrate/gcp/main.go +++ b/cmd/experimental/migrate/gcp/main.go @@ -19,8 +19,14 @@ package main import ( "context" "flag" + "fmt" + "io" + "net/http" "net/url" + "strings" + tessera "github.com/transparency-dev/trillian-tessera" + "github.com/transparency-dev/trillian-tessera/api/layout" "github.com/transparency-dev/trillian-tessera/client" "github.com/transparency-dev/trillian-tessera/cmd/experimental/migrate/internal/migrate" "github.com/transparency-dev/trillian-tessera/storage/gcp" @@ -47,15 +53,36 @@ func main() { if err != nil { klog.Exitf("Failed to create HTTP fetcher: %v", err) } + // HACK CT: + readEntryBundle := func(ctx context.Context, i uint64, p uint8) ([]byte, error) { + up := strings.Replace(layout.EntriesPath(i, p), "entries", "data", 1) + reqURL, err := url.JoinPath(*sourceURL, up) + if err != 
nil { + return nil, err + } + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + return nil, err + } + rsp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("GET %q: %v", req.URL.Path, rsp.Status) + } + return io.ReadAll(rsp.Body) + } // Create our Tessera storage backend: gcpCfg := storageConfigFromFlags() - st, err := gcp.NewMigrationTarget(ctx, gcpCfg) + st, err := gcp.NewMigrationTarget(ctx, gcpCfg, tessera.WithCTLayout()) if err != nil { klog.Exitf("Failed to create new GCP storage: %v", err) } - if err := migrate.Migrate(context.Background(), *stateDB, src.ReadCheckpoint, src.ReadTile, src.ReadEntryBundle, st); err != nil { + if err := migrate.Migrate(context.Background(), *stateDB, src.ReadCheckpoint, src.ReadTile, readEntryBundle, st); err != nil { klog.Exitf("Migrate failed: %v", err) } }