Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 46 additions & 80 deletions pkg/apk/apk/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,66 +32,60 @@ import (
"chainguard.dev/apko/pkg/paths"
)

type flightCache[T any] struct {
flight *singleflight.Group
cache *sync.Map
// newCoalescingCache returns an empty coalescingCache whose backing map is
// already allocated, so it can be used immediately.
func newCoalescingCache[K comparable, V any]() *coalescingCache[K, V] {
	c := new(coalescingCache[K, V])
	c.cache = map[K]func() (V, error){}
	return c
}

// TODO: Consider [K, V] if we need a non-string key type.
func newFlightCache[T any]() *flightCache[T] {
return &flightCache[T]{
flight: &singleflight.Group{},
cache: &sync.Map{},
}
// coalescingCache combines singleflight's coalescing with a cache.
// Do keeps one function per key in cache and takes mux around every access
// to that map.
type coalescingCache[K comparable, V any] struct {
// mux guards cache.
mux sync.RWMutex
// cache maps a key to the function whose call yields that key's result.
cache map[K]func() (V, error)
}

// Do coalesces multiple calls, like singleflight, but also caches
// the result if the call is successful. Failures are not cached to avoid
// permanently failing for transient errors.
func (f *flightCache[T]) Do(key string, fn func() (T, error)) (T, error) {
v, ok := f.cache.Load(key)
if ok {
if t, ok := v.(T); ok {
return t, nil
} else {
// This can't happen but just in case things change.
return t, fmt.Errorf("unexpected type %T", v)
}
// the result if the call is successful.
// Failures are not cached to avoid permanently failing for transient errors.
func (f *coalescingCache[K, V]) Do(key K, fn func() (V, error)) (V, error) {
f.mux.RLock()
if v, ok := f.cache[key]; ok {
f.mux.RUnlock()
return v()
}
f.mux.RUnlock()

v, err, _ := f.flight.Do(key, func() (interface{}, error) {
if v, ok := f.cache.Load(key); ok {
return v, nil
}
f.mux.Lock()

// Don't cache errors, but maybe we should.
v, err := fn()
// Doubly-checked-locking in case of race conditions.
if v, ok := f.cache[key]; ok {
f.mux.Unlock()
return v()
}

v := sync.OnceValues(func() (V, error) {
ret, err := fn()
if err != nil {
return nil, err
// We've put this value into the cache before executing it, so we need to remove it
// to avoid caching errors.
f.mux.Lock()
delete(f.cache, key)
f.mux.Unlock()
}

f.cache.Store(key, v)

return v, nil
return ret, err
})
f.cache[key] = v
f.mux.Unlock()

t, ok := v.(T)
if err != nil {
return t, err
}
if !ok {
// This can't happen but just in case things change.
return t, fmt.Errorf("unexpected type %T", v)
}
return t, nil
return v()
}

type Cache struct {
etagCache *sync.Map
headFlight *singleflight.Group
getFlight *singleflight.Group
getFlight *singleflight.Group

discoverKeys *flightCache[[]Key]
etags *coalescingCache[string, *http.Response]
discoverKeys *coalescingCache[string, []Key]
}

// NewCache returns a new Cache, which allows us to persist the results of HEAD requests
Expand All @@ -107,39 +101,18 @@ type Cache struct {
// requests for the same resource when passing etag=false.
func NewCache(etag bool) *Cache {
c := &Cache{
headFlight: &singleflight.Group{},
getFlight: &singleflight.Group{},
discoverKeys: newFlightCache[[]Key](),
discoverKeys: newCoalescingCache[string, []Key](),
}

if etag {
c.etagCache = &sync.Map{}
//nolint:bodyclose // Requests are closed as they're put into the cache already.
c.etags = newCoalescingCache[string, *http.Response]()
}

return c
}

// load looks up the response previously stored for cacheFile.
// The boolean is false when the receiver is nil, when no etag cache is
// configured, or when nothing has been stored under cacheFile.
func (c *Cache) load(cacheFile string) (*http.Response, bool) {
	if c != nil && c.etagCache != nil {
		if cached, found := c.etagCache.Load(cacheFile); found {
			return cached.(*http.Response), true
		}
	}

	return nil, false
}

// store records resp under cacheFile in the etag cache.
// It does nothing when the receiver is nil or no etag cache is configured.
func (c *Cache) store(cacheFile string, resp *http.Response) {
	if c != nil && c.etagCache != nil {
		c.etagCache.Store(cacheFile, resp)
	}
}

// cache
type cache struct {
dir string
Expand Down Expand Up @@ -209,12 +182,7 @@ func (t *cacheTransport) RoundTrip(request *http.Request) (*http.Response, error
}

func (t *cacheTransport) head(request *http.Request, cacheFile string) (*http.Response, error) {
resp, ok := t.cache.load(cacheFile)
if ok {
return resp, nil
}

v, err, _ := t.cache.headFlight.Do(cacheFile, func() (interface{}, error) {
fetch := func() (*http.Response, error) {
req := request.Clone(request.Context())
req.Method = http.MethodHead
resp, err := t.wrapped.Do(req)
Expand All @@ -225,15 +193,13 @@ func (t *cacheTransport) head(request *http.Request, cacheFile string) (*http.Re
// HEAD shouldn't have a body. Make sure we close it so we can reuse the connection.
defer resp.Body.Close()

t.cache.store(cacheFile, resp)

return resp, nil
})
if err != nil {
return nil, err
}

return v.(*http.Response), nil
if t.cache.etags != nil {
return t.cache.etags.Do(cacheFile, fetch)
}
return fetch()
}

func (t *cacheTransport) get(ctx context.Context, request *http.Request, cacheFile, initialEtag string) (string, error) {
Expand Down
60 changes: 60 additions & 0 deletions pkg/apk/apk/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2025 Chainguard, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apk

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestFlightCache verifies that a second Do call for the same key is served
// from the cache: the first function's result is returned again and the
// second function is never invoked.
func TestFlightCache(t *testing.T) {
	s := newCoalescingCache[string, int]()
	var called int
	r1, err := s.Do("test", func() (int, error) {
		called++
		return 42, nil
	})
	require.NoError(t, err)
	// Pin the first result so the equality check below cannot pass on two
	// zero values.
	require.Equal(t, 42, r1)

	r2, err := s.Do("test", func() (int, error) {
		called++
		return 1337, nil
	})
	require.NoError(t, err)

	require.Equal(t, r1, r2)
	require.Equal(t, 1, called, "Function should only be called once")
}

// TestFlightCacheCachesNoErrors verifies that a failed call is not cached:
// after Do returns an error for a key, the next Do for the same key runs its
// function and returns that function's result.
func TestFlightCacheCachesNoErrors(t *testing.T) {
	s := newCoalescingCache[string, int]()
	var called int
	_, err := s.Do("test", func() (int, error) {
		called++
		return 42, assert.AnError
	})
	// ErrorIs takes the error under test first and the expected target second.
	require.ErrorIs(t, err, assert.AnError)

	r2, err := s.Do("test", func() (int, error) {
		called++
		return 1337, nil
	})
	require.NoError(t, err)

	require.Equal(t, 1337, r2)
	require.Equal(t, 2, called, "Function should be called twice, once for the error and once for the success")
}
41 changes: 4 additions & 37 deletions pkg/apk/apk/implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"runtime"
"slices"
"strings"
"sync"
"time"

"github.com/hashicorp/go-retryablehttp"
Expand All @@ -63,7 +62,7 @@ import (
// This is terrible but simpler than plumbing around a cache for now.
// We just hold the expanded APK in memory rather than re-parsing it every time,
// which is expensive. This also dedupes simultaneous fetches.
var globalApkCache = &apkCache{}
var globalApkCache = newCoalescingCache[string, *expandapk.APKExpanded]()

type APK struct {
arch string
Expand Down Expand Up @@ -1226,40 +1225,6 @@ func (a *APK) cachedPackage(ctx context.Context, pkg InstallablePackage, cacheDi
return &exp, nil
}

// apkResult pairs the expanded APK with the error from expanding it, so that
// both can be stored together as a single cache value.
type apkResult struct {
// exp is the expanded APK returned by expandPackage.
exp *expandapk.APKExpanded
// err is the error returned by expandPackage.
err error
}

// apkCache remembers the outcome of expanding a package, keyed by the
// package URL. The stored outcome includes the error, so a failed expansion
// is remembered as well.
type apkCache struct {
// url -> *sync.Once
// The Once ensures the expansion for a URL runs a single time.
onces sync.Map

// url -> apkResult
// Written inside the Once for that URL and read back afterwards.
resps sync.Map
}

// get returns the expanded APK for pkg, expanding it at most once per URL.
// Every caller for the same URL receives the stored result of that single
// expansion, including its error.
func (c *apkCache) get(ctx context.Context, a *APK, pkg InstallablePackage) (*expandapk.APKExpanded, error) {
	key := pkg.URL()

	// The expensive expansion runs inside the per-URL Once.
	entry, _ := c.onces.LoadOrStore(key, &sync.Once{})
	guard := entry.(*sync.Once)
	guard.Do(func() {
		var res apkResult
		res.exp, res.err = expandPackage(ctx, a, pkg)
		c.resps.Store(key, res)
	})

	stored, found := c.resps.Load(key)
	if !found {
		panic(fmt.Errorf("did not see apk %q after writing it", key))
	}

	res := stored.(apkResult)
	return res.exp, res.err
}

func (a *APK) expandPackage(ctx context.Context, pkg InstallablePackage) (*expandapk.APKExpanded, error) {
if a.cache == nil {
// If we don't have a cache configured, don't use the global cache.
Expand All @@ -1269,7 +1234,9 @@ func (a *APK) expandPackage(ctx context.Context, pkg InstallablePackage) (*expan
return expandPackage(ctx, a, pkg)
}

return globalApkCache.get(ctx, a, pkg)
return globalApkCache.Do(pkg.URL(), func() (*expandapk.APKExpanded, error) {
return expandPackage(ctx, a, pkg)
})
}

func expandPackage(ctx context.Context, a *APK, pkg InstallablePackage) (*expandapk.APKExpanded, error) {
Expand Down
Loading