Skip to content

Commit 00eabf0

Browse files
committed
KEP-5334: Image Pull Progress
1 parent 5be7941 commit 00eabf0

File tree

15 files changed

+1306
-555
lines changed

15 files changed

+1306
-555
lines changed

pkg/kubelet/container/runtime.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ type StreamingRuntime interface {
151151
GetExec(ctx context.Context, id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
152152
GetAttach(ctx context.Context, id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)
153153
GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)
154+
GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID types.UID) (*url.URL, error)
154155
}
155156

156157
// ImageService interfaces allows to work with image service.

pkg/kubelet/container/testing/fake_runtime.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,14 @@ func (f *FakeStreamingRuntime) GetPortForward(_ context.Context, podName, podNam
511511
return &url.URL{Host: FakeHost}, f.Err
512512
}
513513

514+
func (f *FakeStreamingRuntime) GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID types.UID) (*url.URL, error) {
515+
f.Lock()
516+
defer f.Unlock()
517+
518+
f.CalledFunctions = append(f.CalledFunctions, "GetImagePullProgress")
519+
return &url.URL{Host: FakeHost}, f.Err
520+
}
521+
514522
type FakeContainerCommandRunner struct {
515523
// what to return
516524
Stdout string

pkg/kubelet/kubelet_pods.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
utilfeature "k8s.io/apiserver/pkg/util/feature"
4545
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
4646
"k8s.io/klog/v2"
47+
"k8s.io/kubelet/pkg/cri/streaming/imagepullprogress"
4748
"k8s.io/kubelet/pkg/cri/streaming/portforward"
4849
remotecommandserver "k8s.io/kubelet/pkg/cri/streaming/remotecommand"
4950
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -2450,6 +2451,24 @@ func (kl *Kubelet) GetPortForward(ctx context.Context, podName, podNamespace str
24502451
return kl.streamingRuntime.GetPortForward(ctx, podName, podNamespace, podUID, portForwardOpts.Ports)
24512452
}
24522453

2454+
// GetImagePullProgress gets the URL the image-pull-progress will be served from, or nil if the Kubelet will serve it.
2455+
func (kl *Kubelet) GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts imagepullprogress.Options) (*url.URL, error) {
2456+
pods, err := kl.containerRuntime.GetPods(ctx, false)
2457+
if err != nil {
2458+
return nil, err
2459+
}
2460+
// Resolve and type convert back again.
2461+
// We need the static pod UID but the kubecontainer API works with types.UID.
2462+
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
2463+
podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)
2464+
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
2465+
if pod.IsEmpty() {
2466+
return nil, fmt.Errorf("pod not found (%q)", podFullName)
2467+
}
2468+
2469+
return kl.streamingRuntime.GetImagePullProgress(ctx, podName, podNamespace, podUID)
2470+
}
2471+
24532472
// cleanupOrphanedPodCgroups removes cgroups that should no longer exist.
24542473
// it reconciles the cached state of cgroupPods with the specified list of runningPods
24552474
func (kl *Kubelet) cleanupOrphanedPodCgroups(pcm cm.PodContainerManager, cgroupPods map[types.UID]cm.CgroupName, possiblyRunningPods map[types.UID]sets.Empty) {

pkg/kubelet/kuberuntime/instrumented_services.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,15 @@ func (in instrumentedRuntimeService) PortForward(ctx context.Context, req *runti
272272
return resp, err
273273
}
274274

275+
func (in instrumentedRuntimeService) ImagePullProgress(ctx context.Context, req *runtimeapi.ImagePullProgressRequest) (*runtimeapi.ImagePullProgressResponse, error) {
276+
const operation = "image_pull_progress"
277+
defer recordOperation(operation, time.Now())
278+
279+
resp, err := in.service.ImagePullProgress(ctx, req)
280+
recordError(operation, err)
281+
return resp, err
282+
}
283+
275284
func (in instrumentedRuntimeService) UpdatePodSandboxResources(ctx context.Context, req *runtimeapi.UpdatePodSandboxResourcesRequest) (*runtimeapi.UpdatePodSandboxResourcesResponse, error) {
276285
const operation = "update_podsandbox_resources"
277286
defer recordOperation(operation, time.Now())

pkg/kubelet/kuberuntime/kuberuntime_sandbox.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,3 +380,22 @@ func (m *kubeGenericRuntimeManager) GetPortForward(ctx context.Context, podName,
380380
}
381381
return url.Parse(resp.Url)
382382
}
383+
384+
// GetImagePullProgress gets the endpoint the runtime will serve the image-pull-progress request from.
385+
func (m *kubeGenericRuntimeManager) GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID kubetypes.UID) (*url.URL, error) {
386+
sandboxIDs, err := m.getSandboxIDByPodUID(ctx, podUID, nil)
387+
if err != nil {
388+
return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err)
389+
}
390+
if len(sandboxIDs) == 0 {
391+
return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID))
392+
}
393+
req := &runtimeapi.ImagePullProgressRequest{
394+
PodSandboxId: sandboxIDs[0],
395+
}
396+
resp, err := m.runtimeService.ImagePullProgress(ctx, req)
397+
if err != nil {
398+
return nil, err
399+
}
400+
return url.Parse(resp.Url)
401+
}

pkg/kubelet/server/server_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,21 @@ func (fk *fakeKubelet) GetPortForward(ctx context.Context, podName, podNamespace
273273
return url.Parse(resp.GetUrl())
274274
}
275275

276+
func (fk *fakeKubelet) GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
277+
if fk.getPortForwardCheck != nil {
278+
fk.getPortForwardCheck(podName, podNamespace, podUID, portForwardOpts)
279+
}
280+
// Always use testPodSandboxID
281+
resp, err := fk.streamingRuntime.GetPortForward(&runtimeapi.PortForwardRequest{
282+
PodSandboxId: testPodSandboxID,
283+
Port: portForwardOpts.Ports,
284+
})
285+
if err != nil {
286+
return nil, err
287+
}
288+
return url.Parse(resp.GetUrl())
289+
}
290+
276291
// Unused functions
277292
func (*fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil }
278293
func (*fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package imagepullprogress
18+
19+
type Layer struct {
20+
Name string `json:"name"`
21+
Progress int64 `json:"progress"`
22+
Total int64 `json:"total"`
23+
Error string `json:"error,omitempty"`
24+
}

0 commit comments

Comments
 (0)