-
Notifications
You must be signed in to change notification settings - Fork 131
Add: Checkpoint/restore for pods #1399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| }) | ||
|
|
||
| return exitCode, request.ContainerId, err | ||
| return -1, "", fmt.Errorf("checkpoint not found") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parent function already handles this exact intent of running the code without checkpoints as a fallback. This is just duplicated.
| log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr) | ||
| } | ||
|
|
||
| err = exposeNetwork() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we are checkpointing, we do not want requests to taint the checkpoint. It should only checkpoint the ready state of the pod and not any running requests. In the case of an API server, the running request would be a connection.
pkg/worker/lifecycle.go
Outdated
| return | ||
| } | ||
| } | ||
| // for idx, bindPort := range opts.BindPorts { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@luke-lombardi Would it be problem if we delay the exposing of ports right before the container starts instead?
|
|
||
| if self.checkpoint_enabled and self.checkpoint_condition is not None: | ||
| self.entrypoint = [ | ||
| "(python -m beta9.runner.checkpoint &) &&", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With regards to the current snippet of entrypoint code we have, this is the best place to put the script.
we can also move this to the backend and pass USER_CODE_DIR
["sh", "-c", f"cd {USER_CODE_DIR} && {' '.join(self.entrypoint)}"]
|
|
||
| if not module and hasattr(user_obj, "cleanup_deployment_artifacts"): | ||
| user_obj.cleanup_deployment_artifacts() | ||
| finally: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GRPC errors left pod artifacts in the repo and they were not properly cleaned up.
| updateStateErr := s.updateCheckpointState(request, types.CheckpointStatusRestoreFailed) | ||
| if updateStateErr != nil { | ||
| log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr) | ||
| var e *runc.ExitError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[TODO]: This logic is not complete. @luke-lombardi I want to chat with you about stop and error conditions related to a running RestoreCheckpoint process.
A SIGKILL from container stop event from user would put the checkpoint into a restore_failed state which is not what we want but this SIGKILL can also occur for other factors that are related to a failed restore. We need to hash about better indicators of failure in a restore checkpoint process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
15 issues found across 16 files
React with 👍 or 👎 to teach cubic. You can also tag @cubic-dev-ai to give feedback, ask questions, or re-run the review.
| user_obj.cleanup_deployment_artifacts() | ||
| finally: | ||
| if not module and hasattr(user_obj, "cleanup_deployment_artifacts"): | ||
| user_obj.cleanup_deployment_artifacts() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exceptions from cleanup_deployment_artifacts inside finally can mask the original failure or exit; wrap cleanup in its own try/except to avoid overshadowing errors.
Prompt for AI agents
Address the following comment on sdk/src/beta9/cli/deployment.py at line 197:
<comment>Exceptions from cleanup_deployment_artifacts inside finally can mask the original failure or exit; wrap cleanup in its own try/except to avoid overshadowing errors.</comment>
<file context>
@@ -180,17 +180,21 @@ def create_deployment(
- user_obj.cleanup_deployment_artifacts()
+ finally:
+ if not module and hasattr(user_obj, "cleanup_deployment_artifacts"):
+ user_obj.cleanup_deployment_artifacts()
if capture_logs.capture_logs:
</file context>
|
|
||
| if self.checkpoint_enabled and self.checkpoint_condition is not None: | ||
| self.entrypoint = [ | ||
| "(python -m beta9.runner.checkpoint &) &&", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shell snippet is injected into entrypoint without a shell wrapper for custom images, causing exec failure and overriding the image's default ENTRYPOINT.
Prompt for AI agents
Address the following comment on sdk/src/beta9/abstractions/pod.py at line 310:
<comment>Shell snippet is injected into entrypoint without a shell wrapper for custom images, causing exec failure and overriding the image's default ENTRYPOINT.</comment>
<file context>
@@ -281,6 +305,12 @@ def deploy(
+ if self.checkpoint_enabled and self.checkpoint_condition is not None:
+ self.entrypoint = [
+ "(python -m beta9.runner.checkpoint &) &&",
+ *self.entrypoint,
+ ]
</file context>
| fmt.Sprintf("STUB_ID=%s", stub.ExternalId), | ||
| fmt.Sprintf("STUB_TYPE=%s", stub.Type), | ||
| fmt.Sprintf("KEEP_WARM_SECONDS=%d", stubConfig.KeepWarmSeconds), | ||
| fmt.Sprintf("CHECKPOINT_ENABLED=%t", stubConfig.CheckpointEnabled), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CHECKPOINT_ENABLED env var reflects stubConfig instead of the computed effective value, causing mismatches when checkpointing is disabled (e.g., multi-GPU).
Prompt for AI agents
Address the following comment on pkg/abstractions/pod/pod.go at line 285:
<comment>CHECKPOINT_ENABLED env var reflects stubConfig instead of the computed effective value, causing mismatches when checkpointing is disabled (e.g., multi-GPU).</comment>
<file context>
@@ -282,6 +282,8 @@ func (s *GenericPodService) run(ctx context.Context, authInfo *auth.AuthInfo, st
fmt.Sprintf("STUB_ID=%s", stub.ExternalId),
fmt.Sprintf("STUB_TYPE=%s", stub.Type),
fmt.Sprintf("KEEP_WARM_SECONDS=%d", stubConfig.KeepWarmSeconds),
+ fmt.Sprintf("CHECKPOINT_ENABLED=%t", stubConfig.CheckpointEnabled),
+ fmt.Sprintf("CHECKPOINT_CONDITION=%s", stubConfig.CheckpointCondition),
}...)
</file context>
| workers_ready.value += 1 | ||
|
|
||
| if workers_ready.value == config.workers: | ||
| Path(CHECKPOINT_SIGNAL_FILE).touch(exist_ok=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating the checkpoint signal without waiting for all workers can race with worker coordination; delegate to wait_for_checkpoint() to ensure readiness across workers.
Prompt for AI agents
Address the following comment on sdk/src/beta9/runner/checkpoint.py at line 30:
<comment>Creating the checkpoint signal without waiting for all workers can race with worker coordination; delegate to wait_for_checkpoint() to ensure readiness across workers.</comment>
<file context>
@@ -0,0 +1,71 @@
+ workers_ready.value += 1
+
+ if workers_ready.value == config.workers:
+ Path(CHECKPOINT_SIGNAL_FILE).touch(exist_ok=True)
+ return _reload_config()
+
</file context>
| }) | ||
|
|
||
| return exitCode, request.ContainerId, err | ||
| return -1, "", fmt.Errorf("checkpoint not found") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing fallback to normal run when checkpoint is not found; this now returns an error instead of starting the container normally.
(Based on the PR description that the worker falls back to a normal run if no checkpoint is found.)
Prompt for AI agents
Address the following comment on pkg/worker/criu.go at line 150:
<comment>Missing fallback to normal run when checkpoint is not found; this now returns an error instead of starting the container normally.
(Based on the PR description that the worker falls back to a normal run if no checkpoint is found.)</comment>
<file context>
@@ -123,31 +129,30 @@ func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.
- })
-
- return exitCode, request.ContainerId, err
+ return -1, "", fmt.Errorf("checkpoint not found")
}
</file context>
| if updateStateErr != nil { | ||
| log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr) | ||
| var e *runc.ExitError | ||
| if errors.As(err, &e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restore failures that are not runc.ExitError won't update checkpoint state or be logged, leaving stale/incorrect state.
Prompt for AI agents
Address the following comment on pkg/worker/criu.go at line 133:
<comment>Restore failures that are not runc.ExitError won't update checkpoint state or be logged, leaving stale/incorrect state.</comment>
<file context>
@@ -123,31 +129,30 @@ func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.
- if updateStateErr != nil {
- log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr)
+ var e *runc.ExitError
+ if errors.As(err, &e) {
+ code := e.Status
+
</file context>
| } | ||
| defer f.Close() | ||
|
|
||
| err = exposeNetwork() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Network is exposed before a successful restore, potentially allowing premature connections; expose after a successful restore instead.
(Based on the PR description stating the worker only exposes ports after checkpoint or restore.)
Prompt for AI agents
Address the following comment on pkg/worker/criu.go at line 117:
<comment>Network is exposed before a successful restore, potentially allowing premature connections; expose after a successful restore instead.
(Based on the PR description stating the worker only exposes ports after checkpoint or restore.)</comment>
<file context>
@@ -113,6 +114,11 @@ func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.
}
defer f.Close()
+ err = exposeNetwork()
+ if err != nil {
+ return -1, "", fmt.Errorf("failed to expose network: %v", err)
</file context>
| fmt.Sprintf("STUB_ID=%s", i.Stub.ExternalId), | ||
| fmt.Sprintf("STUB_TYPE=%s", i.Stub.Type), | ||
| fmt.Sprintf("KEEP_WARM_SECONDS=%d", i.StubConfig.KeepWarmSeconds), | ||
| fmt.Sprintf("CHECKPOINT_ENABLED=%t", i.StubConfig.CheckpointEnabled), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding CHECKPOINT_ENABLED to request.Env duplicates the worker-controlled flag and can override it due to merge order; it also ignores the computed effective value (GPU>1), risking inconsistent checkpoint behavior.
Prompt for AI agents
Address the following comment on pkg/abstractions/pod/instance.go at line 36:
<comment>Adding CHECKPOINT_ENABLED to request.Env duplicates the worker-controlled flag and can override it due to merge order; it also ignores the computed effective value (GPU>1), risking inconsistent checkpoint behavior.</comment>
<file context>
@@ -33,6 +33,8 @@ func (i *podInstance) startContainers(containersToRun int) error {
fmt.Sprintf("STUB_ID=%s", i.Stub.ExternalId),
fmt.Sprintf("STUB_TYPE=%s", i.Stub.Type),
fmt.Sprintf("KEEP_WARM_SECONDS=%d", i.StubConfig.KeepWarmSeconds),
+ fmt.Sprintf("CHECKPOINT_ENABLED=%t", i.StubConfig.CheckpointEnabled),
+ fmt.Sprintf("CHECKPOINT_CONDITION=%s", i.StubConfig.CheckpointCondition),
}...)
</file context>
| return | ||
|
|
||
| module, func = config.checkpoint_condition.split(":") | ||
| target_module = importlib.import_module(module) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rule violated: Prevent Redundant Code Duplication
Repeated dynamic import/lookup logic; extract a shared utility to load a callable from "module:function" to prevent code duplication across modules.
Prompt for AI agents
Address the following comment on sdk/src/beta9/runner/checkpoint.py at line 48:
<comment>Repeated dynamic import/lookup logic; extract a shared utility to load a callable from "module:function" to prevent code duplication across modules.</comment>
<file context>
@@ -0,0 +1,71 @@
+ return
+
+ module, func = config.checkpoint_condition.split(":")
+ target_module = importlib.import_module(module)
+ method = getattr(target_module, func)
+
</file context>
Summary by cubic
Adds CRIU-based checkpoint/restore for pods with an optional user-defined checkpoint condition. This lets pods start from a pre-warmed state to reduce cold starts.
New Features
Migration