Skip to content

KEP 4680: Update README.md for DRA #5302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 140 additions & 46 deletions keps/sig-node/4680-add-resource-health-to-pod-status/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,53 +285,113 @@ We should consider introducing another field to the Status that will be a free f

### DRA implementation details

Today DRA does not return the health of the device back to kubelet. The proposal is to extend the
type `NamedResourcesInstance` (from [pkg/apis/resource/namedresources.go](https://github.com/kubernetes/kubernetes/blob/790dfdbe386e4a115f41d38058c127d2dd0e6f44/pkg/apis/resource/namedresources.go#L29-L37)) to include the Health field the same way it is done in
the Device Plugin as well as a device ID.
Today DRA does not return the health of the device back to kubelet. In `1.30` we had a `ListAndWatch()` API similar to DevicePlugin, from which we could have inferred device health. However, this API is being removed in `1.31`, necessitating a new approach for health monitoring.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 1.30 we had a ListAndWatch() API similar to DevicePlugin, from which we could have inferred device health.

Did we? Any pointers?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, found it. NodeListAndWatchResources


The following design outlines how Kubelet will obtain health information from DRA plugins and use it to update the PodStatus. This design focuses on an optional, proactive health reporting mechanism from DRA plugins.

**High-Level Architectural Approach for DRA Health**

1. **Optional gRPC Stream:** A new, optional gRPC service (`NodeHealth`) is defined within a dedicated API group (`dra-health/v1alpha1`). DRA plugins can implement this service. It exposes a server-streaming RPC (`WatchResources`) allowing the plugin to proactively send health updates for its managed devices to Kubelet.
2. **Health Information Cache:** Kubelet's DRA Manager will maintain a persistent cache (`healthInfoCache`) storing the latest known health status (Healthy, Unhealthy, Unknown) and a `LastUpdated` timestamp for each device, keyed by driver name and device identifiers. This cache handles state reconciliation, timeouts for stale data, and persists across Kubelet restarts.
3. **Kubelet Integration:** Kubelet's DRA Manager (`pkg/kubelet/cm/dra/manager.go`) acts as the gRPC client. Upon plugin registration, it attempts to initiate the `WatchResources` stream. If successful, it consumes the health stream, updates the `healthInfoCache`, identifies which Pods are affected by reported health changes (using its existing `claimInfoCache`), and signals the Kubelet status manager via a dedicated update channel.
4. **Update Channel Aggregation:** The Container Manager (`pkg/kubelet/container_manager_linux.go`) merges update signals from the DRA Manager's update channel (if the DRA health feature is enabled) and the Device Manager's existing update channel into a single channel (`Updates()`) monitored by the Kubelet status manager.
5. **PodStatus Update:** Kubelet's pod synchronization logic, upon receiving an update signal or during other Pod updates, calls the Container Manager's `UpdateAllocatedResourcesStatus` method. This, in turn, calls the DRA Manager's `UpdateAllocatedResourcesStatus` method, which reads the current health status for the Pod's allocated DRA devices from `healthInfoCache` and populates the `pod.Status.ContainerStatuses[].AllocatedResourcesStatus` field (and `pod.Status.AllocatedResourcesStatus` for pod-level resources in alpha2) with the correct health information.

*Note: Kubelet will ONLY currently use this health information to update the Pod Status. The DRA plugin remains responsible for updating ResourceSlices to taint resources or mark them unhealthy through other mechanisms if needed.*

**Core Components and Logic for DRA Health**

1. **DRA Health Information Cache (`pkg/kubelet/cm/dra/healthinfo.go`)**
* **Purpose:** To maintain the source of truth for device health within Kubelet, reconciling reported data and handling missing/stale information.
* **Data Structure:**
* `healthInfoCache`: Holds `HealthInfo` (e.g., `*state.DevicesHealthMap`) and state file persistence information.
* `state.DevicesHealthMap`: `map[string]DriverHealthState` (keyed by driver name).
* `state.DriverHealthState`: Contains `Devices []DeviceHealth`.
* `state.DeviceHealth`: Contains `PoolName string`, `DeviceName string`, `Health state.DeviceHealthString`, `LastUpdated time.Time`.
* `state.DeviceHealthString`: A string type with values "Healthy", "Unhealthy", "Unknown".
* **Key Features & Functions:**
* `newHealthInfoCache(stateFile)`: Initializes cache, loads from checkpoint file if it exists.
* `updateHealthInfo(driverName, devices)`: Performs full-state reconciliation for a driver based on a `WatchResourcesResponse`. Updates `LastUpdated` timestamps. Marks devices not present in the update as "Unknown" if `time.Since(LastUpdated) > healthTimeout`. Preserves existing health status if within timeout. Returns information about changed devices.
* `getHealthInfo(driverName, poolName, deviceName)`: Returns current health, considering `healthTimeout`. If `time.Since(device.LastUpdated) > healthTimeout`, it returns "Unknown".
* `clearDriver(driverName)`: Removes all data for a driver (e.g., on deregistration or stream end).
* `saveToCheckpoint()` / `loadFromCheckpoint()`: Handles persistence via JSON marshalling.
* `healthTimeout`: A constant (e.g., 30s) for marking stale devices "Unknown".
* **Handling Timeouts and 'Unknown' State:**
* "Unknown" status due to timeout is determined on-demand by `getHealthInfo`.
* `updateHealthInfo` also actively sets "Unknown" for devices previously known but not in the latest update from the plugin, if their `LastUpdated` timestamp exceeds `healthTimeout`.
* No separate background expiration worker is required; Kubelet's existing status manager update triggers will naturally reflect timeouts when `getHealthInfo` is called.
* If a plugin stops sending updates for all its devices (e.g., plugin crashes, stream ends), `HandleWatchResourcesStream` calls `clearDriver`, and subsequent calls to `getHealthInfo` for these devices will return "Unknown".

2. **gRPC API for DRA Device Health (`staging/src/k8s.io/kubelet/pkg/apis/dra-health/v1alpha1/api.proto`)**
* A new API group `dra-health/v1alpha1` is introduced.
* Defines a `NodeHealth` service:
```proto
service NodeHealth {
// WatchResources allows a DRA plugin to stream health updates for its devices to Kubelet.
// Kubelet calls this method, and the plugin streams responses.
// This method is optional; if not implemented by a plugin, Kubelet will assume
// devices managed by that plugin have an "Unknown" health status.
rpc WatchResources(WatchResourcesRequest) returns (stream WatchResourcesResponse) {}
}

message WatchResourcesRequest {
// Reserved for future use, e.g., filtering or options.
}

message WatchResourcesResponse {
// A list of all devices managed by the plugin for which health is being reported.
// This should be a complete list for the driver; Kubelet will reconcile this state.
repeated DeviceHealth devices = 1;
}

message DeviceHealth {
// The name of the resource pool this device belongs to.
// Required.
string pool_name = 1;
// The unique name of the device within the pool.
// Required.
string device_name = 2;
// Health status of the device.
// Expected values: "Healthy", "Unhealthy", "Unknown".
// Required.
string health_status = 3;
// Timestamp of when this health status was last determined by the plugin, as a Unix timestamp (seconds).
// Required.
int64 last_updated_timestamp = 4;
// Optional: A globally unique resource name, e.g., <driver>/<pool>/<device>.
// Useful for correlating with other systems.
string resource_name = 5;
}
```
* This gRPC service is kept separate from the core DRA API (`Node` service in `k8s.io/kubelet/pkg/apis/dra/v1alphaX`) to signify its optionality.
* If a plugin does not implement this service, plugin registration will continue. If device health is queried for such a plugin (e.g., during a call to `UpdateAllocatedResourcesStatus`), its devices will report an "Unknown" health state.

3. **Kubelet DRA Plugin Client and Registration for Health Monitoring (`pkg/kubelet/cm/dra/plugin/plugin.go`, `pkg/kubelet/cm/dra/plugin/registration.go`)**
* **Plugin Client Logic:**
* The `Plugin` struct will hold a `healthClient` (e.g., `drahealthv1alpha1.NodeHealthClient`), `healthStreamCtx (context.Context)`, and `healthStreamCancel (context.CancelFunc)`.
* `getOrCreateGRPCConn()`: Initializes the `healthClient` after gRPC connection establishment.
* `WatchResources(ctx)`: A method on the plugin client, called by the registration handler, to initiate the gRPC stream via `healthClient.WatchResources()`.
* `SetHealthStream`, `HealthStreamCancel`: Manage the context and cancellation for the health stream goroutine.
* **Registration:**
* The `RegistrationHandler` will be extended to handle the health stream.
* `RegisterPlugin`: After successful registration with the core DRA API, it will attempt to call `pluginInstance.WatchResources()`.
* If an `Unimplemented` error is received, Kubelet logs this and continues registration without health monitoring for that plugin. PodStatus will not be populated with health for this driver's resources (will default to "Unknown").
* On Success: It stores the stream context/cancel func using `pluginInstance.SetHealthStream()` and starts a background goroutine (e.g., `go h.streamHandler.HandleWatchResourcesStream(streamCtx, stream, pluginName)` via an interface).
* On Replacement of a plugin: The `HealthStreamCancel()` function for the old plugin instance is called to stop its health stream.
* **Deregistration:**
* `DeRegisterPlugin`: Calls `p.HealthStreamCancel()` to stop the health stream goroutine and cleans up related plugin resources.

4. **Kubelet DRA Manager and Container Manager Integration (`pkg/kubelet/cm/dra/manager.go`, `pkg/kubelet/cm/container_manager_linux.go`)**
* **DRA Manager (`pkg/kubelet/cm/dra/manager.go`):**
* `ManagerImpl` will implement a `plugin.StreamHandler` interface (or similar). It holds the `healthInfoCache`, a `healthInfoMutex`, and an `update chan resourceupdates.Update`.
* `HandleWatchResourcesStream(ctx context.Context, stream drahealthv1alpha1.NodeHealth_WatchResourcesClient, pluginName string)`: This function runs in a goroutine for each plugin that supports health monitoring. It receives health updates from the stream, calls `healthInfoCache.updateHealthInfo()`. If `updateHealthInfo` indicates devices changed state, it finds affected Pod UIDs from its `claimInfoCache` and sends these UIDs to its `update` channel (non-blocking). It logs errors/cancellation and exits. A `defer` function will be added to clean the cache for the specific driver if the stream ends or an error occurs (e.g., call `healthInfoCache.clearDriver(pluginName)`).
* `Updates() <-chan resourceupdates.Update`: Implemented method that returns the DRA manager's `m.update` channel.
* `UpdateAllocatedResourcesStatus(pod, status)`: Reads health for the pod's allocated DRA devices from `healthInfoCache` (using `getHealthInfo`), formats it into `v1.ResourceStatus` (with Name = claim name, Resources = slice of `v1.ResourceHealth`), and updates the input `PodStatus`.
* `GetWatcherHandler()`: Instantiates `plugin.NewRegistrationHandler`, passing the `ManagerImpl` instance (which implements the stream handling logic) as the `plugin.StreamHandler`.
* **Container Manager (`pkg/kubelet/cm/container_manager_linux.go`):**
* `Updates()`: Merges the channel returned by `m.draManager.Updates()` (if DRA health is enabled) with the `deviceManager`'s update channel into a single output channel consumed by the main Kubelet sync loop.
* `UpdateAllocatedResourcesStatus()`: Calls the device manager's update method, then calls the DRA manager's `UpdateAllocatedResourcesStatus` method (guarded by the feature gate).

In `1.30` we had a similar `ListAndWatch()` API as in DevicePlugin, from which we could have inferred something very analogous to the above. However, we are removing this in `1.31`, so will need to provide something different.

An optional gRPC interface will be created, so DRA drivers can opt into this by implementing it. The interface will allow a plugin to stream health status information in the form of deviceIDs (of the form `<driver name>/<pool name>/<device name>`) along with extra metadata indicating its health status. Just as before, a device completely disappearing would still need to trigger some state change, but now more detailed information could be attached in the form of metadata when a device isn't necessarily gone, but also isn't operating as it should be.

The API will be limited to "prepared" devices and include the claim `name/namespace/UID`. That should be enough information for kubelet to correlate with the pods for which the claim was prepared and then post that information for those pods.

Kubelet will react on this field the same way as we propose to do it for the Device Plugin.

The new method will be added to the same gRPC server that serves the [Node service](https://github.com/kubernetes/kubernetes/blob/04bba3c222bb2c5b1b1565713de4bf334ee7fbe4/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha4/api.proto#L34) interface (Node service exposes
`NodePrepareResources` and `NodeUnprepareResources`). The new interface will have a `Device` structure similar to Node Service's device, with the added `health` field:

``` proto
service NodeHealth {
...

// WatchDevicesStatus returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, WatchDevicesStatus
// returns the new list.
// This method is optional and may not be implemented.
rpc WatchDevicesStatus(Empty) returns (stream DevicesStatusResponse) {}
}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
message DevicesStatusResponse {
repeated Device devices = 1;
}

message Device {
... existing fields ...
// The device itself. Required.
string device_name = 3;
... existing fields ...

// Health of the device, can be Healthy or Unhealthy.
string Health = 5;
}
```

Implementation will ignore the `Unimplemented` error when the DRA plugin doesn't have this interface implemented.

Note, the gRPC details are still a subject to change and will go thru API review during the implementation.

### Test Plan

Expand All @@ -350,6 +410,40 @@ Device Plugin and DRA are relatively new features and have a reasonable test cov
- `k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin`: `5/31/2024` - `34`
- `k8s.io/kubernetes/pkg/kubelet/cm/dra/state`: `5/31/2024` - `98`

The following areas will have thorough unit test coverage:
- `pkg/kubelet/cm/dra/healthinfo.go`:
- Cache initialization (new, load from checkpoint, file not found).
- `updateHealthInfo`:
- Correct reconciliation of device lists.
- `LastUpdated` timestamp updates.
- Marking devices not in update as "Unknown" after `healthTimeout`.
- Preserving health status within `healthTimeout`.
- Correct identification of `changedDevices`.
- `getHealthInfo`:
- Returning correct status for existing devices.
- Returning "Unknown" for timed-out devices.
- Returning "Unknown" for non-existent devices/drivers.
- `clearDriver`: Correct removal of driver data.
- Persistence: `saveToCheckpoint` and `loadFromCheckpoint` logic.
- `pkg/kubelet/cm/dra/plugin/registration.go` (and related client logic in `plugin.go`):
- Successful health stream start and goroutine launch.
- Handling of `Unimplemented` error from plugin for `WatchResources`.
- Correct cancellation of health stream on plugin replacement or deregistration.
- Error handling during stream initiation.
- `pkg/kubelet/cm/dra/manager.go`:
- `HandleWatchResourcesStream`:
- Correctly processing messages from the stream.
- Calling `healthInfoCache.updateHealthInfo`.
- Finding affected Pod UIDs from `claimInfoCache` when health changes.
- Sending Pod UIDs to the update channel.
- Error handling, stream termination, and cache cleanup (`clearDriver`).
- `UpdateAllocatedResourcesStatus`:
- Correctly reading from `healthInfoCache`.
- Formatting `v1.ResourceStatus` accurately.
- Populating `PodStatus` for both container-level and pod-level DRA resources.
- `GetWatcherHandler` and `Updates` channel logic.


##### Integration tests

N/A
Expand Down