Commit
use errgroups
aditsachde committed Jul 23, 2024
1 parent 3fdad27 commit f03e535
Showing 2 changed files with 64 additions and 31 deletions.
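
A note on the pattern this commit adopts throughout: errgroup.WithContext returns a Group plus a derived Context that is cancelled as soon as any function passed to g.Go returns a non-nil error, and g.Wait blocks until all of the goroutines have finished, returning the first error. A minimal, self-contained sketch of that shape (uploadAll and set are illustrative names, not identifiers from this repository):

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    // uploadAll writes every key/value pair concurrently and returns the
    // first error, if any. gctx is cancelled once any upload fails, so the
    // remaining uploads can bail out early.
    func uploadAll(ctx context.Context, items map[string][]byte,
        set func(context.Context, string, []byte) error) error {
        g, gctx := errgroup.WithContext(ctx)
        for k, v := range items {
            k, v := k, v // per-iteration copies; redundant from Go 1.22 on
            g.Go(func() error { return set(gctx, k, v) })
        }
        return g.Wait()
    }

    func main() {
        noop := func(context.Context, string, []byte) error { return nil }
        fmt.Println(uploadAll(context.Background(), map[string][]byte{"k": nil}, noop))
    }

The diffs below capture range variables (k, v, cert) directly in the closures, which is safe assuming the module targets Go 1.22's per-iteration loop variables; the explicit t := dataTile copy in logic.go is still needed because dataTile keeps being mutated after the closure is queued.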
internal/ctsubmit/bucket.go (48 changes: 40 additions & 8 deletions)
@@ -3,6 +3,7 @@ package ctsubmit
 import (
     "bytes"
     "context"
+    "crypto/sha256"
     "encoding/binary"
     "errors"
     "fmt"
@@ -14,6 +15,9 @@ import (
     "github.com/aws/aws-sdk-go-v2/credentials"
     "github.com/aws/aws-sdk-go-v2/service/s3"
     s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
+    "github.com/google/certificate-transparency-go/x509"
+    "golang.org/x/mod/sumdb/tlog"
+    "golang.org/x/sync/errgroup"
     "itko.dev/internal/sunlight"
 )
@@ -81,6 +85,28 @@ func (b *Bucket) Exists(ctx context.Context, key string) (bool, error) {
 
 // --------------------------------------------------------------------------------------------
 
+func (b *Bucket) SetTile(ctx context.Context, tile tlog.Tile, data []byte) error {
+    return b.Set(ctx, tile.Path(), data)
+}
+
+func (b *Bucket) SetSth(ctx context.Context, data []byte) error {
+    return b.Set(ctx, "ct/v1/get-sth", data)
+}
+
+func (b *Bucket) SetIssuer(ctx context.Context, cert *x509.Certificate) error {
+    fingerprint := sha256.Sum256(cert.Raw)
+    exists, err := b.Exists(ctx, fmt.Sprintf("issuer/%x", fingerprint))
+    if err != nil {
+        return err
+    }
+    if !exists {
+        return b.Set(ctx, fmt.Sprintf("issuer/%x", fingerprint), cert.Raw)
+    }
+    return nil
+}
+
+// --------------------------------------------------------------------------------------------
+
 type RecordHashUpload struct {
     hash      [16]byte // if 16 bytes is good enough for sunlight, it's good enough for us
     leafIndex uint64
@@ -182,12 +208,15 @@ func (b *Bucket) PutRecordHashes(ctx context.Context, hashes []RecordHashUpload,
         f[e.hashPath] = newRecords
     }
 
-
     // Now, write the updated files back to the bucket.
+    g, gctx := errgroup.WithContext(ctx)
     for k, v := range f {
-        err := b.Set(ctx, "int/hashes/"+k, v)
-        if err != nil {
-            return err
-        }
+        g.Go(func() error { return b.Set(gctx, "int/hashes/"+k, v) })
+    }
+
+    if err := g.Wait(); err != nil {
+        return err
     }
 
     return nil
@@ -320,12 +349,15 @@ func (b *Bucket) PutDedupeEntries(ctx context.Context, hashes []DedupeUpload, ma
         f[e.hashPath] = newRecords
     }
 
-
     // Now, write the updated files back to the bucket.
+    g, gctx := errgroup.WithContext(ctx)
     for k, v := range f {
-        err := b.Set(ctx, "int/dedupe/"+k, v)
-        if err != nil {
-            return err
-        }
+        g.Go(func() error { return b.Set(gctx, "int/dedupe/"+k, v) })
+    }
+
+    if err := g.Wait(); err != nil {
+        return err
     }
 
     return nil
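Before moving on to logic.go, it may help to see the object keys the new helpers actually produce. A standalone sketch; the tile height of 8 (256 hashes per full tile) is an assumption inferred from sunlight.TileWidth, and the DER bytes are obviously fake:

    package main

    import (
        "crypto/sha256"
        "fmt"

        "golang.org/x/mod/sumdb/tlog"
    )

    func main() {
        // SetTile writes to tile.Path(). A full-width tile:
        full := tlog.Tile{H: 8, L: 0, N: 5, W: 256}
        fmt.Println(full.Path()) // tile/8/0/005

        // A partial tile encodes its width in the path, so it never
        // collides with the full tile that later replaces it.
        partial := tlog.Tile{H: 8, L: 0, N: 6, W: 100}
        fmt.Println(partial.Path()) // tile/8/0/006.p/100

        // SetIssuer content-addresses certificates by the SHA-256 of the
        // DER encoding, so one issuer always lands on one key. That is
        // what makes its Exists pre-check worthwhile.
        der := []byte("not real DER, illustrative only")
        fmt.Printf("issuer/%x\n", sha256.Sum256(der))
    }

The Exists call turns a repeat issuer upload into a cheap existence probe instead of a PUT; the TODO in logic.go about adding a cache would eliminate even that round trip.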
internal/ctsubmit/logic.go (47 changes: 24 additions & 23 deletions)
@@ -18,6 +18,7 @@
     "github.com/google/certificate-transparency-go/x509"
     "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
     "golang.org/x/mod/sumdb/tlog"
+    "golang.org/x/sync/errgroup"
     "itko.dev/internal/sunlight"
 )
@@ -171,10 +172,14 @@ func (d *stageZeroData) stageZero(ctx context.Context, reqBody io.ReadCloser, pr
         returnPath := make(chan sunlight.LogEntry)
         d.stageOneTx <- UnsequencedEntryWithReturnPath{entry, returnPath}
 
-        // TODO: Add a timeout with select
         // If we receive something here, that means that the entry has been both sequenced
         // and uploaded with a newly signed STH, so we can issue a SCT.
-        completeEntry = <-returnPath
+        select {
+        case completeEntry = <-returnPath:
+            // Nominally, this should complete in under 2 seconds.
+        case <-time.After(5 * time.Second):
+            return nil, http.StatusServiceUnavailable, fmt.Errorf("timed out waiting for sequencer")
+        }
     }
 
     extension, err := sunlight.MarshalExtensions(sunlight.Extensions{LeafIndex: completeEntry.LeafIndex})
@@ -281,6 +286,9 @@ func (d *stageTwoData) stageTwo(
             return fmt.Errorf("stage two: stageTwoRx channel closed")
         }
 
+        // Errgroup to safely parallelize the uploads
+        g, gctx := errgroup.WithContext(ctx)
+
         // The current tree size is the same as the index of the first leaf in the pool
         oldTreeSize := pool[0].entry.LeafIndex
         // LeafIndex is zero-indexed, so the tree size is the last leaf index + 1
@@ -319,11 +327,9 @@
 
         // This means we have a full width tile that we can go ahead and upload
         if dataTile.Tile.W == sunlight.TileWidth {
-            err := d.bucket.Set(ctx, dataTile.Path(), dataTile.Bytes)
-            if err != nil {
-                return fmt.Errorf("failed to upload data tile %v: %w", dataTile.Tile, err)
-            }
-
+            // Upload the tile
+            t := dataTile
+            g.Go(func() error { return d.bucket.SetTile(gctx, t.Tile, t.Bytes) })
             // Reset the width to zero
             dataTile.Tile.W = 0
             // Increment the tile index
@@ -335,10 +341,8 @@
 
     // upload the partial data tile
     if dataTile.Tile.W > 0 {
-        err := d.bucket.Set(ctx, dataTile.Path(), dataTile.Bytes)
-        if err != nil {
-            return fmt.Errorf("failed to upload partial data tile %v: %w", dataTile.Tile, err)
-        }
+        t := dataTile
+        g.Go(func() error { return d.bucket.SetTile(gctx, t.Tile, t.Bytes) })
     }
     d.edgeTiles[-1] = dataTile
 
@@ -351,7 +355,7 @@
         if err != nil {
             return fmt.Errorf("failed to read tile data for tile %v: %w", tile, err)
         }
-        err = d.bucket.Set(ctx, tile.Path(), data)
+        g.Go(func() error { return d.bucket.SetTile(gctx, tile, data) })
         if err != nil {
             return fmt.Errorf("failed to upload tile %v: %w", tile, err)
         }
@@ -360,23 +364,20 @@
     d.edgeTiles = newEdgeTiles
 
     // ** Upload the v1 leaf record hash mappings **
-    err := d.bucket.PutRecordHashes(ctx, recordHashes, d.maskSize)
-    if err != nil {
-        return fmt.Errorf("failed to upload record hashes: %w", err)
-    }
+    g.Go(func() error { return d.bucket.PutRecordHashes(gctx, recordHashes, d.maskSize) })
 
     // ** Upload new intermediate certificates **
     // TODO: this won't blow up S3, but we're certainly making far more requests than needed. add a cache.
     for _, e := range pool {
         for _, cert := range e.entry.Chain {
-            fingerprint := sha256.Sum256(cert.Raw)
-            err := d.bucket.Set(ctx, fmt.Sprintf("issuer/%x", fingerprint), cert.Raw)
-            if err != nil {
-                return fmt.Errorf("failed to upload issuer certificate %x: %w", fingerprint, err)
-            }
+            g.Go(func() error { return d.bucket.SetIssuer(gctx, cert) })
         }
     }
 
+    err := g.Wait()
+    if err != nil {
+        return fmt.Errorf("failed to upload data: %w", err)
+    }
+
     // ** Upload a new STH **
     rootHash, err := tlog.TreeHash(int64(newTreeSize), hashReader)
     if err != nil {
@@ -388,7 +389,7 @@
         return fmt.Errorf("failed to generate a new STH: %w", err)
     }
 
-    err = d.bucket.Set(ctx, "/ct/v1/get-sth", jsonBytes)
+    err = d.bucket.SetSth(ctx, jsonBytes)
     if err != nil {
         return fmt.Errorf("failed to upload new STH: %w", err)
     }
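Two details in this file are worth pulling out. First, every upload now runs under a single errgroup whose g.Wait() sits before the STH is signed and uploaded, so the new checkpoint is only published once the tiles it commits to have been persisted; note, in passing, that the STH object key loses its leading slash in the move to SetSth. Second, stageZero's select replaces an unbounded channel receive with a bounded one. A minimal sketch of that pattern (waitWithTimeout and the channel payload are illustrative, not from this repository):

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // waitWithTimeout blocks on a reply channel but gives up after d. This
    // is the shape of the stageZero change: the handler can now answer 503
    // instead of hanging forever if the sequencer stalls.
    func waitWithTimeout[T any](replies <-chan T, d time.Duration) (T, error) {
        var zero T
        select {
        case v := <-replies:
            return v, nil
        case <-time.After(d):
            return zero, errors.New("timed out waiting for sequencer")
        }
    }

    func main() {
        ch := make(chan string, 1)
        ch <- "sequenced"
        v, err := waitWithTimeout(ch, 5*time.Second)
        fmt.Println(v, err) // sequenced <nil>
    }

One caveat with time.After: its timer is not released until it fires, so each request holds a timer for the full five seconds even on the happy path. time.NewTimer with a deferred Stop, or context.WithTimeout, avoids that if it ever matters in profiles.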
