Merged
Changes from 7 commits
30 changes: 28 additions & 2 deletions README.md
@@ -60,12 +60,38 @@ Pods are constructed via `meta.FromAgentSpec` / `meta.FromToolboxSpec` factory h

| Spec.Desire | What the reconciler does | Terminal phase |
|---|---|---|
| `Hibernate` | `meta.HibernateState(true).Apply` on the target pod, then poll `epoch.HasManifest(vmName, meta.HibernateSnapshotTag)` until the snapshot lands. A probe error (transport / 5xx / auth) surfaces as a returned error so controller-runtime logs + retries with backoff. | `Hibernated` |
| `Hibernate` | `meta.HibernateState(true).Apply` on the target pod, then poll `epoch.HasManifest(vmName, meta.HibernateSnapshotTag)` until the snapshot lands or `hibernateTimeout` (3 minutes) trips. A probe error (transport / 5xx / auth) surfaces as a returned error so controller-runtime logs + retries with backoff. | `Hibernated` |
| `Wake` | Check if the container is already `Running` (skip annotation patch if so), otherwise clear `meta.HibernateState` **once** (skip if already cleared to avoid triggering informer events on every requeue cycle), then wait for the container to be `Running` and drop the hibernation snapshot tag from epoch. A wake that does not complete within `wakeTimeout` (5 minutes) is escalated to `Phase=Failed` with a dated message in the `Ready` condition. | `Active` |

On CR deletion the reconciler runs a finalizer (`cocoonhibernation.cocoonset.cocoonstack.io/finalizer`) that clears the `:hibernate` tag from epoch (if `Status.VMName` is set) before removing itself, so deleting a CocoonHibernation never leaves an orphaned snapshot on the registry.

There is no `cocoon-vm-snapshots` ConfigMap bridge — epoch is the single source of truth for hibernation state. Failure paths set `Phase=Failed` with a one-shot message in the `Ready` condition instead of looping forever on a bad reference. A `Failed` wake is recoverable: on re-entry into `Waking` from a non-Waking phase the reconciler explicitly refreshes the Ready condition's `LastTransitionTime` so the wake budget resets cleanly (without the override, `apimeta.SetStatusCondition` would preserve the stale timestamp across the `False → False` transition and the recovered wake would trip the deadline on the next reconcile).
There is no `cocoon-vm-snapshots` ConfigMap bridge — epoch is the single source of truth for hibernation state. Failure paths set `Phase=Failed` with a one-shot message in the `Ready` condition instead of looping forever on a bad reference. Both `Hibernate` and `Wake` Failed phases are recoverable: on re-entry from a non-deadline phase the reconciler refreshes the Ready condition's `LastTransitionTime` so the budget resets cleanly (without the override, `apimeta.SetStatusCondition` would preserve the stale timestamp across the `False → False` transition and the recovered phase would trip the deadline on the next reconcile). Each retry emits a `RetryRequested` Normal Event so the recovery is visible in `kubectl describe`.

### Observability

Reconciler failures surface as K8s Events on the CR plus Prometheus counters on the controller-runtime `/metrics` endpoint:

| Event reason (CocoonHibernation) | Type |
|---|---|
| `HibernateTimedOut`, `WakeTimedOut` | Warning |
| `Hibernated`, `WokenActive`, `RetryRequested` | Normal |

| Event reason (CocoonSet) | Type |
|---|---|
| `PodLifecycleFailed`, `MainAgentFailed`, `SubAgentDeadLetter` | Warning |
| `SubAgentRebuilding`, `RecoveredFromFailure` | Normal |

Metrics:

```
cocoon_operator_subagent_rebuild_total{namespace, cocoonset}
cocoon_operator_subagent_dead_letter_total{namespace, cocoonset}
cocoon_operator_hibernate_phase_duration_seconds{result} # result=ok|timeout
cocoon_operator_wake_phase_duration_seconds{result}
cocoon_operator_lifecycle_state_failed_observed_total{phase}
```

`CocoonSet` consumes the `vm.cocoonstack.io/lifecycle-state=Failed` annotation that vk-cocoon writes on terminal failures (hibernate, wake, post-clone, SAC); the operator flips to `Failed` immediately instead of waiting for `Pod.Status.Phase` to follow. `triageSubAgent` rebuilds a terminal sub pod at most three times with an escalating `0/1/5/30 s` backoff, then marks the pod `cocoonset.cocoonstack.io/dead-letter=true` and leaves it in place so a permanently broken slot stops consuming the apiserver budget. The rebuild count persists in the `cocoonset.cocoonstack.io/rebuild-history` annotation on the CocoonSet, so it survives the pod delete.

## Configuration

117 changes: 99 additions & 18 deletions cocoonset/agents.go
@@ -6,14 +6,18 @@ import (
"maps"
"slices"
"sync/atomic"
"time"

"github.com/projecteru2/core/log"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

cocoonv1 "github.com/cocoonstack/cocoon-common/apis/v1"
"github.com/cocoonstack/cocoon-common/meta"
"github.com/cocoonstack/cocoon-operator/metrics"
)

// subAgentCreateConcurrency caps parallel pod creates during fan-out so a
@@ -23,24 +27,30 @@ import (
const subAgentCreateConcurrency = 8

// ensureSubAgents creates/deletes sub-agent pods to match [1..Replicas].
// Returns true when cluster state was mutated. Missing slots are created
// concurrently so batch scale-ups do not serialize N apiserver round trips.
func (r *Reconciler) ensureSubAgents(ctx context.Context, cs *cocoonv1.CocoonSet, classified classifiedPods, mainVMName, mainNodeName string) (bool, error) {
// Returns changed (true when cluster state was mutated) and requeueAfter
// (non-zero when a sub-agent is in rebuild backoff and the caller should
// re-reconcile when backoff elapses). Missing slots are created concurrently
// so batch scale-ups do not serialize N apiserver round trips.
func (r *Reconciler) ensureSubAgents(ctx context.Context, cs *cocoonv1.CocoonSet, classified classifiedPods, mainVMName, mainNodeName string) (bool, time.Duration, error) {
logger := log.WithFunc("cocoonset.Reconciler.ensureSubAgents")
changed := false
var requeueAfter time.Duration

g, gctx := errgroup.WithContext(ctx)
g.SetLimit(subAgentCreateConcurrency)
var created atomic.Bool
for slot := int32(1); slot <= cs.Spec.Agent.Replicas; slot++ {
if pod, exists := classified.sub[slot]; exists {
deleted, err := r.triageSubAgent(ctx, logger, pod, cs, slot)
deleted, wait, err := r.triageSubAgent(ctx, logger, pod, cs, slot)
if err != nil {
return changed, err
return changed, requeueAfter, err
}
if deleted {
changed = true
}
if wait > 0 && (requeueAfter == 0 || wait < requeueAfter) {
requeueAfter = wait
}
continue
}
g.Go(func() error {
@@ -60,7 +70,7 @@ func (r *Reconciler) ensureSubAgents(ctx context.Context, cs *cocoonv1.CocoonSet
})
}
if err := g.Wait(); err != nil {
return changed || created.Load(), err
return changed || created.Load(), requeueAfter, err
}
if created.Load() {
changed = true
@@ -74,31 +84,102 @@ func (r *Reconciler) ensureSubAgents(ctx context.Context, cs *cocoonv1.CocoonSet
if apierrors.IsNotFound(err) {
continue
}
return changed, fmt.Errorf("delete extra sub-agent slot %d: %w", slot, err)
return changed, requeueAfter, fmt.Errorf("delete extra sub-agent slot %d: %w", slot, err)
}
logger.Infof(ctx, "deleted extra sub-agent %s/%s", pod.Namespace, pod.Name)
changed = true
}
return changed, nil
return changed, requeueAfter, nil
}

// triageSubAgent deletes pod when it is terminal or has drifted from spec.
// Returns (deleted, err). A non-deleted return means the pod still matches.
func (r *Reconciler) triageSubAgent(ctx context.Context, logger *log.Fields, pod *corev1.Pod, cs *cocoonv1.CocoonSet, slot int32) (bool, error) {
// requeueAfter > 0 means the slot is in rebuild backoff and the caller should
// re-reconcile when the window elapses; deleted=false with requeueAfter=0
// means the pod still matches or is in dead-letter.
func (r *Reconciler) triageSubAgent(ctx context.Context, logger *log.Fields, pod *corev1.Pod, cs *cocoonv1.CocoonSet, slot int32) (bool, time.Duration, error) {
if pod.Annotations[annotationDeadLetter] == "true" {
return false, 0, nil
}
switch {
case meta.IsPodTerminal(pod):
logger.Infof(ctx, "sub-agent %s/%s slot %d terminal (phase=%s), deleting for recreate", pod.Namespace, pod.Name, slot, pod.Status.Phase)
if err := r.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
return false, fmt.Errorf("delete terminal sub-agent slot %d: %w", slot, err)
}
return true, nil
return r.rebuildSubAgent(ctx, logger, pod, cs, slot)
case !podSpecMatchesAgent(pod, cs, slot):
logger.Infof(ctx, "sub-agent %s/%s slot %d spec drifted, deleting for recreate", pod.Namespace, pod.Name, slot)
if err := r.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
return false, fmt.Errorf("delete drifted sub-agent slot %d: %w", slot, err)
return false, 0, fmt.Errorf("delete drifted sub-agent slot %d: %w", slot, err)
}
return true, nil
return true, 0, nil
default:
return false, nil
return false, 0, nil
}
}

// rebuildSubAgent deletes pod with exponential backoff and a dead-letter
// gate. Past maxRebuildAttempts the pod is marked dead-letter and left in
// place so the failure stays visible and rebuild storms cannot consume
// the apiserver budget. The history annotation is persisted **before** the
// delete so a failed delete cannot bypass maxRebuildAttempts on retry, and
// the patch goes through a DeepCopy so concurrent ensureSubAgents goroutines
// observe a stable cs pointer.
func (r *Reconciler) rebuildSubAgent(ctx context.Context, logger *log.Fields, pod *corev1.Pod, cs *cocoonv1.CocoonSet, slot int32) (bool, time.Duration, error) {
history := readRebuildHistory(cs)
entry := history[slot]
if entry.Count >= maxRebuildAttempts {
if err := r.patchPodAnnotation(ctx, pod, annotationDeadLetter, "true"); err != nil {
return false, 0, err
}
metrics.SubAgentDeadLetterTotal.WithLabelValues(cs.Namespace, cs.Name).Inc()
if r.Recorder != nil {
r.Recorder.Eventf(cs, corev1.EventTypeWarning, "SubAgentDeadLetter",
"slot %d exhausted %d rebuilds; pod %s left in dead-letter", slot, maxRebuildAttempts, pod.Name)
}
return false, 0, nil
}
if wait := backoffDelay(entry.Count); wait > 0 {
remaining := wait - time.Since(entry.LastDeleted)
if remaining > 0 {
return false, remaining, nil
}
}
next := maps.Clone(history)
next[slot] = rebuildEntry{Count: entry.Count + 1, LastDeleted: time.Now()}
if err := r.patchRebuildHistory(ctx, cs, next); err != nil {
return false, 0, fmt.Errorf("persist rebuild history: %w", err)
}
logger.Infof(ctx, "sub-agent %s/%s slot %d terminal (phase=%s), rebuild attempt %d/%d",
pod.Namespace, pod.Name, slot, pod.Status.Phase, next[slot].Count, maxRebuildAttempts)
if err := r.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
return false, 0, fmt.Errorf("delete terminal sub-agent slot %d: %w", slot, err)
}
metrics.SubAgentRebuildTotal.WithLabelValues(cs.Namespace, cs.Name).Inc()
if r.Recorder != nil {
r.Recorder.Eventf(cs, corev1.EventTypeNormal, "SubAgentRebuilding",
"slot %d attempt %d/%d", slot, next[slot].Count, maxRebuildAttempts)
}
return true, 0, nil
}

// patchPodAnnotation sets a single annotation via a strategic merge patch.
func (r *Reconciler) patchPodAnnotation(ctx context.Context, pod *corev1.Pod, key, value string) error {
patch := fmt.Appendf(nil, `{"metadata":{"annotations":{%q:%q}}}`, key, value)
if err := r.Patch(ctx, pod, client.RawPatch(types.StrategicMergePatchType, patch)); err != nil {
return fmt.Errorf("patch pod %s/%s annotation %s: %w", pod.Namespace, pod.Name, key, err)
}
return nil
}

// patchRebuildHistory writes the new history into the CocoonSet annotation
// without mutating cs — the concurrent ensureSubAgents goroutines read cs
// fields, so an in-place Update would race with their reads.
func (r *Reconciler) patchRebuildHistory(ctx context.Context, cs *cocoonv1.CocoonSet, history map[int32]rebuildEntry) error {
enc, err := encodeRebuildHistory(cs.Spec.Agent.Replicas, history)
if err != nil {
return fmt.Errorf("encode rebuild history: %w", err)
}
csCopy := cs.DeepCopy()
if csCopy.Annotations == nil {
csCopy.Annotations = map[string]string{}
}
csCopy.Annotations[annotationRebuildHistory] = enc
return r.Patch(ctx, csCopy, client.MergeFrom(cs))
}
64 changes: 64 additions & 0 deletions cocoonset/rebuild.go
@@ -0,0 +1,64 @@
package cocoonset

import (
"encoding/json"
"time"

cocoonv1 "github.com/cocoonstack/cocoon-common/apis/v1"
)

const (
annotationRebuildHistory = "cocoonset.cocoonstack.io/rebuild-history"
annotationDeadLetter = "cocoonset.cocoonstack.io/dead-letter"

maxRebuildAttempts = 3
)

// rebuildEntry tracks how many times triageSubAgent has rebuilt a slot.
// Persisted as a JSON map keyed by slot in the CocoonSet annotation so
// the count survives the pod delete that erases the in-pod annotation.
type rebuildEntry struct {
Count int `json:"count"`
LastDeleted time.Time `json:"lastDeleted"`
}

func readRebuildHistory(cs *cocoonv1.CocoonSet) map[int32]rebuildEntry {
raw := cs.Annotations[annotationRebuildHistory]
if raw == "" {
return map[int32]rebuildEntry{}
}
m := map[int32]rebuildEntry{}
if err := json.Unmarshal([]byte(raw), &m); err != nil {
return map[int32]rebuildEntry{}
}
return m
}

// encodeRebuildHistory garbage-collects entries for slots no longer in the
// spec and returns the JSON payload for the annotation.
func encodeRebuildHistory(replicas int32, m map[int32]rebuildEntry) (string, error) {
for slot := range m {
if slot > replicas {
delete(m, slot)
}
}
raw, err := json.Marshal(m)
if err != nil {
return "", err
}
return string(raw), nil
}

// backoffDelay returns the wait before the next rebuild attempt: 0, 1s, 5s, 30s.
func backoffDelay(priorCount int) time.Duration {
switch priorCount {
case 0:
return 0
case 1:
return 1 * time.Second
case 2:
return 5 * time.Second
default:
return 30 * time.Second
}
}
73 changes: 73 additions & 0 deletions cocoonset/rebuild_test.go
@@ -0,0 +1,73 @@
package cocoonset

import (
"testing"
"time"

cocoonv1 "github.com/cocoonstack/cocoon-common/apis/v1"
)

func TestRebuildHistoryRoundTrip(t *testing.T) {
cs := &cocoonv1.CocoonSet{}
cs.Spec.Agent.Replicas = 3
in := map[int32]rebuildEntry{
1: {Count: 2, LastDeleted: time.Date(2026, 5, 14, 1, 0, 0, 0, time.UTC)},
2: {Count: 1, LastDeleted: time.Date(2026, 5, 14, 1, 0, 30, 0, time.UTC)},
}
enc, err := encodeRebuildHistory(cs.Spec.Agent.Replicas, in)
if err != nil {
t.Fatalf("encodeRebuildHistory: %v", err)
}
cs.Annotations = map[string]string{annotationRebuildHistory: enc}
got := readRebuildHistory(cs)
if got[1].Count != 2 || got[2].Count != 1 {
t.Fatalf("round-trip lost counts: %+v", got)
}
}

func TestRebuildHistoryGarbageCollectsStaleSlots(t *testing.T) {
in := map[int32]rebuildEntry{
1: {Count: 1},
2: {Count: 2},
7: {Count: 3}, // slot beyond Replicas, must be pruned
}
enc, err := encodeRebuildHistory(2, in)
if err != nil {
t.Fatalf("encodeRebuildHistory: %v", err)
}
cs := &cocoonv1.CocoonSet{}
cs.Annotations = map[string]string{annotationRebuildHistory: enc}
got := readRebuildHistory(cs)
if _, ok := got[7]; ok {
t.Fatalf("expected slot 7 pruned, got %+v", got)
}
if len(got) != 2 {
t.Fatalf("expected 2 surviving slots, got %d: %+v", len(got), got)
}
}

func TestBackoffDelaySchedule(t *testing.T) {
cases := []struct {
count int
want time.Duration
}{
{0, 0},
{1, 1 * time.Second},
{2, 5 * time.Second},
{3, 30 * time.Second},
{10, 30 * time.Second},
}
for _, tc := range cases {
if got := backoffDelay(tc.count); got != tc.want {
t.Errorf("backoffDelay(%d) = %s, want %s", tc.count, got, tc.want)
}
}
}

func TestReadRebuildHistoryHandlesCorruptAnnotation(t *testing.T) {
cs := &cocoonv1.CocoonSet{}
cs.Annotations = map[string]string{annotationRebuildHistory: "not-json"}
if got := readRebuildHistory(cs); len(got) != 0 {
t.Fatalf("corrupt annotation must yield empty history, got %+v", got)
}
}