Skip to content

Commit

Permalink
generate new checkpoints in the absense of a submitted certificate
Browse files Browse the repository at this point in the history
  • Loading branch information
aditsachde committed Nov 1, 2024
1 parent c44eaf3 commit 8a6b13c
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 90 deletions.
2 changes: 2 additions & 0 deletions internal/ctsubmit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type stageTwoData struct {
edgeTiles map[int]tileWithBytes
maskSize int
checkpointOrigin string
treeSize uint64

signingKey *ecdsa.PrivateKey
}
Expand Down Expand Up @@ -370,6 +371,7 @@ func LoadLog(ctx context.Context, kvpath, consulAddress string) (*Log, error) {
edgeTiles: edgeTiles,
maskSize: gc.MaskSize,
checkpointOrigin: gc.Name,
treeSize: sth.TreeSize,

signingKey: key,
}
Expand Down
189 changes: 99 additions & 90 deletions internal/ctsubmit/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,14 @@ func (d *stageOneData) stageOne(

// If the flush interval has passed, flush the pool
case <-time.After(FLUSH_INTERVAL):
if len(pool) > 0 {
// Create a copy of the pool
closedPool := make([]LogEntryWithReturnPath, len(pool))
copy(closedPool, pool)

// Clear the original pool
pool = pool[:0]
d.stageTwoTx <- closedPool
}
// Create a copy of the pool
closedPool := make([]LogEntryWithReturnPath, len(pool))
copy(closedPool, pool)

// Clear the original pool
pool = pool[:0]
d.stageTwoTx <- closedPool

// Update the last flush time
lastFlushTime = time.Now()

Expand All @@ -289,118 +288,125 @@ func (d *stageTwoData) stageTwo(
return fmt.Errorf("stage two: stageTwoRx channel closed")
}

// Errgroup to safely parallelize the uploads
g, gctx := errgroup.WithContext(ctx)

// The current tree size is the same as the index of the first leaf in the pool
oldTreeSize := pool[0].entry.LeafIndex
// LeafIndex is zero-indexed, so the tree size is the last leaf index + 1
newTreeSize := pool[len(pool)-1].entry.LeafIndex + 1

// ** Upload the data tiles **
newHashes := make(map[int64]tlog.Hash)
// The newHashes map is a reference type, so adding elements to
// newHashes will let the hashReader function look them up.
hashReader := d.hashReader(newHashes)
// This value is written back to the struct the new sth is written.
updatedTreeSize := d.treeSize

// these are the hashes of the merkle tree leaves and are needed later
recordHashes := make([]RecordHashUpload, 0, len(pool))

// This is the right most data tile
dataTile := d.edgeTiles[-1]
if dataTile.Tile.W > sunlight.TileWidth {
return fmt.Errorf("tile width is greater than the maximum width!! %d", dataTile.Tile.W)
} else if dataTile.Tile.W == sunlight.TileWidth {
// If the tile is full, reset it so we have a partial
// Reset the width to zero
dataTile.Tile.W = 0
// Increment the tile index
dataTile.Tile.N++
// Clear the bytes
dataTile.Bytes = []byte{}
}
if len(pool) != 0 {

for _, e := range pool {
recordHash := tlog.RecordHash(e.entry.MerkleTreeLeaf())
recordHashShort := [16]byte(recordHash[:16])
recordHashes = append(recordHashes, RecordHashUpload{
hash: recordHashShort,
leafIndex: e.entry.LeafIndex,
})
hashes, err := tlog.StoredHashesForRecordHash(int64(e.entry.LeafIndex), recordHash, hashReader)
if err != nil {
return fmt.Errorf("failed to calculate new hashes for leaf %d: %w", e.entry.LeafIndex, err)
}
for i, hash := range hashes {
index := tlog.StoredHashIndex(0, int64(e.entry.LeafIndex)) + int64(i)
newHashes[index] = hash
}
// Errgroup to safely parallelize the uploads
g, gctx := errgroup.WithContext(ctx)

// The current tree size is the same as the index of the first leaf in the pool
oldTreeSize := pool[0].entry.LeafIndex
// LeafIndex is zero-indexed, so the tree size is the last leaf index + 1
newTreeSize := pool[len(pool)-1].entry.LeafIndex + 1
updatedTreeSize = newTreeSize

dataTile.Bytes = sunlight.AppendTileLeaf(dataTile.Bytes, &e.entry)
dataTile.Tile.W++
// these are the hashes of the merkle tree leaves and are needed later
recordHashes := make([]RecordHashUpload, 0, len(pool))

// This means we have a full width tile that we can go ahead and upload
// This is the right most data tile
dataTile := d.edgeTiles[-1]
if dataTile.Tile.W > sunlight.TileWidth {
return fmt.Errorf("tile width is greater than the maximum width!!! %d", dataTile.Tile.W)
return fmt.Errorf("tile width is greater than the maximum width!! %d", dataTile.Tile.W)
} else if dataTile.Tile.W == sunlight.TileWidth {
// Upload the tile
t := dataTile
g.Go(func() error { return d.bucket.SetTile(gctx, t.Tile, t.Bytes) })
// If the tile is full, reset it so we have a partial
// Reset the width to zero
dataTile.Tile.W = 0
// Increment the tile index
dataTile.Tile.N++
// Clear the bytes
dataTile.Bytes = []byte{}
}
}

// upload the partial data tile
if dataTile.Tile.W > 0 {
t := dataTile
g.Go(func() error { return d.bucket.SetTile(gctx, t.Tile, t.Bytes) })
}
d.edgeTiles[-1] = dataTile

// ** Upload the tree tiles **
// TODO: review if the treesize should be a int64 instead, to align with the tlog apis.
newEdgeTiles := maps.Clone(d.edgeTiles)
treeTiles := tlog.NewTiles(sunlight.TileHeight, int64(oldTreeSize), int64(newTreeSize))
for _, tile := range treeTiles {
data, err := tlog.ReadTileData(tile, hashReader)
if err != nil {
return fmt.Errorf("failed to read tile data for tile %v: %w", tile, err)
for _, e := range pool {
recordHash := tlog.RecordHash(e.entry.MerkleTreeLeaf())
recordHashShort := [16]byte(recordHash[:16])
recordHashes = append(recordHashes, RecordHashUpload{
hash: recordHashShort,
leafIndex: e.entry.LeafIndex,
})
hashes, err := tlog.StoredHashesForRecordHash(int64(e.entry.LeafIndex), recordHash, hashReader)
if err != nil {
return fmt.Errorf("failed to calculate new hashes for leaf %d: %w", e.entry.LeafIndex, err)
}
for i, hash := range hashes {
index := tlog.StoredHashIndex(0, int64(e.entry.LeafIndex)) + int64(i)
newHashes[index] = hash
}

dataTile.Bytes = sunlight.AppendTileLeaf(dataTile.Bytes, &e.entry)
dataTile.Tile.W++

// This means we have a full width tile that we can go ahead and upload
if dataTile.Tile.W > sunlight.TileWidth {
return fmt.Errorf("tile width is greater than the maximum width!!! %d", dataTile.Tile.W)
} else if dataTile.Tile.W == sunlight.TileWidth {
// Upload the tile
t := dataTile
g.Go(func() error { return d.bucket.SetTile(gctx, t.Tile, t.Bytes) })
// Reset the width to zero
dataTile.Tile.W = 0
// Increment the tile index
dataTile.Tile.N++
// Clear the bytes
dataTile.Bytes = []byte{}
}
}
g.Go(func() error { return d.bucket.SetTile(gctx, tile, data) })
if err != nil {
return fmt.Errorf("failed to upload tile %v: %w", tile, err)

// upload the partial data tile
if dataTile.Tile.W > 0 {
t := dataTile
g.Go(func() error { return d.bucket.SetTile(gctx, t.Tile, t.Bytes) })
}
newEdgeTiles[tile.L] = tileWithBytes{tile, data}
}
d.edgeTiles = newEdgeTiles
d.edgeTiles[-1] = dataTile

// ** Upload the tree tiles **
// TODO: review if the treesize should be a int64 instead, to align with the tlog apis.
newEdgeTiles := maps.Clone(d.edgeTiles)
treeTiles := tlog.NewTiles(sunlight.TileHeight, int64(oldTreeSize), int64(newTreeSize))
for _, tile := range treeTiles {
data, err := tlog.ReadTileData(tile, hashReader)
if err != nil {
return fmt.Errorf("failed to read tile data for tile %v: %w", tile, err)
}
g.Go(func() error { return d.bucket.SetTile(gctx, tile, data) })
if err != nil {
return fmt.Errorf("failed to upload tile %v: %w", tile, err)
}
newEdgeTiles[tile.L] = tileWithBytes{tile, data}
}
d.edgeTiles = newEdgeTiles

// ** Upload the v1 leaf record hash mappings **
g.Go(func() error { return d.bucket.PutRecordHashes(gctx, recordHashes, d.maskSize) })
// ** Upload the v1 leaf record hash mappings **
g.Go(func() error { return d.bucket.PutRecordHashes(gctx, recordHashes, d.maskSize) })

// ** Upload new intermediate certificates **
for _, e := range pool {
for _, cert := range e.entry.Chain {
g.Go(func() error { return d.bucket.SetIssuer(gctx, cert) })
// ** Upload new intermediate certificates **
for _, e := range pool {
for _, cert := range e.entry.Chain {
g.Go(func() error { return d.bucket.SetIssuer(gctx, cert) })
}
}

err := g.Wait()
if err != nil {
return fmt.Errorf("failed to upload data: %w", err)
}
}

err := g.Wait()
if err != nil {
return fmt.Errorf("failed to upload data: %w", err)
}

// ** Upload a new STH **
rootHash, err := tlog.TreeHash(int64(newTreeSize), hashReader)
rootHash, err := tlog.TreeHash(int64(updatedTreeSize), hashReader)
if err != nil {
return fmt.Errorf("failed to calculate new root hash: %w", err)
}

jsonBytes, err := sunlight.SignTreeHead(d.signingKey, newTreeSize, uint64(time.Now().UnixMilli()), rootHash)
jsonBytes, err := sunlight.SignTreeHead(d.signingKey, updatedTreeSize, uint64(time.Now().UnixMilli()), rootHash)
if err != nil {
return fmt.Errorf("failed to generate a new STH: %w", err)
}
Expand All @@ -411,7 +417,7 @@ func (d *stageTwoData) stageTwo(
}

// we also upload a checkpoint based on the STH
checkpointBytes, err := sunlight.SignTreeHeadCheckpoint(d.checkpointOrigin, d.signingKey, int64(newTreeSize), time.Now().UnixMilli(), rootHash)
checkpointBytes, err := sunlight.SignTreeHeadCheckpoint(d.checkpointOrigin, d.signingKey, int64(updatedTreeSize), time.Now().UnixMilli(), rootHash)
if err != nil {
return fmt.Errorf("failed to generate a new checkpoint: %w", err)
}
Expand All @@ -421,6 +427,9 @@ func (d *stageTwoData) stageTwo(
return fmt.Errorf("failed to upload new checkpoint: %w", err)
}

// Update the tree size once the checkpoints are uploaded
d.treeSize = updatedTreeSize

// ** Upload the dedupe mappings **
// TODO: This isn't the best cache key, because it fails to distinguish between
// a certificate that is submitted with a different chain. This is a problem because
Expand Down

0 comments on commit 8a6b13c

Please sign in to comment.