Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,38 @@ Pods are constructed via `meta.FromAgentSpec` / `meta.FromToolboxSpec` factory h

| Spec.Desire | What the reconciler does | Terminal phase |
|---|---|---|
| `Hibernate` | `meta.HibernateState(true).Apply` on the target pod, then poll `epoch.HasManifest(vmName, meta.HibernateSnapshotTag)` until the snapshot lands. A probe error (transport / 5xx / auth) surfaces as a returned error so controller-runtime logs + retries with backoff. | `Hibernated` |
| `Hibernate` | `meta.HibernateState(true).Apply` on the target pod, then poll `epoch.HasManifest(vmName, meta.HibernateSnapshotTag)` until the snapshot lands or `hibernateTimeout` (3 minutes) trips. A probe error (transport / 5xx / auth) surfaces as a returned error so controller-runtime logs + retries with backoff. | `Hibernated` |
| `Wake` | Check if the container is already `Running` (skip annotation patch if so), otherwise clear `meta.HibernateState` **once** (skip if already cleared to avoid triggering informer events on every requeue cycle), then wait for the container to be `Running` and drop the hibernation snapshot tag from epoch. A wake that does not complete within `wakeTimeout` (5 minutes) is escalated to `Phase=Failed` with a dated message in the `Ready` condition. | `Active` |

On CR deletion the reconciler runs a finalizer (`cocoonhibernation.cocoonset.cocoonstack.io/finalizer`) that clears the `:hibernate` tag from epoch (if `Status.VMName` is set) before removing itself, so deleting a CocoonHibernation never leaves an orphaned snapshot on the registry.

There is no `cocoon-vm-snapshots` ConfigMap bridge — epoch is the single source of truth for hibernation state. Failure paths set `Phase=Failed` with a one-shot message in the `Ready` condition instead of looping forever on a bad reference. A `Failed` wake is recoverable: on re-entry into `Waking` from a non-Waking phase the reconciler explicitly refreshes the Ready condition's `LastTransitionTime` so the wake budget resets cleanly (without the override, `apimeta.SetStatusCondition` would preserve the stale timestamp across the `False → False` transition and the recovered wake would trip the deadline on the next reconcile).
There is no `cocoon-vm-snapshots` ConfigMap bridge — epoch is the single source of truth for hibernation state. Failure paths set `Phase=Failed` with a one-shot message in the `Ready` condition instead of looping forever on a bad reference. Both `Hibernate` and `Wake` Failed phases are recoverable: on re-entry from a non-deadline phase the reconciler refreshes the Ready condition's `LastTransitionTime` so the budget resets cleanly (without the override, `apimeta.SetStatusCondition` would preserve the stale timestamp across the `False → False` transition and the recovered phase would trip the deadline on the next reconcile). Each retry emits a `RetryRequested` Normal Event so the recovery is visible in `kubectl describe`.

### Observability

Reconciler failures surface as K8s Events on the CR plus Prometheus counters on the controller-runtime `/metrics` endpoint:

| Event reason (CocoonHibernation) | Type |
|---|---|
| `HibernateTimedOut`, `WakeTimedOut` | Warning |
| `Hibernated`, `WokenActive`, `RetryRequested` | Normal |

| Event reason (CocoonSet) | Type |
|---|---|
| `PodLifecycleFailed`, `MainAgentFailed`, `SubAgentDeadLetter` | Warning |
| `SubAgentRebuilding`, `RecoveredFromFailure` | Normal |

Metrics:

```
cocoon_operator_subagent_rebuild_total{namespace, cocoonset}
cocoon_operator_subagent_dead_letter_total{namespace, cocoonset}
cocoon_operator_hibernate_phase_duration_seconds{result} # result=ok|timeout
cocoon_operator_wake_phase_duration_seconds{result}
cocoon_operator_lifecycle_state_failed_observed_total{phase}
```

`CocoonSet` consumes the `vm.cocoonstack.io/lifecycle-state=Failed` annotation that vk-cocoon writes on terminal failures (hibernate, wake, post-clone, SAC); the operator flips to `Failed` immediately instead of waiting for `Pod.Status.Phase` to follow. `triageSubAgent` rebuilds a terminal sub pod at most three times with `0/1/5/30 s` exponential backoff, then marks the pod `cocoonset.cocoonstack.io/dead-letter=true` and leaves it in place so a permanently broken slot stops consuming the apiserver budget. Rebuild count persists in the `cocoonset.cocoonstack.io/rebuild-history` annotation on the CocoonSet so the count survives the pod delete.
Comment thread
CMGS marked this conversation as resolved.
Outdated

## Configuration

Expand Down
68 changes: 62 additions & 6 deletions cocoonset/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ import (
"maps"
"slices"
"sync/atomic"
"time"

"github.com/projecteru2/core/log"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

cocoonv1 "github.com/cocoonstack/cocoon-common/apis/v1"
"github.com/cocoonstack/cocoon-common/meta"
"github.com/cocoonstack/cocoon-operator/metrics"
)

// subAgentCreateConcurrency caps parallel pod creates during fan-out so a
Expand Down Expand Up @@ -83,15 +87,15 @@ func (r *Reconciler) ensureSubAgents(ctx context.Context, cs *cocoonv1.CocoonSet
}

// triageSubAgent deletes pod when it is terminal or has drifted from spec.
// Returns (deleted, err). A non-deleted return means the pod still matches.
// Returns (deleted, err). A non-deleted return means the pod still matches,
// is in dead-letter, or is waiting on the rebuild backoff.
func (r *Reconciler) triageSubAgent(ctx context.Context, logger *log.Fields, pod *corev1.Pod, cs *cocoonv1.CocoonSet, slot int32) (bool, error) {
if pod.Annotations[annotationDeadLetter] == "true" {
return false, nil
}
switch {
case meta.IsPodTerminal(pod):
logger.Infof(ctx, "sub-agent %s/%s slot %d terminal (phase=%s), deleting for recreate", pod.Namespace, pod.Name, slot, pod.Status.Phase)
if err := r.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
return false, fmt.Errorf("delete terminal sub-agent slot %d: %w", slot, err)
}
return true, nil
return r.rebuildSubAgent(ctx, logger, pod, cs, slot)
case !podSpecMatchesAgent(pod, cs, slot):
logger.Infof(ctx, "sub-agent %s/%s slot %d spec drifted, deleting for recreate", pod.Namespace, pod.Name, slot)
if err := r.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
Expand All @@ -102,3 +106,55 @@ func (r *Reconciler) triageSubAgent(ctx context.Context, logger *log.Fields, pod
return false, nil
}
}

// rebuildSubAgent deletes pod with exponential backoff and a dead-letter
// gate. Past maxRebuildAttempts the pod is marked dead-letter and left in
// place so the failure stays visible and rebuild storms cannot consume
// the apiserver budget.
func (r *Reconciler) rebuildSubAgent(ctx context.Context, logger *log.Fields, pod *corev1.Pod, cs *cocoonv1.CocoonSet, slot int32) (bool, error) {
history := readRebuildHistory(cs)
entry := history[slot]
if entry.Count >= maxRebuildAttempts {
if err := r.patchPodAnnotation(ctx, pod, annotationDeadLetter, "true"); err != nil {
return false, err
}
metrics.SubAgentDeadLetterTotal.WithLabelValues(cs.Namespace, cs.Name).Inc()
if r.Recorder != nil {
r.Recorder.Eventf(cs, corev1.EventTypeWarning, "SubAgentDeadLetter",
"slot %d exhausted %d rebuilds; pod %s left in dead-letter", slot, maxRebuildAttempts, pod.Name)
}
return false, nil
}
if wait := backoffDelay(entry.Count); wait > 0 && time.Since(entry.LastDeleted) < wait {
return false, nil
}
Comment thread
CMGS marked this conversation as resolved.
Outdated
Comment thread
CMGS marked this conversation as resolved.
logger.Infof(ctx, "sub-agent %s/%s slot %d terminal (phase=%s), rebuild attempt %d/%d",
pod.Namespace, pod.Name, slot, pod.Status.Phase, entry.Count+1, maxRebuildAttempts)
if err := r.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
return false, fmt.Errorf("delete terminal sub-agent slot %d: %w", slot, err)
}
entry.Count++
entry.LastDeleted = time.Now()
history[slot] = entry
if err := writeRebuildHistory(cs, history); err != nil {
return true, fmt.Errorf("encode rebuild history: %w", err)
}
if err := r.Update(ctx, cs); err != nil {
Comment thread
CMGS marked this conversation as resolved.
Outdated
return true, fmt.Errorf("persist rebuild history: %w", err)
}
Comment thread
CMGS marked this conversation as resolved.
Outdated
metrics.SubAgentRebuildTotal.WithLabelValues(cs.Namespace, cs.Name).Inc()
if r.Recorder != nil {
r.Recorder.Eventf(cs, corev1.EventTypeNormal, "SubAgentRebuilding",
"slot %d attempt %d/%d", slot, entry.Count, maxRebuildAttempts)
}
return true, nil
}

// patchPodAnnotation sets a single annotation via a strategic merge patch.
func (r *Reconciler) patchPodAnnotation(ctx context.Context, pod *corev1.Pod, key, value string) error {
patch := fmt.Appendf(nil, `{"metadata":{"annotations":{%q:%q}}}`, key, value)
if err := r.Patch(ctx, pod, client.RawPatch(types.StrategicMergePatchType, patch)); err != nil {
return fmt.Errorf("patch pod %s/%s annotation %s: %w", pod.Namespace, pod.Name, key, err)
}
return nil
}
67 changes: 67 additions & 0 deletions cocoonset/rebuild.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package cocoonset

import (
"encoding/json"
"time"

cocoonv1 "github.com/cocoonstack/cocoon-common/apis/v1"
)

const (
annotationRebuildHistory = "cocoonset.cocoonstack.io/rebuild-history"
annotationDeadLetter = "cocoonset.cocoonstack.io/dead-letter"

maxRebuildAttempts = 3
)

// rebuildEntry tracks how many times triageSubAgent has rebuilt a slot.
// Persisted as a JSON map keyed by slot in the CocoonSet annotation so
// the count survives the pod delete that erases the in-pod annotation.
type rebuildEntry struct {
Count int `json:"count"`
LastDeleted time.Time `json:"lastDeleted"`
}

func readRebuildHistory(cs *cocoonv1.CocoonSet) map[int32]rebuildEntry {
raw := cs.Annotations[annotationRebuildHistory]
if raw == "" {
return map[int32]rebuildEntry{}
}
m := map[int32]rebuildEntry{}
if err := json.Unmarshal([]byte(raw), &m); err != nil {
return map[int32]rebuildEntry{}
}
Comment thread
CMGS marked this conversation as resolved.
return m
}

func writeRebuildHistory(cs *cocoonv1.CocoonSet, m map[int32]rebuildEntry) error {
// Garbage-collect entries for slots no longer in the spec.
for slot := range m {
if slot > cs.Spec.Agent.Replicas {
delete(m, slot)
}
}
raw, err := json.Marshal(m)
if err != nil {
return err
}
if cs.Annotations == nil {
cs.Annotations = map[string]string{}
}
cs.Annotations[annotationRebuildHistory] = string(raw)
return nil
}

// backoffDelay returns the wait before the next rebuild attempt: 0, 1s, 5s, 30s.
func backoffDelay(priorCount int) time.Duration {
switch priorCount {
case 0:
return 0
case 1:
return 1 * time.Second
case 2:
return 5 * time.Second
default:
return 30 * time.Second
Comment thread
CMGS marked this conversation as resolved.
}
}
70 changes: 70 additions & 0 deletions cocoonset/rebuild_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package cocoonset

import (
"testing"
"time"

cocoonv1 "github.com/cocoonstack/cocoon-common/apis/v1"
)

func TestRebuildHistoryRoundTrip(t *testing.T) {
cs := &cocoonv1.CocoonSet{}
cs.Spec.Agent.Replicas = 3
in := map[int32]rebuildEntry{
1: {Count: 2, LastDeleted: time.Date(2026, 5, 14, 1, 0, 0, 0, time.UTC)},
2: {Count: 1, LastDeleted: time.Date(2026, 5, 14, 1, 0, 30, 0, time.UTC)},
}
if err := writeRebuildHistory(cs, in); err != nil {
t.Fatalf("writeRebuildHistory: %v", err)
}
got := readRebuildHistory(cs)
if got[1].Count != 2 || got[2].Count != 1 {
t.Fatalf("round-trip lost counts: %+v", got)
}
}

func TestRebuildHistoryGarbageCollectsStaleSlots(t *testing.T) {
cs := &cocoonv1.CocoonSet{}
cs.Spec.Agent.Replicas = 2
in := map[int32]rebuildEntry{
1: {Count: 1},
2: {Count: 2},
7: {Count: 3}, // slot beyond Replicas, must be pruned
}
if err := writeRebuildHistory(cs, in); err != nil {
t.Fatalf("writeRebuildHistory: %v", err)
}
got := readRebuildHistory(cs)
if _, ok := got[7]; ok {
t.Fatalf("expected slot 7 pruned, got %+v", got)
}
if len(got) != 2 {
t.Fatalf("expected 2 surviving slots, got %d: %+v", len(got), got)
}
}

func TestBackoffDelaySchedule(t *testing.T) {
cases := []struct {
count int
want time.Duration
}{
{0, 0},
{1, 1 * time.Second},
{2, 5 * time.Second},
{3, 30 * time.Second},
{10, 30 * time.Second},
}
for _, tc := range cases {
if got := backoffDelay(tc.count); got != tc.want {
t.Errorf("backoffDelay(%d) = %s, want %s", tc.count, got, tc.want)
}
}
}

func TestReadRebuildHistoryHandlesCorruptAnnotation(t *testing.T) {
cs := &cocoonv1.CocoonSet{}
cs.Annotations = map[string]string{annotationRebuildHistory: "not-json"}
if got := readRebuildHistory(cs); len(got) != 0 {
t.Fatalf("corrupt annotation must yield empty history, got %+v", got)
}
}
43 changes: 37 additions & 6 deletions cocoonset/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -17,6 +18,7 @@ import (

cocoonv1 "github.com/cocoonstack/cocoon-common/apis/v1"
"github.com/cocoonstack/cocoon-common/meta"
"github.com/cocoonstack/cocoon-operator/metrics"
"github.com/cocoonstack/cocoon-operator/snapshot"
)

Expand All @@ -30,8 +32,9 @@ const (
// and toolbox pods to match the declared spec.
type Reconciler struct {
client.Client
Scheme *runtime.Scheme
Epoch snapshot.Registry
Scheme *runtime.Scheme
Epoch snapshot.Registry
Recorder record.EventRecorder
}

// SetupWithManager registers the reconciler. `For` uses GenerationChangedPredicate
Expand Down Expand Up @@ -89,10 +92,24 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, err
}

// Stop reconciling if main agent is in a terminal phase (e.g. Failed).
if classified.main != nil && meta.IsPodTerminal(classified.main) {
return ctrl.Result{}, r.patchStatus(ctx, &cs,
buildStatus(&cs, classified, cocoonv1.CocoonSetPhaseFailed))
// Stop reconciling if main agent is in a terminal state. lifecycle-state=Failed
// is the vk-cocoon-driven path (terminal before Pod Phase flips); IsPodTerminal
// is the kubelet-driven path. Both transition CocoonSet to Failed.
if classified.main != nil {
if state := meta.ReadLifecycleState(classified.main); state == meta.LifecycleStateFailed {
r.observeMainPodFailed(&cs, classified.main, "PodLifecycleFailed")
return ctrl.Result{}, r.patchStatus(ctx, &cs,
buildStatus(&cs, classified, cocoonv1.CocoonSetPhaseFailed))
}
if meta.IsPodTerminal(classified.main) {
r.observeMainPodFailed(&cs, classified.main, "MainAgentFailed")
return ctrl.Result{}, r.patchStatus(ctx, &cs,
buildStatus(&cs, classified, cocoonv1.CocoonSetPhaseFailed))
Comment thread
CMGS marked this conversation as resolved.
}
if cs.Status.Phase == cocoonv1.CocoonSetPhaseFailed && meta.IsPodReady(classified.main) && r.Recorder != nil {
r.Recorder.Eventf(&cs, corev1.EventTypeNormal, "RecoveredFromFailure",
"main pod %s/%s is Ready again", classified.main.Namespace, classified.main.Name)
}
}

if cs.Spec.Suspend {
Expand Down Expand Up @@ -151,3 +168,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu

return ctrl.Result{}, r.patchStatus(ctx, &cs, buildStatus(&cs, classified, ""))
}

// observeMainPodFailed records the failure on the metric + event channels.
// Message is taken from the lifecycle-state-message annotation when present.
func (r *Reconciler) observeMainPodFailed(cs *cocoonv1.CocoonSet, pod *corev1.Pod, reason string) {
metrics.LifecycleStateFailedObservedTotal.WithLabelValues(string(cs.Status.Phase)).Inc()
Comment thread
CMGS marked this conversation as resolved.
Outdated
if r.Recorder == nil {
return
}
msg := pod.Annotations[meta.AnnotationLifecycleStateMessage]
if msg == "" {
msg = string(pod.Status.Phase)
}
r.Recorder.Eventf(cs, corev1.EventTypeWarning, reason, "main pod %s/%s: %s", pod.Namespace, pod.Name, msg)
}
2 changes: 1 addition & 1 deletion cocoonset/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func TestEnsureSubAgentsReplacesTerminalPod(t *testing.T) {

cli := ctrlfake.NewClientBuilder().
WithScheme(scheme).
WithObjects(subPod).
WithObjects(cs, subPod).
Build()
r := &Reconciler{Client: cli, Scheme: scheme}
classified := classifiedPods{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/cocoonstack/epoch v0.2.3-0.20260513102541-36ecd7a40af4
github.com/go-logr/logr v1.4.3
github.com/projecteru2/core v0.0.0-20241016125006-ff909eefe04c
github.com/prometheus/client_golang v1.23.2
golang.org/x/sync v0.19.0
k8s.io/api v0.35.3
k8s.io/apimachinery v0.35.3
Expand Down Expand Up @@ -48,7 +49,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
Expand Down
Loading