The architecture diagram at https://github.com/mwadams/reaqtivewf/blob/main/docs/ReaqtiveWorkflow.png (as of the version at https://github.com/mwadams/reaqtivewf/blob/b8a6a135f4626a74009e93206ff9e3a177d34ebe/docs/ReaqtiveWorkflow.png) has this pseudo-query:
```
WHERE Status is Waiting For Trigger
JOIN Triggers
WHERE SeqNo > last seqNo
TAKE UNTIL new WF subject version
```
There's some explanatory text describing how the Workflow Engine Operator downstream of that uses the output of this:

> Subscribes to a Workflow Subject Version, joining to triggers from WFSV's Sequence Id, until the WF subject version changes.
When I first saw this I was baffled because a Join (in SQL and Rx) entails two sources (tables or streams respectively). After peering at it for a while and trying to guess what might be meant, I have a partial hypothesis. First of all, here's what I think the streams involved are:
- Triggers: a single observable sequence of every single trigger coming into the entire system
- WorkflowSubjectVersions (WFSV): a single observable sequence producing an item for each state change of each workflow instance
To put these another way, Triggers is everything that happens in the outside world that is visible to the system, and WFSV is every state change in the system that occurred in response to a trigger.
So with that in mind, what is this join? The diagram shows the pseudo-SQL as defining the input to a set of Workflow Engine Operator instances, but the explanatory docs above seem to be describing the behaviour from the perspective of one of those instances, making it sound like the join happens inside that operator, with each performing its own join.
Something that's not clear to me is the lifetime of these Workflow Engine Operator instances. My best guess is that there is one of these for each active workflow instance, and that an individual Workflow Engine Operator lives for as long as its corresponding workflow instance remains in its current state.
Perhaps what we are looking at is a join over the Triggers and WorkflowSubjectVersions streams. These are both singleton streams, and the purpose of a join would be to produce outputs that pair up triggers with interested workflow instances (using a style of join in which a single trigger could be paired up with any number of workflow instances, each pairing producing its own output from the join, because one trigger may be of interest to any number of workflow instances). But what would these look like? The explanatory text says this:
> joining to triggers from WFSV's Sequence Id,
which implies that not only would each item in the WFSV stream need to include some sort of instance identifier and version, but in order for the join to work as described, the Triggers items would also need to do that.
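As a rough sketch of what that reading would imply (the type and property names here are my own guesses for illustration, not types from the repo), both sides of the join would have to carry the same pair of identifiers:

```csharp
using System;

// Hypothetical shape: a workflow subject version identified by the instance
// it belongs to plus a per-instance sequence id. Names are assumptions.
public record WorkflowSubjectVersion(string WorkflowInstanceId, long SequenceId, string Status);

// Hypothetical shape of a trigger IF the join were keyed on the WFSV's
// sequence id: the trigger would have to name its target instance and the
// exact version of that instance it applies to.
public record TriggerAsImpliedByDocs(string WorkflowInstanceId, long SequenceId, string Payload);

public static class ImpliedJoin
{
    // The join condition this reading of the docs seems to require.
    public static bool Matches(WorkflowSubjectVersion wfsv, TriggerAsImpliedByDocs trigger) =>
        wfsv.WorkflowInstanceId == trigger.WorkflowInstanceId
        && wfsv.SequenceId == trigger.SequenceId;
}
```

With shapes like these, a trigger can only ever match one version of one instance, which is what makes this reading look wrong to me.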
That seems odd—I would have expected triggers not to need to specify a target workflow instance (because many instances might be interested in one trigger) and especially for them not to need to specify the specific sequence id of the workflow instance (because that seems to entail triggers knowing too much about the inner workings of the workflow instances to which they are applicable).
It's also not how the Trigger class is defined. It does have a sequence number, but that seems to be concerned only with triggers, and not workflow instances. In practice, the Trigger is going to be more like this:
So what would it mean to "join[...] to triggers from WFSV's Sequence Id?" I can't make sense of that.
It seems more likely that we would want to join, informally speaking, on "This trigger is of interest to this workflow subject version". So I think the join might look something like this:
```csharp
IObservable<EngineOperator> engineOperators = WorkflowSubjectVersions
    .Where(wfsv => wfsv.IsWaitingForTrigger())
    .GroupJoin(
        // Combine with all triggers that occur while this wfsv is current
        Triggers,
        // This group should run until the next version of this wfs comes into existence
        wfsv => WorkflowSubjectVersions.FirstOrDefaultAsync(
            candidateNext => candidateNext.WorkflowInstanceId == wfsv.WorkflowInstanceId),
        // Triggers have instantaneous duration.
        _ => Observable.Empty<Unit>(),
        (wfsv, triggers) => (wfsv, triggers))
    .Select(wfsvAndContemporaneousTriggers =>
    {
        // Because of how joins work, we get every trigger that happens while
        // this WFSV is current, so we now need to filter these down to only the
        // triggers that are applicable to this WFSV
        (WorkflowSubjectVersion wfsv, IObservable<Trigger> contemporaneousTriggers) = wfsvAndContemporaneousTriggers;
        IObservable<Trigger> matchingTriggers = contemporaneousTriggers
            .Where(t => TriggerAppliesToWorkflowSubjectVersion(wfsv, t));
        return new EngineOperator(wfsv, matchingTriggers);
    });
```
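To check my own understanding of how a `GroupJoin` with these durations behaves, here is a toy version. It is only a sketch: `Wfsv`, `Trig` and `GroupJoinDemo` are made-up stand-ins rather than types from the repo, it assumes the System.Reactive package is referenced, and it assumes both sources are hot (plain `Subject<T>` instances here).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Made-up stand-ins for illustration; not the repo's types.
public record Wfsv(string InstanceId, int Version);
public record Trig(string Name);

public static class GroupJoinDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var versions = new Subject<Wfsv>();
        var triggers = new Subject<Trig>();

        var windows = versions.GroupJoin(
            triggers,
            // A version's window stays open until the same instance
            // publishes another version.
            v => versions.FirstOrDefaultAsync(next => next.InstanceId == v.InstanceId),
            // Triggers are treated as instantaneous.
            _ => Observable.Empty<Unit>(),
            (v, ts) => (Current: v, Triggers: ts));

        // Record which version's window each trigger was delivered to.
        using var subscription = windows.Subscribe(w =>
            w.Triggers.Subscribe(t =>
                log.Add($"{w.Current.InstanceId}/{w.Current.Version}:{t.Name}")));

        versions.OnNext(new Wfsv("A", 1));
        triggers.OnNext(new Trig("t1"));   // only A/1 is open
        versions.OnNext(new Wfsv("B", 1));
        triggers.OnNext(new Trig("t2"));   // A/1 and B/1 are open
        versions.OnNext(new Wfsv("A", 2)); // closes A/1, opens A/2
        triggers.OnNext(new Trig("t3"));   // B/1 and A/2 are open

        return log;
    }
}
```

Calling `GroupJoinDemo.Run()` and printing the entries should show `t1` reaching only A/1, `t2` reaching A/1 and B/1, and `t3` reaching B/1 and A/2 but not the superseded A/1. The duration selector relies on the versions source being hot. With a cold or replaying source, `FirstOrDefaultAsync` would see earlier items for the same instance and could close the window straight away.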
But I seem to have wandered some way from what the doc currently says, so I'm not sure if this is correct. Either the docs or my understanding require adjustment.