Add e2e ack, checkpointing and metrics to Postgres stream processing #5375
@@ -34,4 +34,5 @@ dependencies {
testImplementation libs.avro.core
testImplementation libs.parquet.hadoop
testImplementation libs.parquet.avro
// testImplementation 'org.slf4j:slf4j-simple:2.0.9'
Is this needed?
Removed.
@@ -21,6 +23,14 @@ public enum AcknowledgmentStatus {

public ChangeEventStatus(final BinlogCoordinate binlogCoordinate, final long timestamp) {
this.binlogCoordinate = binlogCoordinate;
this.logSequenceNumber = null;
Why do we need this constructor?
In StreamCheckpointManager, we use a ConcurrentLinkedQueue<ChangeEventStatus> to track change event status. To make ChangeEventStatus compatible with both MySQL and Postgres, I added logSequenceNumber to it, so MySQL uses binlogCoordinate and Postgres uses logSequenceNumber.
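A minimal sketch of the dual-position status type described above, assuming simplified stand-ins for the position classes. The names ending in `Sketch` are hypothetical and are not the project's actual classes; only the two field names and the first constructor's shape come from the diff.

```java
// Hypothetical stand-in for the project's BinlogCoordinate type (assumption).
final class BinlogCoordinateSketch {
    final String binlogFilename;
    final long binlogPosition;

    BinlogCoordinateSketch(final String binlogFilename, final long binlogPosition) {
        this.binlogFilename = binlogFilename;
        this.binlogPosition = binlogPosition;
    }
}

// Hypothetical stand-in for a Postgres log sequence number (assumption).
final class LogSequenceNumberSketch {
    final long value;

    LogSequenceNumberSketch(final long value) {
        this.value = value;
    }
}

// One status type shared by both engines: exactly one position field is non-null.
final class ChangeEventStatusSketch {
    private final BinlogCoordinateSketch binlogCoordinate;   // set for MySQL only
    private final LogSequenceNumberSketch logSequenceNumber; // set for Postgres only
    private final long timestamp;

    // MySQL: track a binlog coordinate and leave the LSN unset.
    ChangeEventStatusSketch(final BinlogCoordinateSketch binlogCoordinate, final long timestamp) {
        this.binlogCoordinate = binlogCoordinate;
        this.logSequenceNumber = null;
        this.timestamp = timestamp;
    }

    // Postgres: track an LSN and leave the binlog coordinate unset.
    ChangeEventStatusSketch(final LogSequenceNumberSketch logSequenceNumber, final long timestamp) {
        this.binlogCoordinate = null;
        this.logSequenceNumber = logSequenceNumber;
        this.timestamp = timestamp;
    }

    BinlogCoordinateSketch getBinlogCoordinate() {
        return binlogCoordinate;
    }

    LogSequenceNumberSketch getLogSequenceNumber() {
        return logSequenceNumber;
    }

    long getTimestamp() {
        return timestamp;
    }
}
```

With this shape, a single ConcurrentLinkedQueue of statuses can serve either engine, and the consumer picks the getter that matches the engine it is checkpointing.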
try {
eventProcessingTimer.record(() -> function.accept(message));
} catch (Exception e) {
LOG.error("Failed to process change event of type {}", messageType, e);
Add metrics for this failure
Added metric "replicationLogEntryProcessingErrors" for this failure.
Is there data loss here? Is it possible to reprocess this message?
I can add retries as well to handle transient issues. But if the message is somehow broken, there could be data loss here.
Added retries in latest commit.
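For illustration, the retry-plus-error-metric idea discussed in this thread could look roughly like the sketch below. It is not the code from the commit: the class name, the `maxAttempts` parameter, and the plain `AtomicLong` standing in for the "replicationLogEntryProcessingErrors" metric are all assumptions.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical wrapper that retries a change event handler a bounded number of times.
final class RetryingEventHandlerSketch<T> {
    private final Consumer<T> delegate;
    private final int maxAttempts;
    // Stand-in for an error counter metric; incremented once per failed attempt.
    private final AtomicLong processingErrors = new AtomicLong();

    RetryingEventHandlerSketch(final Consumer<T> delegate, final int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.delegate = delegate;
        this.maxAttempts = maxAttempts;
    }

    // Returns true if the message was processed, false if every attempt failed.
    boolean handle(final T message) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                delegate.accept(message);
                return true;
            } catch (Exception e) {
                processingErrors.incrementAndGet();
            }
        }
        // All attempts failed; the caller decides whether to skip or stop.
        return false;
    }

    long getProcessingErrors() {
        return processingErrors.get();
    }
}
```

Retries of this kind only help with transient failures. A message that is itself broken fails on every attempt, so the caller still has to choose between skipping it (possible data loss) and halting the stream.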
}
} while (!changeEventStatuses.isEmpty());
streamCheckpointer.checkpoint(currentChangeEventStatus.getBinlogCoordinate());
streamCheckpointer.checkpoint(engineType, currentChangeEventStatus);
Add metrics to this class to track checkpointing.
Added the following metrics:
positiveAcknowledgementSets
negativeAcknowledgementSets
recordsCheckpointed
noDataExtendLeaseCount
giveupPartitionCount
Signed-off-by: Hai Yan <[email protected]>
77313d1 to f6bd2e2
Had to rebase to resolve merge conflicts.
changeEventStatus.getBinlogCoordinate() : changeEventStatus.getLogSequenceNumber(),
changeEventStatus.getRecordCount());
streamCheckpointer.checkpoint(engineType, changeEventStatus);
recordsCheckpointed.increment(changeEventStatus.getRecordCount());
getRecordCount will be the total number of events processed so far. You may want to just increment the metric by 1 to make sure the checkpoint thread is working as expected.
Updated the metric to increment by 1 at a time.
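To show why the increment matters, here is a small sketch that compares the two counting strategies. It assumes, as the review comment says, that the record count passed at each checkpoint is cumulative. The class and its plain `AtomicLong` counters are hypothetical stand-ins for the real metrics.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical comparison of two ways to update a checkpoint counter.
final class CheckpointMetricsSketch {
    // Incremented by 1 per checkpoint: reflects how often the checkpoint thread ran.
    private final AtomicLong checkpointCount = new AtomicLong();
    // Incremented by the cumulative record count: over-counts, kept only for comparison.
    private final AtomicLong summedCumulativeCounts = new AtomicLong();

    // cumulativeRecordCount is the total number of events processed so far.
    void onCheckpoint(final long cumulativeRecordCount) {
        checkpointCount.incrementAndGet();
        summedCumulativeCounts.addAndGet(cumulativeRecordCount);
    }

    long getCheckpointCount() {
        return checkpointCount.get();
    }

    long getSummedCumulativeCounts() {
        return summedCumulativeCounts.get();
    }
}
```

With checkpoints taken at cumulative counts of 10, 25 and 40, the summed counter reaches 75 although only 40 records were processed, while the per-checkpoint counter reads 3.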
Description
Add e2e ack, checkpointing and metrics to Postgres stream processing.
This is very similar to MySQL stream processing - binlogCoordinate in MySQL vs logSequenceNumber in Postgres. This PR refactors StreamCheckpointer and StreamCheckpointManager to support both binlog coordinates and log sequence numbers.
Issues Resolved
Contributes to #5309
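As a rough illustration of the refactoring named in the Description, a checkpointer that accepts an engine type can choose which position to persist. Everything in this sketch is an assumption made for the example: the enum, the string-typed positions, and the in-memory field that stands in for durable checkpoint storage.

```java
import java.util.Objects;

// Hypothetical engine type; the project's actual enum may differ.
enum EngineTypeSketch { MYSQL, POSTGRES }

// Hypothetical status holder using plain strings for both kinds of position.
final class PositionStatusSketch {
    final String binlogCoordinate;  // non-null for MySQL
    final String logSequenceNumber; // non-null for Postgres

    PositionStatusSketch(final String binlogCoordinate, final String logSequenceNumber) {
        this.binlogCoordinate = binlogCoordinate;
        this.logSequenceNumber = logSequenceNumber;
    }
}

final class StreamCheckpointerSketch {
    // Stand-in for durable checkpoint storage.
    private String lastCheckpointedPosition;

    // Persist the position that matches the engine type.
    void checkpoint(final EngineTypeSketch engineType, final PositionStatusSketch status) {
        switch (engineType) {
            case MYSQL:
                lastCheckpointedPosition = "binlog:"
                        + Objects.requireNonNull(status.binlogCoordinate, "binlogCoordinate");
                break;
            case POSTGRES:
                lastCheckpointedPosition = "lsn:"
                        + Objects.requireNonNull(status.logSequenceNumber, "logSequenceNumber");
                break;
            default:
                throw new IllegalArgumentException("Unsupported engine type: " + engineType);
        }
    }

    String getLastCheckpointedPosition() {
        return lastCheckpointedPosition;
    }
}
```

Dispatching on the engine type inside the checkpointer keeps the checkpoint manager's loop identical for both engines.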
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.