-
Notifications
You must be signed in to change notification settings - Fork 2
logpoller: refactors and persistent store #219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
|
||
| starting_seq_no INTEGER NOT NULL, | ||
|
|
||
| created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how is the time zone set for TIMESTAMPTZ? Will everything default to UTC?
| hasFilter, err := a.logPoller.HasFilter(ctx, name) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to check for filter: %w", err) | ||
| } | ||
| // If filter exists, unregister it first to handle address changes | ||
| if hasFilter { | ||
| if err := a.logPoller.UnregisterFilter(ctx, name); err != nil { | ||
| return fmt.Errorf("failed to unregister logpoller filter: %w", err) | ||
| } | ||
| } | ||
|
|
||
| filter := types.Filter{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think I see this logic being handled inside of logPoller.ReigsterFilter or elsewhere, but maybe I just can't find it. Is it not needed anymore?
| // Filter at database level using byte-level filtering | ||
| // CCIPMessageSent struct layout: | ||
| // - Message: 40 bytes at offset 0 (Message struct) | ||
| // - DestChainSelector: uint64 (8 bytes) at offset 40 | ||
| // - SequenceNumber: uint64 (8 bytes) at offset 48 | ||
| logs, _, _, err := a.logPoller.NewQuery(). | ||
| WithSource(onrampAddr). | ||
| WithEventSig(hash.CRC32(consts.EventNameCCIPMessageSent)). | ||
| SkipBytes(40). // Skip to DestChainSelector | ||
| FilterBytes(8, query.EQ(binary.BigEndian.AppendUint64(nil, uint64(dest)))). | ||
| FilterBytes(8, | ||
| query.GTE(binary.BigEndian.AppendUint64(nil, uint64(seqNumRange.Start()))), | ||
| query.LTE(binary.BigEndian.AppendUint64(nil, uint64(seqNumRange.End()))), | ||
| WithBocBytes( | ||
| query.SkipBytes(40), | ||
| query.MatchBytes(8, query.WithCondition(binary.BigEndian.AppendUint64(nil, uint64(dest)), primitives.Eq)), | ||
| query.MatchBytes(8, | ||
| query.WithCondition(binary.BigEndian.AppendUint64(nil, uint64(seqNumRange.Start())), primitives.Gte), | ||
| query.WithCondition(binary.BigEndian.AppendUint64(nil, uint64(seqNumRange.End())), primitives.Lte), | ||
| ), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to address in this PR, but I'm wondering if this super exact byte matching query builder logic deserves its own layer in between the accessor and the log poller in the future. It's almost like a separate go-binding layer but for messages/events instead of contract calls. It is product-specific so it makes sense to keep in the accessor for now, but it might be more maintainable as a standalone layer in the long run 🤷
This PR adds persistent store in TON logpoller, improves query interface, and log ingestion pipeline
Next Steps