Merge pull request #3985 from deitch/cache-load

enable import of images from tar files
This commit is contained in:
Avi Deitcher 2024-02-20 04:44:16 -08:00 committed by GitHub
commit 2cff5681b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 349 additions and 92 deletions

View File

@ -25,5 +25,6 @@ func cacheCmd() *cobra.Command {
cmd.AddCommand(cacheRmCmd())
cmd.AddCommand(cacheLsCmd())
cmd.AddCommand(cacheExportCmd())
cmd.AddCommand(cacheImportCmd())
return cmd
}

11
src/cmd/linuxkit/cache/blob.go vendored Normal file
View File

@ -0,0 +1,11 @@
package cache
import (
"io"
v1 "github.com/google/go-containerregistry/pkg/v1"
)
// GetContent returns an io.ReadCloser for the raw blob with the given digest,
// straight from the underlying cache layout. It is up to the caller to close
// the reader and to validate the content against the hash.
func (p *Provider) GetContent(hash v1.Hash) (io.ReadCloser, error) {
	return p.cache.Blob(hash)
}

View File

@ -9,8 +9,8 @@ import (
"io"
"strings"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/reference"
"github.com/estesp/manifest-tool/v2/pkg/util"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
@ -127,17 +127,15 @@ func (p *Provider) ImagePull(ref *reference.Spec, trustedRef, architecture strin
// ImageLoad takes an OCI format image tar stream and writes it locally. It should be
// efficient and only write missing blobs, based on their content hash.
func (p *Provider) ImageLoad(ref *reference.Spec, architecture string, r io.Reader) ([]v1.Descriptor, error) {
// Returns any descriptors that are in the tar stream's index.json as manifests.
// Does not try to resolve lower levels. Most such tar streams will have a single
// manifest in the index.json's manifests list, but it is possible to have more.
func (p *Provider) ImageLoad(r io.Reader) ([]v1.Descriptor, error) {
var (
tr = tar.NewReader(r)
index bytes.Buffer
)
if !util.IsValidOSArch(linux, architecture, "") {
return nil, fmt.Errorf("unknown arch %s", architecture)
}
suffix := "-" + architecture
imageName := ref.String() + suffix
log.Debugf("ImageWriteTar to cache %s", imageName)
log.Debugf("ImageWriteTar to cache")
for {
header, err := tr.Next()
if err == io.EOF {
@ -193,45 +191,26 @@ func (p *Provider) ImageLoad(ref *reference.Spec, architecture string, r io.Read
if err != nil {
return nil, fmt.Errorf("error reading index.json")
}
// in theory, we should support a tar stream with multiple images in it. However, how would we
// know which one gets the single name annotation we have? We will find some way in the future.
if len(im.Manifests) != 1 {
return nil, fmt.Errorf("currently only support OCI tar stream that has a single image")
// these manifests are in the root index.json of the tar stream
// each of these is either an image or an index
// either way, it gets added directly to the linuxkit cache index.
for _, desc := range im.Manifests {
if imgName, ok := desc.Annotations[images.AnnotationImageName]; ok {
// remove the old descriptor, if it exists
if err := p.cache.RemoveDescriptors(match.Name(imgName)); err != nil {
return nil, fmt.Errorf("unable to remove old descriptors for %s: %v", imgName, err)
}
if err := p.cache.RemoveDescriptors(match.Name(imageName)); err != nil {
return nil, fmt.Errorf("unable to remove old descriptors for %s: %v", imageName, err)
}
desc := im.Manifests[0]
// is this an image or an index?
if desc.MediaType.IsIndex() {
rc, err := p.cache.Blob(desc.Digest)
if err != nil {
return nil, fmt.Errorf("unable to get index blob: %v", err)
}
ii, err := v1.ParseIndexManifest(rc)
if err != nil {
return nil, fmt.Errorf("unable to parse index blob: %v", err)
}
for _, m := range ii.Manifests {
if m.MediaType.IsImage() {
descs = append(descs, m)
}
}
} else if desc.MediaType.IsImage() {
descs = append(descs, desc)
}
for _, desc := range descs {
if desc.Platform != nil && desc.Platform.Architecture == architecture {
// make sure that we have the correct image name annotation
// save the image name under our proper annotation
if desc.Annotations == nil {
desc.Annotations = map[string]string{}
}
desc.Annotations[imagespec.AnnotationRefName] = imageName
desc.Annotations[imagespec.AnnotationRefName] = imgName
}
log.Debugf("appending descriptor %#v", desc)
if err := p.cache.AppendDescriptor(desc); err != nil {
return nil, fmt.Errorf("error appending descriptor to layout index: %v", err)
}
}
descs = append(descs, desc)
}
}
return descs, nil

View File

@ -0,0 +1,60 @@
package main
import (
"io"
"os"
cachepkg "github.com/linuxkit/linuxkit/src/cmd/linuxkit/cache"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
// cacheImportCmd returns the "cache import" cobra subcommand, which loads an
// OCI-format image tar file into the local linuxkit cache. A filename of "-"
// reads the tar stream from stdin.
func cacheImportCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "import",
		Short: "import individual images to the linuxkit cache",
		Long: `Import individual images from tar file to the linuxkit cache.
Can provide the file on the command-line or via stdin with filename '-'.
Example:
linuxkit cache import myimage.tar
cat myimage.tar | linuxkit cache import -
Tarfile format must be the OCI v1 file format, see https://github.com/opencontainers/image-spec/blob/main/image-layout.md
`,
		Args: cobra.MinimumNArgs(1),
		RunE: func(cmd *cobra.Command, args []string) error {
			// first positional argument is the tar file to import
			infile := args[0]
			p, err := cachepkg.NewProvider(cacheDir)
			if err != nil {
				log.Fatalf("unable to read a local cache: %v", err)
			}
			var reader io.ReadCloser
			// BUG FIX: previously this compared the never-assigned variable
			// `inputFile` (always ""), so the documented stdin mode could
			// never trigger; compare the actual argument instead.
			if infile == "-" {
				reader = os.Stdin
			} else {
				f, err := os.Open(infile)
				if err != nil {
					log.Fatalf("unable to open %s: %v", infile, err)
				}
				reader = f
			}
			// single deferred Close; the old code deferred both f.Close()
			// and reader.Close(), closing the same file twice.
			defer reader.Close()
			if _, err := p.ImageLoad(reader); err != nil {
				log.Fatalf("unable to load image: %v", err)
			}
			// the earlier `return err` returned a stale, always-nil error;
			// every failure path above already terminates via log.Fatalf.
			return nil
		},
	}
	return cmd
}

View File

@ -384,16 +384,11 @@ func (p Pkg) Build(bos ...BuildOpt) error {
if len(builtDescs) == 0 {
return fmt.Errorf("no valid descriptor returned for image for arch %s", platform.Architecture)
}
for i, desc := range builtDescs {
if desc.Platform == nil {
return fmt.Errorf("descriptor %d for platform %v has no information on the platform: %#v", i, platform, desc)
}
}
descs = append(descs, builtDescs...)
}
// after build is done:
// - create multi-arch manifest
// - create multi-arch index
// - potentially push
// - potentially load into docker
// - potentially create a release, including push and load into docker
@ -519,12 +514,24 @@ func (p Pkg) Build(bos ...BuildOpt) error {
return nil
}
// buildArch builds the package for a single arch
// buildArch builds the package for a single arch, and loads the result in the cache provided in the argument.
// Unless force is set, it will check the cache for the image first, then on the registry, and if it exists, it will not build it.
// The image will be saved in the cache with the provided package name and tag, with the architecture appended
// as a suffix, i.e. "myimage:abc-amd64" or "myimage:abc-arm64".
// It returns a list of individual descriptors for the images built, which can be used to create an index.
// These descriptors are not of the index pointed to by "myimage:abc-amd64", but rather the underlying manifests
// in that index.
// The final result then is as follows:
// A - layers, saved in cache as is
// B - config, saved in cache as is
// C - manifest, saved in cache as is, referenced by the index (E), and returned as a descriptor
// D - attestations (if any), saved in cache as is, referenced by the index (E), and returned as a descriptor
// E - index, saved in cache as is, stored in cache as tag "image:tag-arch", *not* returned as a descriptor
func (p Pkg) buildArch(ctx context.Context, d dockerRunner, c lktspec.CacheProvider, builderImage, arch string, restart bool, writer io.Writer, bo buildOpts, imageBuildOpts types.ImageBuildOptions) ([]registry.Descriptor, error) {
var (
descs []registry.Descriptor
tagArch string
tag = p.Tag()
tag = p.FullTag()
indexDesc []registry.Descriptor
)
tagArch = tag + "-" + arch
fmt.Fprintf(writer, "Building for arch %s as %s\n", arch, tagArch)
@ -562,22 +569,18 @@ func (p Pkg) buildArch(ctx context.Context, d dockerRunner, c lktspec.CacheProvi
}
}
)
ref, err := reference.Parse(p.FullTag())
if err != nil {
return nil, fmt.Errorf("could not resolve references for image %s: %v", tagArch, err)
}
// we are writing to local, so we need to catch the tar output stream and place the right files in the right place
piper, pipew := io.Pipe()
stdout = pipew
eg.Go(func() error {
d, err := c.ImageLoad(&ref, arch, piper)
d, err := c.ImageLoad(piper)
// send the error down the channel
if err != nil {
fmt.Fprintf(stdout, "cache.ImageLoad goroutine ended with error: %v\n", err)
} else {
descs = d
indexDesc = d
}
piper.Close()
return err
@ -604,7 +607,31 @@ func (p Pkg) buildArch(ctx context.Context, d dockerRunner, c lktspec.CacheProvi
return nil, err
}
return descs, nil
// find the child manifests
// how many index descriptors did we get?
switch len(indexDesc) {
case 0:
return nil, fmt.Errorf("no index descriptor returned from load")
case 1:
// good, we have one index descriptor
default:
return nil, fmt.Errorf("more than one index descriptor returned from load")
}
// when we build an arch, we might have the descs for the actual arch-specific manifest, or possibly
// an index that wraps it. So let's unwrap it and return the actual image descs and not the index.
// this is because later we will build an index from all of these.
r, err := c.GetContent(indexDesc[0].Digest)
if err != nil {
return nil, fmt.Errorf("could not get content for index descriptor: %v", err)
}
defer r.Close()
dec := json.NewDecoder(r)
var im registry.IndexManifest
if err := dec.Decode(&im); err != nil {
return nil, fmt.Errorf("could not decode index descriptor: %v", err)
}
return im.Manifests, nil
}
type buildCtx struct {

View File

@ -1,6 +1,7 @@
package pkglib
import (
"archive/tar"
"bytes"
"context"
"crypto/rand"
@ -13,9 +14,11 @@ import (
"testing"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/reference"
dockertypes "github.com/docker/docker/api/types"
registry "github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/types"
lktspec "github.com/linuxkit/linuxkit/src/cmd/linuxkit/spec"
buildkitClient "github.com/moby/buildkit/client"
@ -60,6 +63,142 @@ func (d *dockerMocker) build(ctx context.Context, tag, pkg, dockerContext, build
return errors.New("build disabled")
}
d.builds = append(d.builds, buildLog{tag, pkg, dockerContext, platform})
// must create a tar stream that looks somewhat normal to pass to stdout
// what we need:
// a config blob (random data)
// a layer blob (random data)
// a manifest blob (from the above)
// an index blob (points to the manifest)
// index.json (points to the index)
tw := tar.NewWriter(stdout)
defer tw.Close()
buf := make([]byte, 128)
var (
configHash, layerHash, manifestHash, indexHash v1.Hash
configSize, layerSize, manifestSize, indexSize int64
)
// config blob
if _, err := rand.Read(buf); err != nil {
return err
}
hash, _, err := v1.SHA256(bytes.NewReader(buf))
if err != nil {
return err
}
if err := tw.WriteHeader(&tar.Header{Name: fmt.Sprintf("blobs/sha256/%s", hash.Hex), Size: int64(len(buf))}); err != nil {
return err
}
if _, err := tw.Write(buf); err != nil {
return err
}
configHash = hash
configSize = int64(len(buf))
// layer blob
if _, err := rand.Read(buf); err != nil {
return err
}
hash, _, err = v1.SHA256(bytes.NewReader(buf))
if err != nil {
return err
}
if err := tw.WriteHeader(&tar.Header{Name: fmt.Sprintf("blobs/sha256/%s", hash.Hex), Size: int64(len(buf))}); err != nil {
return err
}
if _, err := tw.Write(buf); err != nil {
return err
}
layerHash = hash
layerSize = int64(len(buf))
// manifest
manifest := v1.Manifest{
Config: v1.Descriptor{
MediaType: types.OCIConfigJSON,
Size: configSize,
Digest: configHash,
},
Layers: []v1.Descriptor{
{
MediaType: types.OCILayer,
Size: layerSize,
Digest: layerHash,
},
},
}
b, err := json.Marshal(manifest)
if err != nil {
return err
}
hash, _, err = v1.SHA256(bytes.NewReader(b))
if err != nil {
return err
}
if err := tw.WriteHeader(&tar.Header{Name: fmt.Sprintf("blobs/sha256/%s", hash.Hex), Size: int64(len(b))}); err != nil {
return err
}
if _, err := tw.Write(b); err != nil {
return err
}
manifestHash = hash
manifestSize = int64(len(b))
// index
index := v1.IndexManifest{
MediaType: types.OCIImageIndex,
Manifests: []v1.Descriptor{
{
MediaType: types.OCIManifestSchema1,
Size: manifestSize,
Digest: manifestHash,
},
},
SchemaVersion: 2,
}
b, err = json.Marshal(index)
if err != nil {
return err
}
hash, _, err = v1.SHA256(bytes.NewReader(b))
if err != nil {
return err
}
if err := tw.WriteHeader(&tar.Header{Name: fmt.Sprintf("blobs/sha256/%s", hash.Hex), Size: int64(len(b))}); err != nil {
return err
}
if _, err := tw.Write(b); err != nil {
return err
}
indexHash = hash
indexSize = int64(len(b))
// index.json
index = v1.IndexManifest{
MediaType: types.OCIImageIndex,
Manifests: []v1.Descriptor{
{
MediaType: types.OCIImageIndex,
Size: indexSize,
Digest: indexHash,
Annotations: map[string]string{
imagespec.AnnotationRefName: tag,
images.AnnotationImageName: tag,
},
},
},
SchemaVersion: 2,
}
b, err = json.Marshal(index)
if err != nil {
return err
}
if err := tw.WriteHeader(&tar.Header{Name: "index.json", Size: int64(len(b))}); err != nil {
return err
}
if _, err := tw.Write(b); err != nil {
return err
}
return nil
}
func (d *dockerMocker) save(tgt string, refs ...string) error {
@ -108,7 +247,7 @@ func (c *cacheMocker) ImagePull(ref *reference.Spec, trustedRef, architecture st
// make some random data for a layer
b := make([]byte, 256)
_, _ = rand.Read(b)
descs, err := c.imageWriteStream(ref, architecture, bytes.NewReader(b))
descs, err := c.imageWriteStream(bytes.NewReader(b))
if err != nil {
return nil, err
}
@ -136,45 +275,78 @@ func (c *cacheMocker) ImageInRegistry(ref *reference.Spec, trustedRef, architect
return false, nil
}
func (c *cacheMocker) ImageLoad(ref *reference.Spec, architecture string, r io.Reader) ([]registry.Descriptor, error) {
func (c *cacheMocker) ImageLoad(r io.Reader) ([]registry.Descriptor, error) {
if !c.enableImageLoad {
return nil, errors.New("ImageLoad disabled")
}
return c.imageWriteStream(ref, architecture, r)
return c.imageWriteStream(r)
}
func (c *cacheMocker) imageWriteStream(ref *reference.Spec, architecture string, r io.Reader) ([]registry.Descriptor, error) {
image := fmt.Sprintf("%s-%s", ref.String(), architecture)
func (c *cacheMocker) imageWriteStream(r io.Reader) ([]registry.Descriptor, error) {
var (
image string
size int64
hash v1.Hash
)
// make some random data for a layer
b, err := io.ReadAll(r)
tarBytes, err := io.ReadAll(r)
if err != nil {
return nil, fmt.Errorf("error reading data: %v", err)
}
hash, size, err := registry.SHA256(bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("error calculating hash of layer: %v", err)
var (
tr = tar.NewReader(bytes.NewReader(tarBytes))
index bytes.Buffer
)
for {
header, err := tr.Next()
if err == io.EOF {
break // End of archive
}
c.assignHash(hash.String(), b)
im := registry.Manifest{
MediaType: types.OCIManifestSchema1,
Layers: []registry.Descriptor{
{MediaType: types.OCILayer, Size: size, Digest: hash},
},
SchemaVersion: 2,
if err != nil {
return nil, err
}
// write the updated index, remove the old one
b, err = json.Marshal(im)
if err != nil {
return nil, fmt.Errorf("unable to marshal new image to json: %v", err)
filename := header.Name
switch {
case filename == "index.json":
// any errors should stop and get reported
if _, err := io.Copy(&index, tr); err != nil {
return nil, fmt.Errorf("error reading data for file %s : %v", filename, err)
}
hash, size, err = registry.SHA256(bytes.NewReader(b))
case strings.HasPrefix(filename, "blobs/sha256/"):
// must have a file named blob/sha256/<hash>
parts := strings.Split(filename, "/")
// if we had a file that is just the directory, ignore it
if len(parts) != 3 {
continue
}
hash, err := v1.NewHash(fmt.Sprintf("%s:%s", parts[1], parts[2]))
if err != nil {
return nil, fmt.Errorf("error calculating hash of index json: %v", err)
// malformed file
return nil, fmt.Errorf("invalid hash filename for %s: %v", filename, err)
}
b, err := io.ReadAll(tr)
if err != nil {
return nil, fmt.Errorf("error reading data for file %s : %v", filename, err)
}
c.assignHash(hash.String(), b)
}
}
if index.Len() != 0 {
im, err := v1.ParseIndexManifest(&index)
if err != nil {
return nil, fmt.Errorf("error reading index.json")
}
for _, desc := range im.Manifests {
if imgName, ok := desc.Annotations[images.AnnotationImageName]; ok {
image = imgName
size = desc.Size
hash = desc.Digest
break
}
}
}
desc := registry.Descriptor{
MediaType: types.OCIManifestSchema1,
Size: size,
@ -182,10 +354,6 @@ func (c *cacheMocker) imageWriteStream(ref *reference.Spec, architecture string,
Annotations: map[string]string{
imagespec.AnnotationRefName: image,
},
Platform: &registry.Platform{
OS: "linux",
Architecture: architecture,
},
}
c.appendImage(image, desc)
return []registry.Descriptor{desc}, nil
@ -296,6 +464,14 @@ func (c *cacheMocker) Store() (content.Store, error) {
return nil, errors.New("unsupported")
}
// GetContent returns a reader over the bytes previously recorded in the mock
// for the given digest, or an error when the digest is unknown.
func (c *cacheMocker) GetContent(hash v1.Hash) (io.ReadCloser, error) {
	data, found := c.hashes[hash.String()]
	if !found {
		return nil, fmt.Errorf("no content found for hash: %s", hash.String())
	}
	return io.NopCloser(bytes.NewReader(data)), nil
}
type cacheMockerSource struct {
c *cacheMocker
ref *reference.Spec

View File

@ -33,7 +33,7 @@ type CacheProvider interface {
IndexWrite(ref *reference.Spec, descriptors ...v1.Descriptor) (ImageSource, error)
// ImageLoad takes an OCI format image tar stream in the io.Reader and writes it to the cache. It should be
// efficient and only write missing blobs, based on their content hash.
ImageLoad(ref *reference.Spec, architecture string, r io.Reader) ([]v1.Descriptor, error)
ImageLoad(r io.Reader) ([]v1.Descriptor, error)
// DescriptorWrite writes a descriptor to the cache index; it validates that it has a name
// and replaces any existing one
DescriptorWrite(ref *reference.Spec, descriptors v1.Descriptor) (ImageSource, error)
@ -42,6 +42,9 @@ type CacheProvider interface {
Push(name string, withManifest bool) error
// NewSource return an ImageSource for a specific ref and architecture in the cache.
NewSource(ref *reference.Spec, architecture string, descriptor *v1.Descriptor) ImageSource
// GetContent returns an io.Reader to the provided content as is, given a specific digest. It is
// up to the caller to validate it.
GetContent(hash v1.Hash) (io.ReadCloser, error)
// Store get content.Store referencing the cache
Store() (content.Store, error)
}