feat: Allow indirect replication #1070
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

object IndirectReplicationIntegrationSpec {
This is similar to the existing ReplicationIntegrationSpec.
# only events from the origin replica will be transferred from the origin
# replica. When each replica is connected to each other replica it's most
# efficient to disable indirect replication.
indirect-replication = on
The default value is "off". This is included in the docs, and maybe we should show "off" there; however, then the test would be testing this.
Make this "off" and enable it after snippet end or through config factory apis?
yes
if (settings.indirectReplication || meta.originReplica == settings.selfReplicaId)
  Future.successful(envelope.eventOption)
else
  filteredEvent // Optimization: was replicated to this replica, don't pass the payload across the wire
This is the reason why I haven't enabled it by default: it would have a substantial performance overhead. We'll see where it is going when we support star topologies and such.
It would actually be nice to configure this per replica, but here on the producer side we don't know which replica is the consumer (at least I couldn't see that)
If it was a predicate instead of a setting (or a pattern), the user could design a star-topology replica naming strategy and replicate indirectly only to the edge nodes.
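To make the predicate idea concrete, here is a minimal sketch. Everything in it is hypothetical: `ReplicaId`, `indirectTo`, `includePayload`, and the "edge-" naming convention are illustrative names, not part of this PR. It also assumes the producer somehow knows the consumer's replica id, which is not the case in the current code.

```scala
// Hypothetical sketch; none of these names come from the PR.
final case class ReplicaId(id: String)

object IndirectPredicateSketch {
  // Assumed naming strategy: edge replicas use an "edge-" prefix.
  val indirectTo: ReplicaId => Boolean = _.id.startsWith("edge-")

  // Send the payload when the event originated at this replica, or when the
  // consumer is one that should also receive indirect events.
  def includePayload(self: ReplicaId, origin: ReplicaId, consumer: ReplicaId): Boolean =
    origin == self || indirectTo(consumer)
}
```

With this shape a cloud replica would forward events from other cloud replicas to an edge consumer, but not to another cloud replica in the mesh.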
yes, need to think this through again. we want to support a full mesh in cloud where indirect-replication=off, and edge connecting in to one or a few cloud replicas. edge consumers need indirect-replication=on
Maybe we could look at the allReplicas (or otherReplicas) and use that for deciding indirect. If it's included in the defined allReplicas then off, otherwise on. However, again, here on the producer side we don't know the replicaId of the consumer. Maybe we have to add that?
We want to indirectly replicate from cloud to edge, and from edge to other cloud replicas, so maybe it is more about knowing which replication goes/comes from where when deciding (I haven't completely thought this all the way through)?
indirectly ... from edge to other cloud replicas
true, didn't think about that one
It is the consumer side that knows what other replicas it is consuming from, otherReplicas. Therefore it would be wrong to configure or decide the indirect aspect on the producer side, but we want the actual filtering of indirect events to happen on the producer side.
We could include the otherReplicas in the InitReq from the consumer. The producer would then use that to decide if indirect events should be included or not. If the event origin is in initReq.otherReplicas then it can be filtered out, otherwise it is included (as an indirect event).
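A rough sketch of that producer-side decision could look like the following. The types `InitReqSketch`, `Emit`, and `Filtered` are hypothetical stand-ins, not the real protocol classes. The sketch also adds two assumptions that the comment above does not spell out: events that originated at the producer itself are always emitted, and events that originated at the consumer are never sent back to it.

```scala
object OriginFilterSketch {
  // Hypothetical stand-in for the replica info the consumer sends in InitReq.
  final case class InitReqSketch(replicaId: String, otherReplicaIds: Set[String])

  sealed trait Outcome
  case object Emit extends Outcome     // send the event payload
  case object Filtered extends Outcome // send only a filtered placeholder

  def decide(selfReplicaId: String, eventOrigin: String, req: InitReqSketch): Outcome =
    if (eventOrigin == selfReplicaId) Emit // direct event (assumption)
    else if (eventOrigin == req.replicaId) Filtered // consumer's own event (assumption)
    else if (req.otherReplicaIds.contains(eventOrigin)) Filtered // consumer gets it directly
    else Emit // indirect event: the consumer has no connection to the origin
}
```

In a full mesh every foreign origin is in the consumer's otherReplicas, so nothing is replicated indirectly. An edge consumer that lists only one cloud replica receives the events of all other replicas through it.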
aa62b55 to a9a4f1c
New approach, would like feedback before continuing...
// Used for Replicated Event Sourcing, the replica id of the consumer
string replica_id = 6;
// Used for Replicated Event Sourcing, other replicas that the consumer is connected to.
repeated string other_replica_ids = 7;
@johanandren I have tried this instead. These replica ids are included from the consumer in the InitReq. Then that is used on the producer side to decide if an event based on its origin should be emitted or handled as FilteredEvent.
For producer push it would be similar. These fields would be included in the ConsumerEventStart response from the consumer side and then used on the producer side.
Ok, seems reasonable.
case Some(meta: ReplicatedEventMetadata) =>
  !exclude(meta.originReplica)
case _ =>
  // FIXME eventMetadata isn't loaded by backtracking
It's working aside from this little detail. For ordinary transformation, which we used for the origin filtering, we solve it by not applying transformation for backtracking and instead do that in loadEvent. It would be difficult to do the same for this, because then we would need the replica ids also in the load event request.
Do you think it's fine to change the r2dbc query to always load metadata, also for backtracking?
Sleeping on this. Feels fragile to not use the same origin filter from loadEvent also. Even though we only use loadEvent from backtracking events I think it should have the same transformation and filtering mechanism. I think it's best to include those additional replica id fields also in the loadEvent request.
It would be difficult to do the same for this
no, that shouldn't be difficult
I agree, let's go with always passing metadata and applying the same transform/filter.
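As an illustration of "the same transform/filter" on both paths, a minimal sketch follows. `EventSketch`, `originFilter`, `streamPath`, and `loadEventPath` are hypothetical names, not the actual classes or methods. Letting events without replication metadata pass the filter is an assumption made only for this example.

```scala
object SharedFilterSketch {
  // Hypothetical event shape: the origin comes from the always-loaded metadata.
  final case class EventSketch(originReplica: Option[String], payload: String)

  // Single filter definition, built from the consumer's replica info.
  def originFilter(exclude: Set[String]): EventSketch => Boolean =
    ev => ev.originReplica.forall(origin => !exclude(origin))

  // Streaming path: filtered events become None (payload not sent).
  def streamPath(events: Seq[EventSketch], exclude: Set[String]): Seq[Option[String]] = {
    val keep = originFilter(exclude)
    events.map(ev => if (keep(ev)) Some(ev.payload) else None)
  }

  // loadEvent path: the same filter, driven by the same replica ids in the request.
  def loadEventPath(ev: EventSketch, exclude: Set[String]): Option[String] =
    if (originFilter(exclude)(ev)) Some(ev.payload) else None
}
```

Because both paths call the same `originFilter`, an event loaded after backtracking cannot be treated differently from the same event seen on the stream.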
Incorporated the filter changes from #1072 because there were too many changes in the same place. Also added a test for such filters.
55b02cc to d4cf2a8
Rebased and bumped Akka to get this ready for merge
d4cf2a8 to 41147c8
* and define filters via ReplicationSettings
* deprecate grpcReplication with producer filter params, via settings instead
41147c8 to c6f523c
Don't know what happened, but this didn't contain my latest work. Restored from history so now this should be ready.
Maybe I missed pulling the latest before rebasing, sorry!
@@ -59,6 +59,7 @@ message ConsumerEventInit {

message ConsumerEventStart {
  repeated FilterCriteria filter = 1;
  ReplicaInfo replica_info = 6;
Also added to ConsumerEventStart, but setting it and implementing the corresponding filter on the producer side for RES must be done in @johanandren's #1073 (or after that).
I'll do that in that PR once it is rebased on top of this one, after this is merged.
LGTM
The test failure was in IntegrationSpec, which I think is unrelated. #905
Ah, that was because I changed the log level. Some tests needed trace level. Should work now. 🤞
Draft because requires a change in Akka first akka/akka#32233