diff --git a/chainexchange/cbor_gen.go b/chainexchange/cbor_gen.go new file mode 100644 index 00000000..9b326340 --- /dev/null +++ b/chainexchange/cbor_gen.go @@ -0,0 +1,135 @@ +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +package chainexchange + +import ( + "fmt" + "io" + "math" + "sort" + + gpbft "github.com/filecoin-project/go-f3/gpbft" + cid "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +var _ = xerrors.Errorf +var _ = cid.Undef +var _ = math.E +var _ = sort.Sort + +var lengthBufMessage = []byte{130} + +func (t *Message) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufMessage); err != nil { + return err + } + + // t.Instance (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Instance)); err != nil { + return err + } + + // t.Chain (gpbft.ECChain) (slice) + if len(t.Chain) > 8192 { + return xerrors.Errorf("Slice value in field t.Chain was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Chain))); err != nil { + return err + } + for _, v := range t.Chain { + if err := v.MarshalCBOR(cw); err != nil { + return err + } + + } + return nil +} + +func (t *Message) UnmarshalCBOR(r io.Reader) (err error) { + *t = Message{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Instance (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Instance = uint64(extra) + + } + // t.Chain (gpbft.ECChain) (slice) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > 8192 { + return fmt.Errorf("t.Chain: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + + if extra > 0 { + t.Chain = make([]gpbft.TipSet, extra) + } + + for i := 0; i < int(extra); i++ { + { + var maj byte + var extra uint64 + var err error + _ = maj + _ = extra + _ = err + + { + + if err := t.Chain[i].UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.Chain[i]: %w", err) + } + + } + + } + } + return nil +} diff --git a/chainexchange/chainexchange.go b/chainexchange/chainexchange.go new file mode 100644 index 00000000..451c0ac6 --- /dev/null +++ b/chainexchange/chainexchange.go @@ -0,0 +1,27 @@ +package chainexchange + +import ( + "context" + + "github.com/filecoin-project/go-f3/gpbft" +) + +type Key []byte + +type Keyer interface { + Key(gpbft.ECChain) Key +} + +type Message struct { + Instance uint64 + Chain gpbft.ECChain +} + +type ChainExchange interface { + Keyer + Broadcast(context.Context, Message) error + GetChainByInstance(context.Context, uint64, Key) (gpbft.ECChain, bool) + RemoveChainsByInstance(context.Context, uint64) error +} + +func (k Key) IsZero() bool { return len(k) == 0 } diff --git a/chainexchange/options.go b/chainexchange/options.go new file mode 100644 index 00000000..574b9938 --- /dev/null +++ b/chainexchange/options.go @@ -0,0 +1,134 @@ +package chainexchange + +import ( + "errors" + + "github.com/filecoin-project/go-f3/gpbft" + "github.com/filecoin-project/go-f3/internal/psutil" + "github.com/filecoin-project/go-f3/manifest" + pubsub "github.com/libp2p/go-libp2p-pubsub" +) + +type Option func(*options) error + +type options struct { + topicName string + topicScoreParams *pubsub.TopicScoreParams + subscriptionBufferSize int + pubsub *pubsub.PubSub + progress gpbft.Progress + maxChainLength int + maxInstanceLookahead uint64 + maxDiscoveredChainsPerInstance int + maxWantedChainsPerInstance int +} + +func newOptions(o ...Option) (*options, error) { + opts := &options{ + topicScoreParams: psutil.PubsubTopicScoreParams, + subscriptionBufferSize: 32, + maxChainLength: gpbft.ChainMaxLen, + maxInstanceLookahead: manifest.DefaultCommitteeLookback, + maxDiscoveredChainsPerInstance: 1000, + maxWantedChainsPerInstance: 1000, + } + for _, apply := range o { + if err := apply(opts); err != nil { + return nil, err + } + } + if opts.progress == nil { + return nil, errors.New("gpbft progress must be set") + } + if opts.pubsub == nil { + return nil, errors.New("pubsub must be set") + } + if opts.topicName == "" { + return nil, errors.New("topic name must be set") + } + return opts, nil +} + +func WithTopicName(name string) Option { + return func(o *options) error { + if name == "" { + return errors.New("topic name cannot be empty") + } + o.topicName = name + return nil + } +} + +func WithTopicScoreParams(params *pubsub.TopicScoreParams) Option { + return func(o *options) error { + o.topicScoreParams = params + return nil + } +} + +func WithSubscriptionBufferSize(size int) Option { + return func(o *options) error { + if size < 1 { + return errors.New("subscription buffer size must be at least 1") + } + o.subscriptionBufferSize = size + return nil + } +} + +func WithPubSub(pubsub *pubsub.PubSub) Option { + return func(o *options) error { + if pubsub == nil { + return errors.New("pubsub cannot be nil") + } + o.pubsub = pubsub + return nil + } +} + +func WithProgress(progress gpbft.Progress) Option { + return func(o *options) error { + if progress == nil { + return errors.New("progress cannot be nil") + } + o.progress = progress + return nil + } +} + +func WithMaxChainLength(length int) Option { + return func(o *options) error { + if length < 1 { + return errors.New("max chain length must be at least 1") + } + o.maxChainLength = length + return nil + } +} + +func WithMaxInstanceLookahead(lookahead uint64) Option { + return func(o *options) error { + o.maxInstanceLookahead = lookahead + return nil + } +} + +func WithMaxDiscoveredChainsPerInstance(max int) Option { + return func(o *options) error { + if max < 1 { + return errors.New("max discovered chains per instance must be at least 1") + } + o.maxDiscoveredChainsPerInstance = max + return nil + } +} + +func WithMaxWantedChainsPerInstance(max int) Option { + return func(o *options) error { + if max < 1 { + return errors.New("max wanted chains per instance must be at least 1") + } + o.maxWantedChainsPerInstance = max + return nil + } +} diff --git a/chainexchange/pubsub.go b/chainexchange/pubsub.go new file mode 100644 index 00000000..9af7d82b --- /dev/null +++ b/chainexchange/pubsub.go @@ -0,0 +1,306 @@ +package chainexchange + +import ( + "bytes" + "context" + "fmt" + "sync" + + "github.com/filecoin-project/go-f3/gpbft" + "github.com/filecoin-project/go-f3/internal/psutil" + "github.com/filecoin-project/go-f3/merkle" + lru "github.com/hashicorp/golang-lru/v2" + logging "github.com/ipfs/go-log/v2" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" +) + +var ( + log = logging.Logger("f3/chainexchange") + + _ ChainExchange = (*PubSubChainExchange)(nil) + _ pubsub.ValidatorEx = (*PubSubChainExchange)(nil).validatePubSubMessage + + chainPortionPlaceHolder = &chainPortion{} +) + +type chainPortion struct { + // chain is a pointer to the full chain in order to avoid copying the chain across the cache + fullChain *gpbft.ECChain + offset int +} + +type PubSubChainExchange struct { + *options + + // mu guards access to chains and API calls. + mu sync.Mutex + chainsWanted map[uint64]*lru.Cache[string, *chainPortion] + chainsDiscovered map[uint64]*lru.Cache[string, *chainPortion] + topic *pubsub.Topic + stop func() error +} + +func NewPubSubChainExchange(o ...Option) (*PubSubChainExchange, error) { + opts, err := newOptions(o...) + if err != nil { + return nil, err + } + return &PubSubChainExchange{ + options: opts, + chainsWanted: map[uint64]*lru.Cache[string, *chainPortion]{}, + chainsDiscovered: map[uint64]*lru.Cache[string, *chainPortion]{}, + }, nil +} + +func (p *PubSubChainExchange) Start(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + if err := p.pubsub.RegisterTopicValidator(p.topicName, p.validatePubSubMessage); err != nil { + return fmt.Errorf("failed to register topic validator: %w", err) + } + var err error + p.topic, err = p.pubsub.Join(p.topicName, pubsub.WithTopicMessageIdFn(psutil.ChainExchangeMessageIdFn)) + if err != nil { + return fmt.Errorf("failed to join topic '%s': %w", p.topicName, err) + } + if p.topicScoreParams != nil { + if err := p.topic.SetScoreParams(p.topicScoreParams); err != nil { + return fmt.Errorf("failed to set score params: %w", err) + } + } + subscription, err := p.topic.Subscribe(pubsub.WithBufferSize(p.subscriptionBufferSize)) + if err != nil { + _ = p.topic.Close() + p.topic = nil + return fmt.Errorf("failed to subscribe to topic '%s': %w", p.topicName, err) + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + for ctx.Err() == nil { + msg, err := subscription.Next(ctx) + if err != nil { + log.Debugw("failed to read nex message from subscription", "err", err) + continue + } + cmsg := msg.ValidatorData.(Message) + p.cacheAsDiscoveredChain(ctx, cmsg) + } + }() + p.stop = func() error { + cancel() + subscription.Cancel() + return p.topic.Close() + } + return nil +} + +func (p *PubSubChainExchange) Key(chain gpbft.ECChain) Key { + if chain.IsZero() { + return nil + } + length := len(chain) + values := make([][]byte, length) + for i := range length { + values[i] = chain[i].MarshalForSigning() + } + rootDigest := merkle.Tree(values) + return rootDigest[:] +} + +func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uint64, key Key) (gpbft.ECChain, bool) { + + // We do not have to take instance as input, and instead we can just search + // through all the instance as they are not expected to be more than 10. The + // reason we do take it, however, is because: + // * That information is readily available by the caller. + // * It helps in optimising the search by limiting the search space to the + // instance, since PubSubChainExchange groups chains by instance for ease of + // removal. + + if key.IsZero() { + return nil, false + } + + p.mu.Lock() + defer p.mu.Unlock() + + cacheKey := string(key) + + // Check wanted keys first. + wanted := p.getChainsWantedAt(instance) + if portion, found := wanted.Get(cacheKey); found && !portion.IsPlaceholder() { + // Found and is not a placeholder. + return portion.Get(), true + } + // Check if the chain for the key is discovered. + discovered := p.getChainsDiscoveredAt(instance) + if portion, found := discovered.Get(cacheKey); found { + // Add it to the wanted cache and remove it from the discovered cache. + wanted.Add(cacheKey, portion) + discovered.Remove(cacheKey) + // TODO: Do we want to pull all the suffixes of the chain into wanted cache? + return portion.Get(), true + } + // Otherwise, add a placeholder for the wanted key as a way to prioritise its + // retention via LRU recent-ness. + wanted.ContainsOrAdd(cacheKey, chainPortionPlaceHolder) + return nil, false +} + +func (p *PubSubChainExchange) getChainsWantedAt(instance uint64) *lru.Cache[string, *chainPortion] { + wanted, exists := p.chainsWanted[instance] + if !exists { + wanted = p.newChainPortionCache(p.maxWantedChainsPerInstance) + p.chainsWanted[instance] = wanted + } + return wanted +} + +func (p *PubSubChainExchange) getChainsDiscoveredAt(instance uint64) *lru.Cache[string, *chainPortion] { + discovered, exists := p.chainsDiscovered[instance] + if !exists { + discovered = p.newChainPortionCache(p.maxDiscoveredChainsPerInstance) + p.chainsDiscovered[instance] = discovered + } + return discovered +} + +func (p *PubSubChainExchange) newChainPortionCache(capacity int) *lru.Cache[string, *chainPortion] { + cache, err := lru.New[string, *chainPortion](capacity) + if err != nil { + // This can only happen if the cache size is negative, which is validated via + // options. Its occurrence for the purposes of chain exchange indicates a + // programmer error. + log.Fatalw("Failed to instantiate chain portion cache", "capacity", capacity, "err", err) + } + return cache +} + +func (p *PubSubChainExchange) validatePubSubMessage(_ context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult { + var cmsg Message + buf := bytes.NewBuffer(msg.Data) + if err := cmsg.UnmarshalCBOR(buf); err != nil { + log.Debugw("failed to decode message", "from", msg.GetFrom(), "err", err) + return pubsub.ValidationReject + } + switch current := p.progress(); { + case + cmsg.Instance < current.ID, + cmsg.Instance > current.ID+p.maxInstanceLookahead: + // Too far ahead or too far behind. + return pubsub.ValidationIgnore + case cmsg.Chain.IsZero(): + // No peer should broadcast a zero-length chain. + return pubsub.ValidationReject + } + // TODO: wire in the current base chain from an on-going instance to further + // tighten up validation. + msg.ValidatorData = cmsg + return pubsub.ValidationAccept +} + +func (p *PubSubChainExchange) cacheAsDiscoveredChain(ctx context.Context, cmsg Message) { + p.mu.Lock() + defer p.mu.Unlock() + + wanted := p.getChainsDiscoveredAt(cmsg.Instance) + discovered := p.getChainsDiscoveredAt(cmsg.Instance) + + for offset := len(cmsg.Chain); offset >= 0 && ctx.Err() == nil; offset-- { + // TODO: Expose internals of merkle.go so that keys can be generated + // cumulatively for a more efficient prefix chain key generation. + prefix := cmsg.Chain.Prefix(offset) + key := p.Key(prefix) + cacheKey := string(key) + if portion, found := wanted.Peek(cacheKey); !found { + // Not a wanted key; add it to discovered chains if they are not there already, + // i.e. without modifying the recent-ness of any of the discovered values. + discovered.ContainsOrAdd(cacheKey, &chainPortion{ + fullChain: &cmsg.Chain, + offset: offset, + }) + } else if portion.IsPlaceholder() { + // It is a wanted key with a placeholder; replace the placeholder with the actual + // discovery. + wanted.Add(cacheKey, &chainPortion{ + fullChain: &cmsg.Chain, + offset: offset, + }) + } + // Nothing to do; the discovered value is already in the wanted chains with + // discovered value. + + // Continue with the remaining prefix keys as we do not know if any of them have + // been evicted from the cache or not. This should be cheap enough considering the + // added complexity of tracking evictions relative to chain prefixes. + } +} + +func (p *PubSubChainExchange) Broadcast(ctx context.Context, msg Message) error { + + // Optimistically cache the broadcast chain and all of its prefixes as wanted. + p.cacheAsWantedChain(ctx, msg) + + // TODO: integrate zstd compression. + var buf bytes.Buffer + if err := msg.MarshalCBOR(&buf); err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + if err := p.topic.Publish(ctx, buf.Bytes()); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + return nil +} + +func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Message) { + p.mu.Lock() + defer p.mu.Unlock() + + wanted := p.getChainsDiscoveredAt(cmsg.Instance) + for offset := len(cmsg.Chain); offset >= 0 && ctx.Err() == nil; offset-- { + // TODO: Expose internals of merkle.go so that keys can be generated + // cumulatively for a more efficient prefix chain key generation. + prefix := cmsg.Chain.Prefix(offset) + key := p.Key(prefix) + cacheKey := string(key) + if portion, found := wanted.Peek(cacheKey); !found || portion.IsPlaceholder() { + wanted.Add(cacheKey, &chainPortion{ + fullChain: &cmsg.Chain, + offset: offset, + }) + } + // Continue with the remaining prefix keys as we do not know if any of them have + // been evicted from the cache or not. This should be cheap enough considering the + // added complexity of tracking evictions relative to chain prefixes. + } +} + +func (p *PubSubChainExchange) RemoveChainsByInstance(_ context.Context, instance uint64) error { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.chainsWanted, instance) + delete(p.chainsDiscovered, instance) + return nil +} + +func (p *PubSubChainExchange) Shutdown(context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.stop != nil { + return p.stop() + } + return nil +} + +func (cp *chainPortion) Get() gpbft.ECChain { + if cp.offset < 0 { + return *cp.fullChain + } + return cp.fullChain.Prefix(cp.offset) +} + +func (cp *chainPortion) IsPlaceholder() bool { + return cp == chainPortionPlaceHolder +} diff --git a/chainexchange/pubsub_test.go b/chainexchange/pubsub_test.go new file mode 100644 index 00000000..2473a074 --- /dev/null +++ b/chainexchange/pubsub_test.go @@ -0,0 +1,78 @@ +package chainexchange_test + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/go-f3/chainexchange" + "github.com/filecoin-project/go-f3/gpbft" + "github.com/libp2p/go-libp2p" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/stretchr/testify/require" +) + +func TestPubSubChainExchange_Broadcast(t *testing.T) { + const topicName = "fish" + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + var testInstant gpbft.Instant + host, err := libp2p.New() + require.NoError(t, err) + t.Cleanup(func() { + cancel() + require.NoError(t, host.Close()) + }) + + ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithFloodPublish(true)) + require.NoError(t, err) + + subject, err := chainexchange.NewPubSubChainExchange( + chainexchange.WithProgress(func() (instant gpbft.Instant) { + return testInstant + }), + chainexchange.WithPubSub(ps), + chainexchange.WithTopicName(topicName), + chainexchange.WithTopicScoreParams(nil), + ) + require.NoError(t, err) + require.NotNil(t, subject) + + err = subject.Start(ctx) + require.NoError(t, err) + + instance := uint64(1) + ecChain := gpbft.ECChain{ + {Epoch: 0, Key: []byte("lobster"), PowerTable: gpbft.MakeCid([]byte("pt"))}, + {Epoch: 1, Key: []byte("barreleye"), PowerTable: gpbft.MakeCid([]byte("pt"))}, + } + + key := subject.Key(ecChain) + chain, found := subject.GetChainByInstance(ctx, instance, key) + require.False(t, found) + require.Nil(t, chain) + + require.NoError(t, subject.Broadcast(ctx, chainexchange.Message{ + Instance: instance, + Chain: ecChain, + })) + + chain, found = subject.GetChainByInstance(ctx, instance, key) + require.True(t, found) + require.Equal(t, ecChain, chain) + + baseChain := ecChain.BaseChain() + baseKey := subject.Key(baseChain) + chain, found = subject.GetChainByInstance(ctx, instance, baseKey) + require.True(t, found) + require.Equal(t, baseChain, chain) + + require.NoError(t, subject.Shutdown(ctx)) +} + +// TODO: Add more tests, specifically: +// - valodation +// - discovery through other chainexchange instance +// - cache eviction/fixed memory footprint. +// - fulfilment of chain from discovery to wanted in any order. +// - spam +// - fuzz diff --git a/gen/main.go b/gen/main.go index 6d90aab0..55a72f1a 100644 --- a/gen/main.go +++ b/gen/main.go @@ -6,6 +6,7 @@ import ( "github.com/filecoin-project/go-f3/certexchange" "github.com/filecoin-project/go-f3/certs" + "github.com/filecoin-project/go-f3/chainexchange" "github.com/filecoin-project/go-f3/gpbft" gen "github.com/whyrusleeping/cbor-gen" "golang.org/x/sync/errgroup" @@ -39,6 +40,11 @@ func main() { certexchange.ResponseHeader{}, ) }) + eg.Go(func() error { + return gen.WriteTupleEncodersToFile("../chainexchange/cbor_gen.go", "chainexchange", + chainexchange.Message{}, + ) + }) if err := eg.Wait(); err != nil { fmt.Printf("Failed to complete cborg_gen: %v\n", err) os.Exit(1) diff --git a/internal/psutil/psutil.go b/internal/psutil/psutil.go index ec896a52..de9439f7 100644 --- a/internal/psutil/psutil.go +++ b/internal/psutil/psutil.go @@ -13,6 +13,7 @@ import ( var ManifestMessageIdFn = pubsubMsgIdHashDataAndSender var GPBFTMessageIdFn = pubsubMsgIdHashData +var ChainExchangeMessageIdFn = pubsubMsgIdHashData // Generate a pubsub ID from the message topic + data. func pubsubMsgIdHashData(m *pubsub_pb.Message) string {