diff --git a/components/registry-facade/pkg/registry/manifest.go b/components/registry-facade/pkg/registry/manifest.go index a3a4cdf460e838..6828113a0e8817 100644 --- a/components/registry-facade/pkg/registry/manifest.go +++ b/components/registry-facade/pkg/registry/manifest.go @@ -27,6 +27,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "golang.org/x/xerrors" + "k8s.io/apimachinery/pkg/util/wait" "github.com/gitpod-io/gitpod/common-go/log" "github.com/gitpod-io/gitpod/common-go/tracing" @@ -96,6 +97,15 @@ func (reg *Registry) handleManifest(ctx context.Context, r *http.Request) http.H return res } +// fetcherBackoffParams defines the backoff parameters for blob retrieval. +// Aiming at ~10 seconds total time for retries +var fetcherBackoffParams = wait.Backoff{ + Duration: 1 * time.Second, + Factor: 1.2, + Jitter: 0.2, + Steps: 5, +} + type manifestHandler struct { Context context.Context @@ -278,39 +288,51 @@ func DownloadConfig(ctx context.Context, fetch FetcherFunc, ref string, desc oci return nil, xerrors.Errorf("unsupported media type: %s", desc.MediaType) } + log := log.WithField("desc", desc) var opts manifestDownloadOptions for _, o := range options { o(&opts) } - var rc io.ReadCloser - if opts.Store != nil { - r, err := opts.Store.ReaderAt(ctx, desc) - if errors.Is(err, errdefs.ErrNotFound) { - // not cached yet - } else if err != nil { - log.WithError(err).WithField("desc", desc).Warn("cannot read config from store - fetching again") - } else { - defer r.Close() - rc = io.NopCloser(content.NewReader(r)) + var buf []byte + err = wait.ExponentialBackoffWithContext(ctx, fetcherBackoffParams, func(ctx context.Context) (done bool, err error) { + var rc io.ReadCloser + if opts.Store != nil { + r, err := opts.Store.ReaderAt(ctx, desc) + if errors.Is(err, errdefs.ErrNotFound) { + // not cached yet + } else if err != nil { + log.WithError(err).Warn("cannot read config from store - fetching again") + } else { + defer r.Close() + rc = io.NopCloser(content.NewReader(r)) + } } - } - if rc == nil { - fetcher, err := fetch() - if err != nil { - return nil, err + if rc == nil { + fetcher, err := fetch() + if err != nil { + log.WithError(err).Warn("cannot create fetcher") + return false, nil // retry + } + rc, err = fetcher.Fetch(ctx, desc) + if err != nil { + log.WithError(err).Warn("cannot fetch config") + return false, nil // retry + } + defer rc.Close() } - rc, err = fetcher.Fetch(ctx, desc) + + buf, err = io.ReadAll(rc) if err != nil { - return nil, xerrors.Errorf("cannot download config: %w", err) + log.WithError(err).Warn("cannot read config") + return false, nil // retry } - defer rc.Close() - } - buf, err := io.ReadAll(rc) + return true, nil + }) if err != nil { - return nil, xerrors.Errorf("cannot read config: %w", err) + return nil, xerrors.Errorf("failed to fetch config: %w", err) } var res ociv1.Image @@ -387,6 +409,8 @@ func AsFetcherFunc(f remotes.Fetcher) FetcherFunc { // DownloadManifest downloads and unmarshals the manifest of the given desc. If the desc points to manifest list // we choose the first manifest in that list. func DownloadManifest(ctx context.Context, fetch FetcherFunc, desc ociv1.Descriptor, options ...ManifestDownloadOption) (cfg *ociv1.Manifest, rdesc *ociv1.Descriptor, err error) { + log := log.WithField("desc", desc) + var opts manifestDownloadOptions for _, o := range options { o(&opts) @@ -394,61 +418,71 @@ func DownloadManifest(ctx context.Context, fetch FetcherFunc, desc ociv1.Descrip var ( placeInStore bool - rc io.ReadCloser mediaType = desc.MediaType + inpt []byte ) - if opts.Store != nil { - func() { - nfo, err := opts.Store.Info(ctx, desc.Digest) - if errors.Is(err, errdefs.ErrNotFound) { - // not in store yet - return - } + err = wait.ExponentialBackoffWithContext(ctx, fetcherBackoffParams, func(ctx context.Context) (done bool, err error) { + var rc io.ReadCloser + if opts.Store != nil { + func() { + nfo, err := opts.Store.Info(ctx, desc.Digest) + if errors.Is(err, errdefs.ErrNotFound) { + // not in store yet + return + } + if err != nil { + log.WithError(err).Warn("cannot get manifest from store") + return + } + if nfo.Labels["Content-Type"] == "" { + // we have broken data in the store - ignore it and overwrite + return + } + + r, err := opts.Store.ReaderAt(ctx, desc) + if errors.Is(err, errdefs.ErrNotFound) { + // not in store yet + return + } + if err != nil { + log.WithError(err).Warn("cannot get manifest from store") + return + } + + mediaType, rc = nfo.Labels["Content-Type"], &reader{ReaderAt: r} + }() + } + if rc == nil { + // did not find in store, or there was no store. Either way, let's fetch this + // thing from the remote. + placeInStore = true + + var fetcher remotes.Fetcher + fetcher, err = fetch() if err != nil { - log.WithError(err).WithField("desc", desc).Warn("cannot get manifest from store") - return - } - if nfo.Labels["Content-Type"] == "" { - // we have broken data in the store - ignore it and overwrite - return + log.WithError(err).Warn("cannot create fetcher") + return false, nil // retry } - r, err := opts.Store.ReaderAt(ctx, desc) - if errors.Is(err, errdefs.ErrNotFound) { - // not in store yet - return - } + rc, err = fetcher.Fetch(ctx, desc) if err != nil { - log.WithError(err).WithField("desc", desc).Warn("cannot get manifest from store") - return + log.WithError(err).Warn("cannot fetch manifest") + return false, nil // retry } - - mediaType, rc = nfo.Labels["Content-Type"], &reader{ReaderAt: r} - }() - } - if rc == nil { - // did not find in store, or there was no store. Either way, let's fetch this - // thing from the remote. - placeInStore = true - - var fetcher remotes.Fetcher - fetcher, err = fetch() - if err != nil { - return + mediaType = desc.MediaType } - rc, err = fetcher.Fetch(ctx, desc) + inpt, err = io.ReadAll(rc) + rc.Close() if err != nil { - err = xerrors.Errorf("cannot fetch manifest: %w", err) - return + log.WithError(err).Warn("cannot read manifest") + return false, nil // retry } - mediaType = desc.MediaType - } - inpt, err := io.ReadAll(rc) - rc.Close() + return true, nil + }) if err != nil { - err = xerrors.Errorf("cannot download manifest: %w", err) + err = xerrors.Errorf("failed to fetch manifest: %w", err) return } @@ -457,7 +491,8 @@ func DownloadManifest(ctx context.Context, fetch FetcherFunc, desc ociv1.Descrip switch rdesc.MediaType { case images.MediaTypeDockerSchema2ManifestList, ociv1.MediaTypeImageIndex: - log.WithField("desc", rdesc).Debug("resolving image index") + log := log.WithField("desc", rdesc) + log.Debug("resolving image index") // we received a manifest list which means we'll pick the default platform // and fetch that manifest @@ -472,24 +507,34 @@ func DownloadManifest(ctx context.Context, fetch FetcherFunc, desc ociv1.Descrip return } - var fetcher remotes.Fetcher - fetcher, err = fetch() - if err != nil { - return - } + err = wait.ExponentialBackoffWithContext(ctx, fetcherBackoffParams, func(ctx context.Context) (done bool, err error) { + var fetcher remotes.Fetcher + fetcher, err = fetch() + if err != nil { + log.WithError(err).Warn("cannot create fetcher") + return false, nil // retry + } - // TODO(cw): choose by platform, not just the first manifest - md := list.Manifests[0] - rc, err = fetcher.Fetch(ctx, md) - if err != nil { - err = xerrors.Errorf("cannot download config: %w", err) - return - } - rdesc = &md - inpt, err = io.ReadAll(rc) - rc.Close() + // TODO(cw): choose by platform, not just the first manifest + var rc io.ReadCloser + md := list.Manifests[0] + rc, err = fetcher.Fetch(ctx, md) + if err != nil { + log.WithError(err).Warn("cannot download config") + return false, nil // retry + } + rdesc = &md + inpt, err = io.ReadAll(rc) + rc.Close() + if err != nil { + log.WithError(err).Warn("cannot download manifest") + return false, nil // retry + } + + return true, nil + }) if err != nil { - err = xerrors.Errorf("cannot download manifest: %w", err) + err = xerrors.Errorf("failed to download config: %w", err) return } } diff --git a/components/registry-facade/pkg/registry/manifest_test.go b/components/registry-facade/pkg/registry/manifest_test.go index ec790c97704348..30bcae951bdb0b 100644 --- a/components/registry-facade/pkg/registry/manifest_test.go +++ b/components/registry-facade/pkg/registry/manifest_test.go @@ -7,14 +7,23 @@ package registry import ( "context" "encoding/json" + "errors" "fmt" + "io" + "strings" + "syscall" "testing" + "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/remotes" "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/specs-go" ociv1 "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/wait" ) func TestDownloadManifest(t *testing.T) { @@ -140,3 +149,153 @@ func (fbs *misbehavingStore) Writer(ctx context.Context, opts ...content.WriterO func (fbs *misbehavingStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { return content.Info{}, fmt.Errorf("you wish") } + +// manifestFailingReader is a reader that fails after a certain point. +type manifestFailingReader struct { + reader io.Reader + failAfterBytes int + failError error + bytesRead int +} + +func (fr *manifestFailingReader) Read(p []byte) (n int, err error) { + if fr.bytesRead >= fr.failAfterBytes { + return 0, fr.failError + } + n, err = fr.reader.Read(p) + if err != nil { + return n, err + } + fr.bytesRead += n + if fr.bytesRead >= fr.failAfterBytes { + // Return the error, but also the bytes read in this call. + return n, fr.failError + } + return n, nil +} + +func (fr *manifestFailingReader) Close() error { + return nil +} + +type mockFetcher struct { + // How many times Fetch should fail before succeeding. + failCount int + // The error to return on failure. + failError error + + // Internal counter for calls. + callCount int + // The data to return on success. + successData string + + // Whether to use a reader that fails mid-stream on the first call. + failReaderOnFirstCall bool + // The number of bytes to read successfully before the reader fails. + failAfterBytes int +} + +func (m *mockFetcher) Fetch(ctx context.Context, desc ociv1.Descriptor) (io.ReadCloser, error) { + m.callCount++ + if m.callCount <= m.failCount { + return nil, m.failError + } + + if m.failReaderOnFirstCall && m.callCount == 1 { + return &manifestFailingReader{ + reader: strings.NewReader(m.successData), + failAfterBytes: m.failAfterBytes, + failError: m.failError, + }, nil + } + + return io.NopCloser(strings.NewReader(m.successData)), nil +} + +func TestDownloadManifest_RetryOnReadAll(t *testing.T) { + // Arrange + mockFetcher := &mockFetcher{ + failCount: 0, // Fetch succeeds immediately + failReaderOnFirstCall: true, + failAfterBytes: 5, + failError: syscall.EPIPE, + successData: `{"schemaVersion": 2, "mediaType": "application/vnd.oci.image.manifest.v1+json"}`, + } + + fetcherFunc := func() (remotes.Fetcher, error) { + return mockFetcher, nil + } + + // Use short backoff for testing + originalBackoff := fetcherBackoffParams + fetcherBackoffParams = wait.Backoff{ + Duration: 1 * time.Millisecond, + Steps: 3, + } + defer func() { fetcherBackoffParams = originalBackoff }() + + // Act + _, _, err := DownloadManifest(context.Background(), fetcherFunc, ociv1.Descriptor{MediaType: ociv1.MediaTypeImageManifest}) + + // Assert + require.NoError(t, err) + assert.Equal(t, 2, mockFetcher.callCount, "Expected Fetch to be called twice (1st succeeds, read fails, 2nd succeeds)") +} + +func TestDownloadConfig_RetryOnReadAll(t *testing.T) { + // Arrange + mockFetcher := &mockFetcher{ + failCount: 0, // Fetch succeeds immediately + failReaderOnFirstCall: true, + failAfterBytes: 5, + failError: syscall.EPIPE, + successData: `{"architecture": "amd64", "os": "linux"}`, + } + + fetcherFunc := func() (remotes.Fetcher, error) { + return mockFetcher, nil + } + + // Use short backoff for testing + originalBackoff := fetcherBackoffParams + fetcherBackoffParams = wait.Backoff{ + Duration: 1 * time.Millisecond, + Steps: 3, + } + defer func() { fetcherBackoffParams = originalBackoff }() + + // Act + _, err := DownloadConfig(context.Background(), fetcherFunc, "ref", ociv1.Descriptor{MediaType: ociv1.MediaTypeImageConfig}) + + // Assert + require.NoError(t, err) + assert.Equal(t, 2, mockFetcher.callCount, "Expected Fetch to be called twice (1st succeeds, read fails, 2nd succeeds)") +} + +func TestDownloadManifest_RetryOnFetch(t *testing.T) { + // Arrange + mockFetcher := &mockFetcher{ + failCount: 2, + failError: errors.New("transient network error"), + successData: `{"schemaVersion": 2, "mediaType": "application/vnd.oci.image.manifest.v1+json"}`, + } + + fetcherFunc := func() (remotes.Fetcher, error) { + return mockFetcher, nil + } + + // Use short backoff for testing + originalBackoff := fetcherBackoffParams + fetcherBackoffParams = wait.Backoff{ + Duration: 1 * time.Millisecond, + Steps: 3, + } + defer func() { fetcherBackoffParams = originalBackoff }() + + // Act + _, _, err := DownloadManifest(context.Background(), fetcherFunc, ociv1.Descriptor{MediaType: ociv1.MediaTypeImageManifest}) + + // Assert + require.NoError(t, err) + assert.Equal(t, 3, mockFetcher.callCount, "Expected Fetch to be called 3 times (2 failures + 1 success)") +}