
Introduce caching of PowerTable CIDs #784

Merged: 3 commits, Dec 11, 2024
consensus_inputs.go (new file, 258 additions, 0 deletions)
@@ -0,0 +1,258 @@
package f3

import (
	"bytes"
	"context"
	"fmt"
	"slices"
	"time"

	"github.com/filecoin-project/go-f3/certs"
	"github.com/filecoin-project/go-f3/certstore"
	"github.com/filecoin-project/go-f3/ec"
	"github.com/filecoin-project/go-f3/gpbft"
	"github.com/filecoin-project/go-f3/internal/clock"
	"github.com/filecoin-project/go-f3/manifest"
	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/ipfs/go-cid"
	"go.opentelemetry.io/otel/metric"
)

type gpbftInputs struct {
	manifest  *manifest.Manifest
	certStore *certstore.Store
	ec        ec.Backend
	verifier  gpbft.Verifier
	clock     clock.Clock

	ptCache *lru.Cache[string, cid.Cid]
}

func newInputs(manifest *manifest.Manifest, certStore *certstore.Store, ec ec.Backend,
	verifier gpbft.Verifier, clk clock.Clock) gpbftInputs {
	cache, err := lru.New[string, cid.Cid](256) // keep a bit more than 2x max ECChain size
	if err != nil {
		// panic as it only depends on the size
		panic(fmt.Errorf("could not create cache: %w", err))
	}

	return gpbftInputs{
		manifest:  manifest,
		certStore: certStore,
		ec:        ec,
		verifier:  verifier,
		clock:     clk,
		ptCache:   cache,
	}
}

Review comment (Member, on the panic): I would return an error, based on the principle that panics should be used rarely. I understand there are pre-existing practices in this repo on the use of panic, so not a blocker for me.

Reply (Contributor, Author): It is a pet peeve of mine not to introduce errors, especially in constructors, when they cannot happen.

func (h *gpbftInputs) getPowerTableCIDForTipset(ctx context.Context, tsk gpbft.TipSetKey) (cid.Cid, error) {
	sTSK := string(tsk)
	ptCid, ok := h.ptCache.Get(sTSK)
	if ok {
		return ptCid, nil
	}

	pt, err := h.ec.GetPowerTable(ctx, tsk)
	if err != nil {
		return cid.Undef, fmt.Errorf("getting power table to compute CID: %w", err)
	}
	ptCid, err = certs.MakePowerTableCID(pt)
	if err != nil {
		return cid.Undef, fmt.Errorf("computing power table CID: %w", err)
	}

	h.ptCache.Add(sTSK, ptCid)
	return ptCid, nil
}

Review comment (Member, on `sTSK := string(tsk)`): How big do tipset keys get? Do we want to hash these?

Reply (Contributor, Author): On expectation, ~5x the hash size.

func (h *gpbftInputs) collectChain(ctx context.Context, base ec.TipSet, head ec.TipSet) ([]ec.TipSet, error) {
	// TODO: optimize when head is way beyond base
	res := make([]ec.TipSet, 0, 2*gpbft.ChainMaxLen)
	res = append(res, head)

	current := head
	for !bytes.Equal(current.Key(), base.Key()) {
		if current.Epoch() < base.Epoch() {
			metrics.headDiverged.Add(ctx, 1)
			log.Infow("reorg-ed away from base, proposing just base",
				"head", head.String(), "base", base.String())
			return nil, nil
		}

		var err error
		current, err = h.ec.GetParent(ctx, current)
		if err != nil {
			return nil, fmt.Errorf("walking back the chain: %w", err)
		}

		res = append(res, current)
	}
	slices.Reverse(res)
	return res[1:], nil
}

Review comment (Member): This function would collect the entire chain that could be finalised, irrespective of the max chain length of 100, right? Specifically, during bootstrap this function would at best collect a chain of 900, then a chain of 800, 700, and so on, even though each instance would only use at most 100 tipsets of the returned chain as its proposal. Is that the most efficient we can be here? How fast/expensive is GetParent?

Reply (Contributor, Author): It isn't that expensive in comparison to getting the power table. It could be more efficient, but that is tricky given null rounds.

// GetProposal returns the inputs to the next GPBFT instance:
//   - the supplemental data, and
//   - the EC chain to propose.
//
// These will be used as input to a subsequent instance of the protocol.
// The chain should be a suffix of the last chain notified to the host via
// ReceiveDecision (or known to be final via some other channel).
func (h *gpbftInputs) GetProposal(ctx context.Context, instance uint64) (_ *gpbft.SupplementalData, _ gpbft.ECChain, _err error) {
	defer func(start time.Time) {
		metrics.proposalFetchTime.Record(ctx, time.Since(start).Seconds(), metric.WithAttributes(attrStatusFromErr(_err)))
	}(time.Now())

	var baseTsk gpbft.TipSetKey
	if instance == h.manifest.InitialInstance {
		ts, err := h.ec.GetTipsetByEpoch(ctx,
			h.manifest.BootstrapEpoch-h.manifest.EC.Finality)
		if err != nil {
			return nil, nil, fmt.Errorf("getting bootstrap base: %w", err)
		}

		baseTsk = ts.Key()
	} else {
		cert, err := h.certStore.Get(ctx, instance-1)
		if err != nil {
			return nil, nil, fmt.Errorf("getting cert for previous instance(%d): %w", instance-1, err)
		}

		baseTsk = cert.ECChain.Head().Key
	}

	baseTs, err := h.ec.GetTipset(ctx, baseTsk)
	if err != nil {
		return nil, nil, fmt.Errorf("getting base TS: %w", err)
	}

	headTs, err := h.ec.GetHead(ctx)
	if err != nil {
		return nil, nil, fmt.Errorf("getting head TS: %w", err)
	}

	collectedChain, err := h.collectChain(ctx, baseTs, headTs)
	if err != nil {
		return nil, nil, fmt.Errorf("collecting chain: %w", err)
	}

	// If we have an explicit head-lookback, trim the chain.
	if h.manifest.EC.HeadLookback > 0 {
		collectedChain = collectedChain[:max(0, len(collectedChain)-h.manifest.EC.HeadLookback)]
	}

	// If less than one EC period has elapsed since the head was produced,
	// agreement on it is unlikely; trim it from the chain.
	if len(collectedChain) > 0 && h.clock.Since(collectedChain[len(collectedChain)-1].Timestamp()) < h.manifest.EC.Period {
		collectedChain = collectedChain[:len(collectedChain)-1]
	}

	base := gpbft.TipSet{
		Epoch: baseTs.Epoch(),
		Key:   baseTs.Key(),
	}
	base.PowerTable, err = h.getPowerTableCIDForTipset(ctx, baseTs.Key())
	if err != nil {
		return nil, nil, fmt.Errorf("computing powertable CID for base: %w", err)
	}

	suffix := make([]gpbft.TipSet, min(gpbft.ChainMaxLen-1, len(collectedChain))) // -1 because of base
	for i := range suffix {
		suffix[i].Key = collectedChain[i].Key()
		suffix[i].Epoch = collectedChain[i].Epoch()

		suffix[i].PowerTable, err = h.getPowerTableCIDForTipset(ctx, suffix[i].Key)
		if err != nil {
			return nil, nil, fmt.Errorf("computing powertable CID for suffix %d: %w", i, err)
		}
	}
	chain, err := gpbft.NewChain(base, suffix...)
	if err != nil {
		return nil, nil, fmt.Errorf("making new chain: %w", err)
	}

	var supplData gpbft.SupplementalData
	committee, err := h.GetCommittee(ctx, instance+1)
	if err != nil {
		return nil, nil, fmt.Errorf("getting committee for %d: %w", instance+1, err)
	}

	supplData.PowerTable, err = certs.MakePowerTableCID(committee.PowerTable.Entries)
	if err != nil {
		return nil, nil, fmt.Errorf("making power table cid for supplemental data: %w", err)
	}

	return &supplData, chain, nil
}

func (h *gpbftInputs) GetCommittee(ctx context.Context, instance uint64) (_ *gpbft.Committee, _err error) {
	defer func(start time.Time) {
		metrics.committeeFetchTime.Record(context.TODO(), time.Since(start).Seconds(), metric.WithAttributes(attrStatusFromErr(_err)))
	}(time.Now())

	var powerTsk gpbft.TipSetKey
	var powerEntries gpbft.PowerEntries
	var err error

	if instance < h.manifest.InitialInstance+h.manifest.CommitteeLookback {
		// bootstrap phase
		powerEntries, err = h.certStore.GetPowerTable(ctx, h.manifest.InitialInstance)
		if err != nil {
			return nil, fmt.Errorf("getting power table: %w", err)
		}

		if h.certStore.Latest() == nil {
			ts, err := h.ec.GetTipsetByEpoch(ctx, h.manifest.BootstrapEpoch-h.manifest.EC.Finality)
			if err != nil {
				return nil, fmt.Errorf("getting tipset for bootstrap epoch with lookback: %w", err)
			}

			powerTsk = ts.Key()
		} else {
			cert, err := h.certStore.Get(ctx, h.manifest.InitialInstance)
			if err != nil {
				return nil, fmt.Errorf("getting finality certificate: %w", err)
			}

			powerTsk = cert.ECChain.Base().Key
		}
	} else {
		cert, err := h.certStore.Get(ctx, instance-h.manifest.CommitteeLookback)
		if err != nil {
			return nil, fmt.Errorf("getting finality certificate: %w", err)
		}

		powerTsk = cert.ECChain.Head().Key

		powerEntries, err = h.certStore.GetPowerTable(ctx, instance)
		if err != nil {
			log.Debugf("failed getting power table from certstore: %v, falling back to EC", err)

			powerEntries, err = h.ec.GetPowerTable(ctx, powerTsk)
			if err != nil {
				return nil, fmt.Errorf("getting power table: %w", err)
			}
		}
	}

	ts, err := h.ec.GetTipset(ctx, powerTsk)
	if err != nil {
		return nil, fmt.Errorf("getting tipset: %w", err)
	}

	table := gpbft.NewPowerTable()
	if err := table.Add(powerEntries...); err != nil {
		return nil, fmt.Errorf("adding entries to power table: %w", err)
	}

	if err := table.Validate(); err != nil {
		return nil, fmt.Errorf("invalid power table for instance %d: %w", instance, err)
	}

	// NOTE: we're intentionally keeping participants here even if they have no
	// effective power (after rounding power) to simplify things. The runtime cost is
	// minimal and it means that the keys can be aggregated before any rounding is done.
	// TODO: this is slow and under a lock, but we only want to do it once per
	// instance... ideally we'd have a per-instance lock/once, but that probably isn't
	// worth it.
	agg, err := h.verifier.Aggregate(table.Entries.PublicKeys())
	if err != nil {
		return nil, fmt.Errorf("failed to pre-compute aggregate mask for instance %d: %w", instance, err)
	}

	return &gpbft.Committee{
		PowerTable:        table,
		Beacon:            ts.Beacon(),
		AggregateVerifier: agg,
	}, nil
}
2 changes: 1 addition & 1 deletion go.mod
@@ -7,6 +7,7 @@ require (
 	github.com/filecoin-project/go-bitfield v0.2.4
 	github.com/filecoin-project/go-clock v0.1.0
 	github.com/filecoin-project/go-state-types v0.14.0
+	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/ipfs/go-cid v0.4.1
 	github.com/ipfs/go-datastore v0.6.0
 	github.com/ipfs/go-ds-leveldb v0.5.0
@@ -62,7 +63,6 @@ require (
 	github.com/google/pprof v0.0.0-20241017200806-017d972448fc // indirect
 	github.com/google/uuid v1.6.0 // indirect
 	github.com/gorilla/websocket v1.5.3 // indirect
-	github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
 	github.com/huin/goupnp v1.3.0 // indirect
 	github.com/jackpal/go-nat-pmp v1.0.2 // indirect
 	github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect