Skip to content

Conversation

@Json-Andriopoulos
Copy link
Contributor

  • Backend heartbeat support (DB, API)
  • Heartbeat monitoring worker

Describe changes

I implemented/fixed _ to achieve _.

Pre-requisites

Please ensure you have done the following:

  • I have read the CONTRIBUTING.md document.
  • I have added tests to cover my changes.
  • I have based my new branch on develop and the open PR is targeting develop. If your branch wasn't based on develop read Contribution guide on rebasing branch to develop.
  • IMPORTANT: I made sure that my changes are reflected properly in the following resources:
    • ZenML Docs
    • Dashboard: Needs to be communicated to the frontend team.
    • Templates: Might need adjustments (that are not reflected in the template tests) in case of non-breaking changes and deprecations.
    • Projects: Depending on the version dependencies, different projects might get affected.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Other (add details above)

@github-actions github-actions bot added the enhancement New feature or request label Oct 20, 2025
_thread.interrupt_main() # raises KeyboardInterrupt in main thread
# Ensure we stop our own loop as well.
self._running = False
except Exception:
Copy link
Contributor Author

@Json-Andriopoulos Json-Andriopoulos Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Improve this. For sure try to capture HTTP errors in more verbose logs to avoid excessive log generation if the error is for instance server raising 500 status code.

- Backend heartbeat support (DB, API)
- Heartbeat monitoring worker
@Json-Andriopoulos Json-Andriopoulos force-pushed the feature/3963-step-run-heartbeat branch from 123c1c4 to 18d8b76 Compare October 27, 2025 07:29
@bcdurak bcdurak linked an issue Oct 27, 2025 that may be closed by this pull request
1 task
@Json-Andriopoulos
Copy link
Contributor Author

Json-Andriopoulos commented Oct 27, 2025

Questions/Comments for reviewers @schustmi @bcdurak :

Log info records

I see that in general core components (StepLauncher, StepRunner, etc.) we display a very small number of log records. For better visibility during development I have some log records in the heartbeat worker, should these be removed? I am assuming we display few systemic logs to avoid polluting the user experience as they would be interested in their step function logs only? Some follow-up recommendations would be - a) use structured logs with context variables (https://www.structlog.org/en/stable/) to easily filter records by metadata values b) introduce a systemic logger that is configurable. Suppressed by default, when activated it would present all systemic logs.

Handling of constants

Currently heartbeat interval is hard set as a class variable for the StepHeartBeatWorker cls. For sure I don't want to expose this to user-provided settings as this should be a system setting (too frequent heartbeats from multiple steps may end-up overloading the rest server). I believe a good value would be somewhere in the range of 30-60 seconds. Where would you organize this value? Under config/constants.py? In a config object?

Interrupt implementation

I went over our signals/daemonize implementations. While that would be the proper implementation for any unix-based system it is not compatible with Windows. I opted to use _thead.interrupt_main() instead which raises a KeyboardInterrupt exception by default, capture it with a context manager that reraises it with a custom exception. Let me know your thoughts.

- Updates migration down revision refs
- context-reraise exception
- changes in the step-heartbeat logic
- fix null heartbeat in list/get endpoints
@Json-Andriopoulos Json-Andriopoulos force-pushed the feature/3963-step-run-heartbeat branch from 18d8b76 to 1352677 Compare October 27, 2025 08:55
Comment on lines +107 to +108
self._target_exception = target_exception
self._message = message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess to simplify this, we could just pass an instance of the exception here instead of the class and message? That would additionally also allow some exceptions which can/need to be instantiated with multiple arguments.

"""Light-weight model for Step Heartbeat responses."""

id: UUID
status: str
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be of type ExecutionStatus?

"interrupting main thread",
self.name,
)
_thread.interrupt_main() # raises KeyboardInterrupt in main thread
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My dynamic pipelines PR introduces running multiple steps in different threads, which doesn't work with this I think.

Can we somehow store the thread from which the heartbeat worker was started, and then interrupt that thread instead of the main one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that is an important change, good point. interrupt_main will not work here, we will need to change the pattern a bit. Should I work my changes from your branch?

Comment on lines +222 to +224
step = zen_store().get_run_step(step_run_id, hydrate=True)
pipeline_run = zen_store().get_run(step.pipeline_run_id)
verify_permission_for_model(pipeline_run, action=Action.UPDATE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether this RBAC check is even necessary, as running all of this will take quite some time (two calls to the DB, then a request to the RBAC service).

Is there any real harm in leaving this unprotected? I guess it would allow users potential access to the status of the step, which I'm not sure really is a concern.

Copy link
Contributor Author

@Json-Andriopoulos Json-Andriopoulos Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, we can probably do both authenication & authorization with pipeline tokens. Will discuss with @stefannica for directions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me suggest an alternative: we could limit this endpoint to only be accessed by running pipelines.

Running pipelines (the containerized environment where the steps are running actually) use something called "a workload API token" which is only valid as long as the pipeline run itself is not yet finalized. These workload API tokens are tied to a particular pipeline run (or schedule, in case of scheduled pipelines). So we can also use their scope to limit the range of targets that they can update.

Some references:

A sketch of how you can use this in your endpoint:

def update_heartbeat(
    step_run_id: UUID,
    auth_context: AuthContext = Security(authorize),
) -> StepHeartbeatResponse:

    ...
    if not auth_context.access_token or not auth_context.access_token.schedule_id and not auth_context.access_token.pipeline_run_id:
        raise AuthorizationException("Not authorized")

    if auth_context.access_token.pipeline_run_id:
        # optionally, check that the step ID is part of this run ID
    else: # if auth_context.access_token.schedule_id
        # optionally, check that the step ID is part of a run ID that was scheduled with this schedule

This will no longer rely on RBAC calls, but it might still flood the database with a lot of requests, so maybe you could also implement a mini-caching system like the ones used in the previous code references, to reduce its impact.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stefannica That was my initial idea as well, but do we use those tokens also when running pipelines with service accounts? I thought at some point we used the API key directly when running scheduled pipelines, but I might be misremembering.

I know for sure though that there is a way to generate a generic unscoped token instead of a workload token when running a pipeline (by setting some token expiration env variable), so we'll have to think about how we handle this case.

Co-authored-by: Michael Schuster <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement a step heartbeat function

4 participants