Skip to content

Commit

Permalink
Initial commit for OCI dist spec v1.1.0 agent support
Browse files Browse the repository at this point in the history
Partially addresses kubeflow/community#682

OCI image and distribution specs v1.1.0 has added support for pushing
and pulling arbitrary artifacts to a conformant registry, and not just
container images.

Since a registry is already needed to deploy inference workloads as
containers, and that it would be desirable to avoid another piece of
infrastructure just to store inference data, a OCI conformant registry
could become that ideal store to combine and colocate both use cases.

This plugin adds that support.

Uses the oras-go library.

References:

https://opencontainers.org/posts/blog/2024-03-13-image-and-distribution-1-1/

Signed-off-by: Ramkumar Chinchani <[email protected]>
  • Loading branch information
rchincha committed May 24, 2024
1 parent 36a3e6d commit 3963e0d
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 258 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ deploy-ci: manifests
deploy-helm: manifests
helm install kserve-crd charts/kserve-crd/ --wait --timeout 180s
helm install kserve charts/kserve-resources/ --wait --timeout 180s
# deploy a OCI dist spec v1.1.0 registry
helm repo add project-zot http://zotregistry.dev/helm-charts
helm install --set service.port=5000 zot project-zot/zot

undeploy:
kubectl delete -k config/default
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/onsi/ginkgo/v2 v2.13.0
github.com/onsi/gomega v1.30.0
github.com/opencontainers/image-spec v1.1.0
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
Expand All @@ -37,6 +38,7 @@ require (
knative.dev/networking v0.0.0-20231115015815-3af9769712cd
knative.dev/pkg v0.0.0-20231115001034-97c7258e3a98
knative.dev/serving v0.39.3
oras.land/oras-go/v2 v2.5.0
sigs.k8s.io/controller-runtime v0.16.3
sigs.k8s.io/yaml v1.4.0
)
Expand Down Expand Up @@ -105,7 +107,7 @@ require (
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.14.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
248 changes: 6 additions & 242 deletions go.sum

Large diffs are not rendered by default.

220 changes: 220 additions & 0 deletions pkg/agent/storage/oci.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
Copyright 2021 The KServe Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"

"oras.land/oras-go/v2"
"oras.land/oras-go/v2/content"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

type OCIProvider struct {
Client *http.Client
}

func (m *OCIProvider) DownloadModel(modelDir string, modelName string, storageUri string) error {
log.Info("Download model ", "modelName", modelName, "storageUri", storageUri, "modelDir", modelDir)
uri, err := url.Parse(storageUri)
if err != nil {
return fmt.Errorf("unable to parse storage uri: %w", err)
}
OCIDownloader := &OCIDownloader{
StorageUri: storageUri,
ModelDir: modelDir,
ModelName: modelName,
Uri: uri,
}
if err := OCIDownloader.Download(*m.Client); err != nil {
return err
}
return nil
}

type OCIDownloader struct {
StorageUri string
ModelDir string
ModelName string
Uri *url.URL
}

// generateContentKey generates a unique key for each content descriptor, using
// its digest and name if applicable.
func generateContentKey(desc ocispec.Descriptor) string {
return desc.Digest.String() + desc.Annotations[ocispec.AnnotationTitle]
}

func (h *OCIDownloader) Download(client http.Client) error {
// Copy Options
var printed sync.Map
copyOptions := oras.DefaultCopyOptions

var getConfigOnce sync.Once
copyOptions.FindSuccessors = func(ctx context.Context, fetcher content.Fetcher, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
statusFetcher := content.FetcherFunc(func(ctx context.Context, target ocispec.Descriptor) (fetched io.ReadCloser, fetchErr error) {
if _, ok := printed.LoadOrStore(generateContentKey(target), true); ok {
return fetcher.Fetch(ctx, target)
}

/*
// print status log for first-time fetching
if err := display.PrintStatus(target, "Downloading", opts.Verbose); err != nil {
return nil, err
}
*/
rc, err := fetcher.Fetch(ctx, target)
if err != nil {
return nil, err
}
defer func() {
if fetchErr != nil {
rc.Close()
}
}()
return rc, nil
})

nodes, subject, config, err := Successors(ctx, statusFetcher, desc)
if err != nil {
return nil, err
}
if subject != nil {
nodes = append(nodes, *subject)
}
/*
if config != nil {
getConfigOnce.Do(func() {
if configPath != "" && (configMediaType == "" || config.MediaType == configMediaType) {
if config.Annotations == nil {
config.Annotations = make(map[string]string)
}
config.Annotations[ocispec.AnnotationTitle] = configPath
}
})
nodes = append(nodes, *config)
}
*/

var ret []ocispec.Descriptor
for _, s := range nodes {
if s.Annotations[ocispec.AnnotationTitle] == "" {
ss, err := content.Successors(ctx, fetcher, s)
if err != nil {
return nil, err
}
if len(ss) == 0 {
/*
// skip s if it is unnamed AND has no successors.
if err := printOnce(&printed, s, "Skipped ", opts.Verbose); err != nil {
return nil, err
}
*/
continue
}
}
ret = append(ret, s)
}

return ret, nil
}

return nil
}

// MediaTypeArtifactManifest specifies the media type for a content descriptor.
const MediaTypeArtifactManifest = "application/vnd.oci.artifact.manifest.v1+json"

// Artifact describes an artifact manifest.
// This structure provides `application/vnd.oci.artifact.manifest.v1+json` mediatype when marshalled to JSON.
//
// This manifest type was introduced in image-spec v1.1.0-rc1 and was removed in
// image-spec v1.1.0-rc3. It is not part of the current image-spec and is kept
// here for Go compatibility.
//
// Reference: https://github.com/opencontainers/image-spec/pull/999
type Artifact struct {
// MediaType is the media type of the object this schema refers to.
MediaType string `json:"mediaType"`

// ArtifactType is the IANA media type of the artifact this schema refers to.
ArtifactType string `json:"artifactType"`

// Blobs is a collection of blobs referenced by this manifest.
Blobs []ocispec.Descriptor `json:"blobs,omitempty"`

// Subject (reference) is an optional link from the artifact to another manifest forming an association between the artifact and the other manifest.
Subject *ocispec.Descriptor `json:"subject,omitempty"`

// Annotations contains arbitrary metadata for the artifact manifest.
Annotations map[string]string `json:"annotations,omitempty"`
}

// Successors returns the nodes directly pointed by the current node, picking
// out subject and config descriptor if applicable.
// Returning nil when no subject and config found.
func Successors(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) (nodes []ocispec.Descriptor, subject, config *ocispec.Descriptor, err error) {
switch node.MediaType {
case ocispec.MediaTypeImageManifest:
var fetched []byte
fetched, err = content.FetchAll(ctx, fetcher, node)
if err != nil {
return
}
var manifest ocispec.Manifest
if err = json.Unmarshal(fetched, &manifest); err != nil {
return
}
nodes = manifest.Layers
subject = manifest.Subject
config = &manifest.Config
case MediaTypeArtifactManifest:
var fetched []byte
fetched, err = content.FetchAll(ctx, fetcher, node)
if err != nil {
return
}
var manifest Artifact
if err = json.Unmarshal(fetched, &manifest); err != nil {
return
}
nodes = manifest.Blobs
subject = manifest.Subject
case ocispec.MediaTypeImageIndex:
var fetched []byte
fetched, err = content.FetchAll(ctx, fetcher, node)
if err != nil {
return
}
var index ocispec.Index
if err = json.Unmarshal(fetched, &index); err != nil {
return
}
nodes = index.Manifests
subject = index.Subject
default:
nodes, err = content.Successors(ctx, fetcher, node)
}
return
}
4 changes: 3 additions & 1 deletion pkg/agent/storage/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ const (
// File Protocol = "file://"
HTTPS Protocol = "https://"
HTTP Protocol = "http://"
// OCI dist spec v1.1.0
OCI Protocol = "oci-registry://"
)

var SupportedProtocols = []Protocol{S3, GCS, HTTPS, HTTP}
var SupportedProtocols = []Protocol{S3, GCS, HTTPS, HTTP, OCI}

func GetAllProtocol() (protocols []string) {
for _, protocol := range SupportedProtocols {
Expand Down
5 changes: 5 additions & 0 deletions pkg/agent/storage/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ func GetProvider(providers map[Protocol]Provider, protocol Protocol) (Provider,
providers[HTTP] = &HTTPSProvider{
Client: httpsClient,
}
case OCI:
httpsClient := &http.Client{}
providers[OCI] = &OCIProvider{
Client: httpsClient,
}
}

return providers[protocol], nil
Expand Down
14 changes: 0 additions & 14 deletions pkg/apis/serving/v1beta1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3963e0d

Please sign in to comment.