Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/worker/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type ImageClient struct {
logger *ContainerLogger
// Cache source image references for v2 images (imageId -> sourceImageRef)
v2ImageRefs *common.SafeMap[string]
// Cache image metadata from CLIP archives for v2 images (imageId -> CLIP ImageMetadata)
// We use CLIP's metadata directly as the source of truth rather than converting to beta9's format
clipImageMetadata *common.SafeMap[*clipCommon.ImageMetadata]
}

func NewImageClient(config types.AppConfig, workerId string, workerRepoClient pb.WorkerRepositoryServiceClient, fileCacheManager *FileCacheManager) (*ImageClient, error) {
Expand All @@ -151,6 +154,7 @@ func NewImageClient(config types.AppConfig, workerId string, workerRepoClient pb
skopeoClient: common.NewSkopeoClient(config),
mountedFuseServers: common.NewSafeMap[*fuse.Server](),
v2ImageRefs: common.NewSafeMap[string](),
clipImageMetadata: common.NewSafeMap[*clipCommon.ImageMetadata](),
logger: &ContainerLogger{
logLinesPerHour: config.Worker.ContainerLogLinesPerHour,
},
Expand Down Expand Up @@ -298,6 +302,13 @@ func (c *ImageClient) PullLazy(ctx context.Context, request *types.ContainerRequ
c.v2ImageRefs.Set(imageId, sourceRef)
log.Info().Str("image_id", imageId).Str("source_image", sourceRef).Msg("cached source image reference from clip metadata")
}

// Extract and cache embedded image metadata to avoid runtime lookups
// We store CLIP's metadata directly as the source of truth
if ociInfo.ImageMetadata != nil {
c.clipImageMetadata.Set(imageId, ociInfo.ImageMetadata)
log.Info().Str("image_id", imageId).Msg("cached image metadata from clip archive")
}
}
}
}
Expand Down Expand Up @@ -412,6 +423,12 @@ func (c *ImageClient) GetSourceImageRef(imageId string) (string, bool) {
return c.v2ImageRefs.Get(imageId)
}

// GetCLIPImageMetadata retrieves the cached CLIP image metadata for a v2 image
// Returns the CLIP metadata directly (source of truth) and a boolean indicating if it was found
func (c *ImageClient) GetCLIPImageMetadata(imageId string) (*clipCommon.ImageMetadata, bool) {
return c.clipImageMetadata.Get(imageId)
}

// createCredentialProvider creates a CLIP credential provider from JSON credentials
// Credentials are expected in JSON format: {"registry":"...","type":"...","credentials":{...}}
// This format is used for both build and runtime containers
Expand Down
51 changes: 45 additions & 6 deletions pkg/worker/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/beam-cloud/beta9/pkg/storage"
types "github.com/beam-cloud/beta9/pkg/types"
pb "github.com/beam-cloud/beta9/proto"
clipCommon "github.com/beam-cloud/clip/pkg/common"
goproc "github.com/beam-cloud/goproc/pkg"
"tags.cncf.io/container-device-interface/pkg/cdi"

Expand Down Expand Up @@ -384,14 +385,22 @@ func (s *Worker) readBundleConfig(request *types.ContainerRequest) (*specs.Spec,
// deriveSpecFromSourceImage creates an OCI spec from the source image metadata.
// This is used for v2 images where we don't have an unpacked bundle with config.json.
func (s *Worker) deriveSpecFromSourceImage(request *types.ContainerRequest) (*specs.Spec, error) {
// Determine source image reference and credentials
// First try to get cached metadata from CLIP archive (v2 images)
if clipMeta, ok := s.imageClient.GetCLIPImageMetadata(request.ImageId); ok {
log.Info().
Str("image_id", request.ImageId).
Msg("using cached image metadata from clip archive")
return s.buildSpecFromCLIPMetadata(clipMeta), nil
}

// Fallback: determine source image reference and credentials for runtime lookup
sourceImageRef, sourceImageCreds := s.getSourceImageInfo(request)
if sourceImageRef == "" {
log.Warn().Str("image_id", request.ImageId).Msg("no source image reference, using base spec")
log.Warn().Str("image_id", request.ImageId).Msg("no source image reference or cached metadata, using base spec")
return nil, nil
}

// Inspect source image with timeout
// Inspect source image with timeout (fallback for v1 images or when metadata is not cached)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

Expand All @@ -408,9 +417,9 @@ func (s *Worker) deriveSpecFromSourceImage(request *types.ContainerRequest) (*sp
log.Info().
Str("image_id", request.ImageId).
Str("source_image", sourceImageRef).
Msg("derived spec from source image")
Msg("derived spec from source image via runtime lookup")

// Build spec from image metadata
// Build spec from skopeo metadata (legacy path for v1 images)
return s.buildSpecFromImageMetadata(&imgMeta), nil
}

Expand All @@ -433,7 +442,37 @@ func (s *Worker) getSourceImageInfo(request *types.ContainerRequest) (string, st
return "", ""
}

// buildSpecFromImageMetadata constructs an OCI spec from image metadata
// buildSpecFromCLIPMetadata constructs an OCI spec from CLIP image metadata
// This is the primary path for v2 images with embedded metadata
func (s *Worker) buildSpecFromCLIPMetadata(clipMeta *clipCommon.ImageMetadata) *specs.Spec {
spec := specs.Spec{
Process: &specs.Process{
Env: []string{},
},
}

// CLIP metadata has a flat structure with all fields at the top level
if len(clipMeta.Env) > 0 {
spec.Process.Env = clipMeta.Env
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Nov 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetCLIPImageMetadata hands out the cached metadata pointer, so this assigns the shared Env slice. getContainerEnvironment later appends onto options.InitialSpec.Process.Env, which writes into that same backing array. With multiple containers we now race on the cached metadata and corrupt it. Please clone the env slice before storing it (and do the same for args so the metadata stays read-only).

Prompt for AI agents
Address the following comment on pkg/worker/lifecycle.go at line 456:

<comment>GetCLIPImageMetadata hands out the cached metadata pointer, so this assigns the shared Env slice. `getContainerEnvironment` later appends onto `options.InitialSpec.Process.Env`, which writes into that same backing array. With multiple containers we now race on the cached metadata and corrupt it. Please clone the env slice before storing it (and do the same for args so the metadata stays read-only).</comment>

<file context>
@@ -433,7 +442,37 @@ func (s *Worker) getSourceImageInfo(request *types.ContainerRequest) (string, st
+
+	// CLIP metadata has a flat structure with all fields at the top level
+	if len(clipMeta.Env) &gt; 0 {
+		spec.Process.Env = clipMeta.Env
+	}
+	if clipMeta.WorkingDir != &quot;&quot; {
</file context>
Fix with Cubic

}
if clipMeta.WorkingDir != "" {
spec.Process.Cwd = clipMeta.WorkingDir
}
if clipMeta.User != "" {
spec.Process.User.Username = clipMeta.User
}
// Combine Entrypoint and Cmd, or use Cmd alone
if len(clipMeta.Entrypoint) > 0 {
spec.Process.Args = append(clipMeta.Entrypoint, clipMeta.Cmd...)
} else if len(clipMeta.Cmd) > 0 {
spec.Process.Args = clipMeta.Cmd
}

return &spec
}

// buildSpecFromImageMetadata constructs an OCI spec from skopeo image metadata
// This is the legacy path for v1 images or when CLIP metadata is not available
func (s *Worker) buildSpecFromImageMetadata(imgMeta *common.ImageMetadata) *specs.Spec {
spec := specs.Spec{
Process: &specs.Process{
Expand Down
158 changes: 154 additions & 4 deletions pkg/worker/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/beam-cloud/beta9/pkg/common"
"github.com/beam-cloud/beta9/pkg/types"
clipCommon "github.com/beam-cloud/clip/pkg/common"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -49,7 +50,9 @@ func TestV2ImageEnvironmentFlow(t *testing.T) {
imageMountPath: "/tmp/test-images",
containerInstances: common.NewSafeMap[*ContainerInstance](),
imageClient: &ImageClient{
skopeoClient: mockSkopeo,
skopeoClient: mockSkopeo,
v2ImageRefs: common.NewSafeMap[string](),
clipImageMetadata: common.NewSafeMap[*clipCommon.ImageMetadata](),
},
runcServer: &RunCServer{
baseConfigSpec: getTestBaseSpec(),
Expand Down Expand Up @@ -260,9 +263,10 @@ func TestV2ImageEnvironmentFlow_NonBuildContainer(t *testing.T) {
}

imageClient := &ImageClient{
skopeoClient: mockSkopeo,
config: config,
v2ImageRefs: common.NewSafeMap[string](),
skopeoClient: mockSkopeo,
config: config,
v2ImageRefs: common.NewSafeMap[string](),
clipImageMetadata: common.NewSafeMap[*clipCommon.ImageMetadata](),
}

worker := &Worker{
Expand Down Expand Up @@ -361,6 +365,152 @@ func (m *mockSkopeoClient) Copy(ctx context.Context, source, dest, creds string,
return nil
}

// TestCachedImageMetadata tests that cached metadata from CLIP archives is used correctly
func TestCachedImageMetadata(t *testing.T) {
config := types.AppConfig{
ImageService: types.ImageServiceConfig{
ClipVersion: 2,
},
Worker: types.WorkerConfig{},
}

// Create mock skopeo client (should NOT be called when metadata is cached)
skopeoCallCount := 0
mockSkopeo := &mockSkopeoClient{
inspectFunc: func(ctx context.Context, image string, creds string, logger *slog.Logger) (common.ImageMetadata, error) {
skopeoCallCount++
t.Logf("Skopeo.Inspect called (count: %d) - this should NOT happen when metadata is cached", skopeoCallCount)
return common.ImageMetadata{}, nil
},
}

imageClient := &ImageClient{
skopeoClient: mockSkopeo,
config: config,
v2ImageRefs: common.NewSafeMap[string](),
clipImageMetadata: common.NewSafeMap[*clipCommon.ImageMetadata](),
}

worker := &Worker{
config: config,
imageMountPath: "/tmp/test-images",
containerInstances: common.NewSafeMap[*ContainerInstance](),
imageClient: imageClient,
runcServer: &RunCServer{
baseConfigSpec: getTestBaseSpec(),
},
}

// Simulate cached CLIP metadata from archive
imageId := "v2-cached-image-123"
cachedMetadata := &clipCommon.ImageMetadata{
Name: "cached-image",
Architecture: "amd64",
Os: "linux",
Digest: "sha256:abc123",
Env: []string{
"PATH=/usr/local/bin:/usr/bin:/bin",
"LANG=en_US.UTF-8",
},
WorkingDir: "/app",
User: "nobody",
Cmd: []string{"/bin/sh", "-c", "echo hello"},
Entrypoint: []string{"/entrypoint.sh"},
}
imageClient.clipImageMetadata.Set(imageId, cachedMetadata)

t.Run("UsesCachedMetadata", func(t *testing.T) {
request := &types.ContainerRequest{
ContainerId: "test-container-cached",
ImageId: imageId,
}

// Derive spec - should use cached metadata without calling skopeo
spec, err := worker.deriveSpecFromSourceImage(request)
require.NoError(t, err)
require.NotNil(t, spec)

// Verify skopeo was NOT called
assert.Equal(t, 0, skopeoCallCount, "Skopeo.Inspect should not be called when metadata is cached")

// Verify the spec has the cached CLIP metadata
assert.Contains(t, spec.Process.Env, "PATH=/usr/local/bin:/usr/bin:/bin")
assert.Contains(t, spec.Process.Env, "LANG=en_US.UTF-8")
assert.Equal(t, "/app", spec.Process.Cwd)
assert.Equal(t, "nobody", spec.Process.User.Username)
assert.Equal(t, []string{"/entrypoint.sh", "/bin/sh", "-c", "echo hello"}, spec.Process.Args)

t.Logf("✅ Successfully used cached CLIP metadata for image %s", imageId)
})

t.Run("FallbackToSkopeoWhenNotCached", func(t *testing.T) {
// Reset skopeo call count
skopeoCallCount = 0

// Update mock to return valid metadata
mockSkopeo.inspectFunc = func(ctx context.Context, image string, creds string, logger *slog.Logger) (common.ImageMetadata, error) {
skopeoCallCount++
return common.ImageMetadata{
Name: "fallback-image",
Architecture: "amd64",
Os: "linux",
Config: &common.ImageConfig{
Env: []string{"PATH=/usr/bin:/bin"},
},
}, nil
}

// Cache source image reference (but not metadata)
uncachedImageId := "v2-uncached-image-456"
sourceImage := "docker.io/library/alpine:latest"
imageClient.v2ImageRefs.Set(uncachedImageId, sourceImage)

request := &types.ContainerRequest{
ContainerId: "test-container-uncached",
ImageId: uncachedImageId,
}

// Derive spec - should fallback to skopeo
spec, err := worker.deriveSpecFromSourceImage(request)
require.NoError(t, err)
require.NotNil(t, spec)

// Verify skopeo WAS called as fallback
assert.Equal(t, 1, skopeoCallCount, "Skopeo.Inspect should be called when metadata is not cached")

t.Logf("✅ Successfully fell back to skopeo for image without cached metadata")
})
}

// TestGetCLIPImageMetadata tests the CLIP image metadata retrieval
func TestGetCLIPImageMetadata(t *testing.T) {
imageClient := &ImageClient{
clipImageMetadata: common.NewSafeMap[*clipCommon.ImageMetadata](),
}

imageId := "test-image-123"
testMetadata := &clipCommon.ImageMetadata{
Name: "test-image",
Architecture: "amd64",
Os: "linux",
}

t.Run("ReturnsMetadataWhenCached", func(t *testing.T) {
imageClient.clipImageMetadata.Set(imageId, testMetadata)

meta, ok := imageClient.GetCLIPImageMetadata(imageId)
assert.True(t, ok, "Should find cached metadata")
assert.Equal(t, testMetadata.Name, meta.Name)
assert.Equal(t, testMetadata.Architecture, meta.Architecture)
assert.Equal(t, testMetadata.Os, meta.Os)
})

t.Run("ReturnsNotFoundWhenNotCached", func(t *testing.T) {
_, ok := imageClient.GetCLIPImageMetadata("non-existent-image")
assert.False(t, ok, "Should not find non-existent metadata")
})
}

// Get a base test spec
func getTestBaseSpec() specs.Spec {
return specs.Spec{
Expand Down
Loading
Loading