WIP: dbos integration with plugins #157
* add some refactoring notes
* Clarify / document plugin interface better
* debug mem leak
* ugly things to workaround leaks
* clean up from leak fix
* ugly leak fixes
* remove test files
9e39d28 to 67c3c3e
f96df50 to 82c5546
82c5546 to 3000120
and sent via DBOS.send, enabling durable orchestration.
"""

@DBOS.workflow()
For recovery to work fully, DBOS workflows need to be registered statically at startup (currently they are registered lazily on the first run). Because DBOS workflow input must be serializable and Python functions are not always serializable, one workaround is to maintain a {workflow name: function} definition map.
This DBOS plugin then statically registers a single generic DBOS workflow function, say _dbos_control_loop, that takes a "workflow function name" parameter and, inside the control loop, looks up the actual workflow function to run.
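A minimal sketch of the name-to-function map described above, written without DBOS so it runs on its own. `WORKFLOW_REGISTRY`, `register_workflow`, and the `greet` example are hypothetical names for illustration; in the plugin, `_dbos_control_loop` would be the one function decorated with `@DBOS.workflow()`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

WorkflowFn = Callable[[Any], Awaitable[Any]]

# Hypothetical registry: serializable workflow name -> Python function.
WORKFLOW_REGISTRY: Dict[str, WorkflowFn] = {}


def register_workflow(name: str) -> Callable[[WorkflowFn], WorkflowFn]:
    """Record a workflow function under a plain-string name at import time."""

    def decorator(fn: WorkflowFn) -> WorkflowFn:
        if name in WORKFLOW_REGISTRY:
            raise ValueError(f"workflow {name!r} is already registered")
        WORKFLOW_REGISTRY[name] = fn
        return fn

    return decorator


@register_workflow("greet")
async def greet(start_event: Any) -> str:
    return f"hello {start_event}"


# Assumption: in the plugin this function would carry @DBOS.workflow().
# It is left undecorated here so the sketch needs no DBOS install or database.
async def _dbos_control_loop(workflow_name: str, start_event: Any) -> Any:
    # Only the name (a string) is passed as workflow input; the function
    # itself is looked up locally and never has to be serialized.
    workflow_function = WORKFLOW_REGISTRY[workflow_name]
    return await workflow_function(start_event)


result = asyncio.run(_dbos_control_loop("greet", "world"))
```

Because the registry is filled by decorators at import time, every process that imports the module ends up with the same name-to-function map before anything is launched.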
When is the deadline for registration with DBOS? Is it basically that the workflows must be registered before calling DBOS.launch?
Yes, workflows need to be registered before DBOS.launch for the recovery thread and queue dispatching thread to find them.
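To illustrate why the ordering matters, here is a toy model, not the DBOS API: `ToyRuntime` and its methods are hypothetical stand-ins. It assumes recovery runs once at launch and can only resolve workflow names registered by then; what DBOS itself does with a name it cannot resolve is not covered in this thread, so the toy simply records it.

```python
from typing import Callable, Dict, List


class ToyRuntime:
    """Hypothetical stand-in for a durable runtime; NOT the DBOS API."""

    def __init__(self, pending: List[str]) -> None:
        self._registry: Dict[str, Callable[[], str]] = {}
        # Names of interrupted workflows, as if read back from storage.
        self._pending = list(pending)
        self.recovered: List[str] = []
        self.unresolved: List[str] = []

    def register(self, name: str, fn: Callable[[], str]) -> None:
        self._registry[name] = fn

    def launch(self) -> None:
        # Recovery happens here and sees only what is registered so far.
        for name in self._pending:
            fn = self._registry.get(name)
            if fn is None:
                self.unresolved.append(name)
            else:
                self.recovered.append(fn())


runtime = ToyRuntime(pending=["ingest", "report"])
runtime.register("ingest", lambda: "ingest done")  # registered before launch
runtime.launch()
runtime.register("report", lambda: "report done")  # too late for recovery
```

After this runs, `runtime.recovered` holds the result for `ingest` only, and `report` is left in `runtime.unresolved` because it was registered after launch.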
with SetWorkflowID(run_id):
    return await workflow_function(start_event, init_state, run_id)

wrapped_steps = {name: DBOS.step()(step) for name, step in steps.items()}
For steps to execute in a distributed environment, they need to be DBOS workflows. The main run workflow can then enqueue each step worker to distribute the work.
The child workflows would need to be registered at startup time too, correct?
Yeah, all workflows should be registered at startup time.
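A rough sketch of the shape this suggests, using only the standard library: an in-process `asyncio.Queue` stands in for a DBOS queue, and the step workers are plain coroutines registered by name at import time. `STEP_WORKERS`, `register_step_worker`, `worker`, and `run_workflow` are hypothetical names; in the plugin each step worker would be a DBOS workflow registered at startup.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

StepFn = Callable[[Any], Awaitable[Any]]

# Hypothetical registry of step workers, filled at import time (startup).
STEP_WORKERS: Dict[str, StepFn] = {}


def register_step_worker(name: str) -> Callable[[StepFn], StepFn]:
    def decorator(fn: StepFn) -> StepFn:
        STEP_WORKERS[name] = fn
        return fn

    return decorator


@register_step_worker("double")
async def double(x: int) -> int:
    return x * 2


@register_step_worker("negate")
async def negate(x: int) -> int:
    return -x


async def worker(queue: asyncio.Queue) -> None:
    # Each worker task models a process that could run on another machine.
    while True:
        step_name, payload, future = await queue.get()
        try:
            future.set_result(await STEP_WORKERS[step_name](payload))
        except Exception as exc:  # surface failures to the waiting caller
            future.set_exception(exc)
        finally:
            queue.task_done()


async def run_workflow(jobs: List[Tuple[str, Any]]) -> List[Any]:
    # The main run workflow enqueues (step name, payload) pairs, not functions.
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue)) for _ in range(2)]
    loop = asyncio.get_running_loop()
    futures = []
    for step_name, payload in jobs:
        future = loop.create_future()
        queue.put_nowait((step_name, payload, future))
        futures.append(future)
    results = await asyncio.gather(*futures)  # results keep enqueue order
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return list(results)


results = asyncio.run(run_workflow([("double", 21), ("negate", 5)]))
```

Only the step name and payload travel through the queue, which is why every worker has to be registered under that name before anything starts pulling from it.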
070fba2 to 862274b
This is very lacking still, but it runs a basic example