Merge pull request #1495 from cgwalters/proxy-config

proxy: Add `GetConfig`, add manifest list support, add an integration test
This commit is contained in:
Valentin Rothberg 2021-11-16 17:00:59 +01:00 committed by GitHub
commit 002978258c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 446 additions and 44 deletions

View File

@ -83,9 +83,11 @@ import (
// protocolVersion is semantic version of the protocol used by this proxy.
// The first version of the protocol has major version 0.2 to signify a
// departure from the original code which used HTTP. The minor version is 1
// instead of 0 to help exercise semver parsers.
const protocolVersion = "0.2.1"
// departure from the original code which used HTTP.
//
// 0.2.1: Initial version
// 0.2.2: Added support for fetching image configuration as OCI
const protocolVersion = "0.2.2"
// maxMsgSize is the current limit on a packet size.
// Note that all non-metadata (i.e. payload data) is sent over a pipe.
@ -143,7 +145,7 @@ type openImage struct {
// id is an opaque integer handle
id uint32
src types.ImageSource
img types.Image
cachedimg types.Image
}
// proxyHandler is the state associated with our socket.
@ -199,14 +201,14 @@ func (h *proxyHandler) OpenImage(args []interface{}) (replyBuf, error) {
var ret replyBuf
if h.sysctx == nil {
return ret, fmt.Errorf("Must invoke Initialize")
return ret, fmt.Errorf("client error: must invoke Initialize")
}
if len(args) != 1 {
return ret, fmt.Errorf("invalid request, expecting one argument")
}
imageref, ok := args[0].(string)
if !ok {
return ret, fmt.Errorf("Expecting string imageref, not %T", args[0])
return ret, fmt.Errorf("expecting string imageref, not %T", args[0])
}
imgRef, err := alltransports.ParseImageName(imageref)
@ -217,16 +219,11 @@ func (h *proxyHandler) OpenImage(args []interface{}) (replyBuf, error) {
if err != nil {
return ret, err
}
img, err := image.FromUnparsedImage(context.Background(), h.sysctx, image.UnparsedInstance(imgsrc, nil))
if err != nil {
return ret, fmt.Errorf("failed to load image: %w", err)
}
h.imageSerial++
openimg := &openImage{
id: h.imageSerial,
src: imgsrc,
img: img,
}
h.images[openimg.id] = openimg
ret.value = openimg.id
@ -240,7 +237,7 @@ func (h *proxyHandler) CloseImage(args []interface{}) (replyBuf, error) {
var ret replyBuf
if h.sysctx == nil {
return ret, fmt.Errorf("Must invoke Initialize")
return ret, fmt.Errorf("client error: must invoke Initialize")
}
if len(args) != 1 {
return ret, fmt.Errorf("invalid request, expecting one argument")
@ -258,7 +255,7 @@ func (h *proxyHandler) CloseImage(args []interface{}) (replyBuf, error) {
func parseImageID(v interface{}) (uint32, error) {
imgidf, ok := v.(float64)
if !ok {
return 0, fmt.Errorf("Expecting integer imageid, not %T", v)
return 0, fmt.Errorf("expecting integer imageid, not %T", v)
}
return uint32(imgidf), nil
}
@ -267,10 +264,10 @@ func parseImageID(v interface{}) (uint32, error) {
func parseUint64(v interface{}) (uint64, error) {
f, ok := v.(float64)
if !ok {
return 0, fmt.Errorf("Expecting numeric, not %T", v)
return 0, fmt.Errorf("expecting numeric, not %T", v)
}
if f > maxJSONFloat {
return 0, fmt.Errorf("Out of range integer for numeric %f", f)
return 0, fmt.Errorf("out of range integer for numeric %f", f)
}
return uint64(f), nil
}
@ -282,7 +279,7 @@ func (h *proxyHandler) parseImageFromID(v interface{}) (*openImage, error) {
}
imgref, ok := h.images[imgid]
if !ok {
return nil, fmt.Errorf("No image %v", imgid)
return nil, fmt.Errorf("no image %v", imgid)
}
return imgref, nil
}
@ -300,7 +297,70 @@ func (h *proxyHandler) allocPipe() (*os.File, *activePipe, error) {
return piper, &f, nil
}
// returnBytes generates a return pipe() from a byte array
// In the future it might be nicer to return this via memfd_create()
func (h *proxyHandler) returnBytes(retval interface{}, buf []byte) (replyBuf, error) {
var ret replyBuf
piper, f, err := h.allocPipe()
if err != nil {
return ret, err
}
go func() {
// Signal completion when we return
defer f.wg.Done()
_, err = io.Copy(f.w, bytes.NewReader(buf))
if err != nil {
f.err = err
}
}()
ret.value = retval
ret.fd = piper
ret.pipeid = uint32(f.w.Fd())
return ret, nil
}
// cacheTargetManifest is invoked when GetManifest or GetConfig is invoked
// the first time for a given image. If the requested image is a manifest
// list, this function resolves it to the image matching the calling process'
// operating system and architecture.
//
// TODO: Add GetRawManifest or so that exposes manifest lists
func (h *proxyHandler) cacheTargetManifest(img *openImage) error {
ctx := context.Background()
if img.cachedimg != nil {
return nil
}
unparsedToplevel := image.UnparsedInstance(img.src, nil)
mfest, manifestType, err := unparsedToplevel.Manifest(ctx)
if err != nil {
return err
}
var target *image.UnparsedImage
if manifest.MIMETypeIsMultiImage(manifestType) {
manifestList, err := manifest.ListFromBlob(mfest, manifestType)
if err != nil {
return err
}
instanceDigest, err := manifestList.ChooseInstance(h.sysctx)
if err != nil {
return err
}
target = image.UnparsedInstance(img.src, &instanceDigest)
} else {
target = unparsedToplevel
}
cachedimg, err := image.FromUnparsedImage(ctx, h.sysctx, target)
if err != nil {
return err
}
img.cachedimg = cachedimg
return nil
}
// GetManifest returns a copy of the manifest, converted to OCI format, along with the original digest.
// Manifest lists are resolved to the current operating system and architecture.
func (h *proxyHandler) GetManifest(args []interface{}) (replyBuf, error) {
h.lock.Lock()
defer h.lock.Unlock()
@ -308,7 +368,7 @@ func (h *proxyHandler) GetManifest(args []interface{}) (replyBuf, error) {
var ret replyBuf
if h.sysctx == nil {
return ret, fmt.Errorf("Must invoke Initialize")
return ret, fmt.Errorf("client error: must invoke Initialize")
}
if len(args) != 1 {
return ret, fmt.Errorf("invalid request, expecting one argument")
@ -318,11 +378,18 @@ func (h *proxyHandler) GetManifest(args []interface{}) (replyBuf, error) {
return ret, err
}
ctx := context.TODO()
rawManifest, manifestType, err := imgref.img.Manifest(ctx)
err = h.cacheTargetManifest(imgref)
if err != nil {
return ret, err
}
img := imgref.cachedimg
ctx := context.Background()
rawManifest, manifestType, err := img.Manifest(ctx)
if err != nil {
return ret, err
}
// We only support OCI and docker2schema2. We know docker2schema2 can be easily+cheaply
// converted into OCI, so consumers only need to see OCI.
switch manifestType {
@ -330,9 +397,9 @@ func (h *proxyHandler) GetManifest(args []interface{}) (replyBuf, error) {
break
// Explicitly reject e.g. docker schema 1 type with a "legacy" note
case manifest.DockerV2Schema1MediaType, manifest.DockerV2Schema1SignedMediaType:
return ret, fmt.Errorf("Unsupported legacy manifest MIME type: %s", manifestType)
return ret, fmt.Errorf("unsupported legacy manifest MIME type: %s", manifestType)
default:
return ret, fmt.Errorf("Unsupported manifest MIME type: %s", manifestType)
return ret, fmt.Errorf("unsupported manifest MIME type: %s", manifestType)
}
// We always return the original digest, as that's what clients need to do pull-by-digest
@ -347,7 +414,7 @@ func (h *proxyHandler) GetManifest(args []interface{}) (replyBuf, error) {
// docker schema and MIME types.
if manifestType != imgspecv1.MediaTypeImageManifest {
manifestUpdates := types.ManifestUpdateOptions{ManifestMIMEType: imgspecv1.MediaTypeImageManifest}
ociImage, err := imgref.img.UpdatedImage(ctx, manifestUpdates)
ociImage, err := img.UpdatedImage(ctx, manifestUpdates)
if err != nil {
return ret, err
}
@ -360,24 +427,43 @@ func (h *proxyHandler) GetManifest(args []interface{}) (replyBuf, error) {
} else {
serialized = rawManifest
}
piper, f, err := h.allocPipe()
return h.returnBytes(digest, serialized)
}
// GetConfig returns a copy of the image configuration, converted to OCI format.
func (h *proxyHandler) GetConfig(args []interface{}) (replyBuf, error) {
h.lock.Lock()
defer h.lock.Unlock()
var ret replyBuf
if h.sysctx == nil {
return ret, fmt.Errorf("client error: must invoke Initialize")
}
if len(args) != 1 {
return ret, fmt.Errorf("invalid request, expecting: [imgid]")
}
imgref, err := h.parseImageFromID(args[0])
if err != nil {
return ret, err
}
go func() {
// Signal completion when we return
defer f.wg.Done()
_, err = io.Copy(f.w, bytes.NewReader(serialized))
err = h.cacheTargetManifest(imgref)
if err != nil {
f.err = err
return ret, err
}
}()
img := imgref.cachedimg
ctx := context.TODO()
config, err := img.OCIConfig(ctx)
if err != nil {
return ret, err
}
serialized, err := json.Marshal(&config.Config)
if err != nil {
return ret, err
}
return h.returnBytes(nil, serialized)
ret.value = digest.String()
ret.fd = piper
ret.pipeid = uint32(f.w.Fd())
return ret, nil
}
// GetBlob fetches a blob, performing digest verification.
@ -388,7 +474,7 @@ func (h *proxyHandler) GetBlob(args []interface{}) (replyBuf, error) {
var ret replyBuf
if h.sysctx == nil {
return ret, fmt.Errorf("Must invoke Initialize")
return ret, fmt.Errorf("client error: must invoke Initialize")
}
if len(args) != 3 {
return ret, fmt.Errorf("found %d args, expecting (imgid, digest, size)", len(args))
@ -431,7 +517,7 @@ func (h *proxyHandler) GetBlob(args []interface{}) (replyBuf, error) {
return
}
if n != int64(size) {
f.err = fmt.Errorf("Expected %d bytes in blob, got %d", size, n)
f.err = fmt.Errorf("expected %d bytes in blob, got %d", size, n)
}
if !verifier.Verified() {
f.err = fmt.Errorf("corrupted blob, expecting %s", d.String())
@ -451,11 +537,11 @@ func (h *proxyHandler) FinishPipe(args []interface{}) (replyBuf, error) {
var ret replyBuf
pipeidf, ok := args[0].(float64)
if !ok {
return ret, fmt.Errorf("finishpipe: expecting pipeid, not %T", args[0])
pipeidv, err := parseUint64(args[0])
if err != nil {
return ret, err
}
pipeid := uint32(pipeidf)
pipeid := uint32(pipeidv)
f, ok := h.activePipes[pipeid]
if !ok {
@ -467,7 +553,7 @@ func (h *proxyHandler) FinishPipe(args []interface{}) (replyBuf, error) {
// And only now do we close the write half; this forces the client to call this API
f.w.Close()
// Propagate any errors from the goroutine worker
err := f.err
err = f.err
delete(h.activePipes, pipeid)
return ret, err
}
@ -551,6 +637,8 @@ func (h *proxyHandler) processRequest(req request) (rb replyBuf, terminate bool,
rb, err = h.CloseImage(req.Args)
case "GetManifest":
rb, err = h.GetManifest(req.Args)
case "GetConfig":
rb, err = h.GetConfig(req.Args)
case "GetBlob":
rb, err = h.GetBlob(req.Args)
case "FinishPipe":

12
integration/procutils.go Normal file
View File

@ -0,0 +1,12 @@
//go:build !linux
// +build !linux
package main
import (
"os/exec"
)
// cmdLifecycleToParentIfPossible tries to exit if the parent process exits (only works on Linux)
func cmdLifecycleToParentIfPossible(c *exec.Cmd) {
}

View File

@ -0,0 +1,14 @@
package main
import (
"os/exec"
"syscall"
)
// cmdLifecyleToParentIfPossible is a thin wrapper around prctl(PR_SET_PDEATHSIG)
// on Linux.
func cmdLifecycleToParentIfPossible(c *exec.Cmd) {
c.SysProcAttr = &syscall.SysProcAttr{
Pdeathsig: syscall.SIGTERM,
}
}

288
integration/proxy_test.go Normal file
View File

@ -0,0 +1,288 @@
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"os"
"os/exec"
"strings"
"syscall"
"time"
"gopkg.in/check.v1"
"github.com/containers/image/v5/manifest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
)
// This image is known to be x86_64 only right now
const knownNotManifestListedImage_x8664 = "docker://quay.io/coreos/11bot"
const expectedProxySemverMajor = "0.2"
// request is copied from proxy.go
// We intentionally copy to ensure that we catch any unexpected "API" changes
// in the JSON.
type request struct {
// Method is the name of the function
Method string `json:"method"`
// Args is the arguments (parsed inside the fuction)
Args []interface{} `json:"args"`
}
// reply is copied from proxy.go
type reply struct {
// Success is true if and only if the call succeeded.
Success bool `json:"success"`
// Value is an arbitrary value (or values, as array/map) returned from the call.
Value interface{} `json:"value"`
// PipeID is an index into open pipes, and should be passed to FinishPipe
PipeID uint32 `json:"pipeid"`
// Error should be non-empty if Success == false
Error string `json:"error"`
}
// maxMsgSize is also copied from proxy.go
const maxMsgSize = 32 * 1024
type proxy struct {
c *net.UnixConn
}
type pipefd struct {
// id is the remote identifier "pipeid"
id uint
fd *os.File
}
func (self *proxy) call(method string, args []interface{}) (rval interface{}, fd *pipefd, err error) {
req := request{
Method: method,
Args: args,
}
reqbuf, err := json.Marshal(&req)
if err != nil {
return
}
n, err := self.c.Write(reqbuf)
if err != nil {
return
}
if n != len(reqbuf) {
err = fmt.Errorf("short write during call of %d bytes", n)
return
}
oob := make([]byte, syscall.CmsgSpace(1))
replybuf := make([]byte, maxMsgSize)
n, oobn, _, _, err := self.c.ReadMsgUnix(replybuf, oob)
if err != nil {
err = fmt.Errorf("reading reply: %v", err)
return
}
var reply reply
err = json.Unmarshal(replybuf[0:n], &reply)
if err != nil {
err = fmt.Errorf("Failed to parse reply: %w", err)
return
}
if !reply.Success {
err = fmt.Errorf("remote error: %s", reply.Error)
return
}
if reply.PipeID > 0 {
var scms []syscall.SocketControlMessage
scms, err = syscall.ParseSocketControlMessage(oob[:oobn])
if err != nil {
err = fmt.Errorf("failed to parse control message: %v", err)
return
}
if len(scms) != 1 {
err = fmt.Errorf("Expected 1 received fd, found %d", len(scms))
return
}
var fds []int
fds, err = syscall.ParseUnixRights(&scms[0])
if err != nil {
err = fmt.Errorf("failed to parse unix rights: %v", err)
return
}
fd = &pipefd{
fd: os.NewFile(uintptr(fds[0]), "replyfd"),
id: uint(reply.PipeID),
}
}
rval = reply.Value
return
}
func (self *proxy) callNoFd(method string, args []interface{}) (rval interface{}, err error) {
var fd *pipefd
rval, fd, err = self.call(method, args)
if err != nil {
return
}
if fd != nil {
err = fmt.Errorf("Unexpected fd from method %s", method)
return
}
return rval, nil
}
func (self *proxy) callReadAllBytes(method string, args []interface{}) (rval interface{}, buf []byte, err error) {
var fd *pipefd
rval, fd, err = self.call(method, args)
if err != nil {
return
}
if fd == nil {
err = fmt.Errorf("Expected fd from method %s", method)
return
}
fetchchan := make(chan byteFetch)
go func() {
manifestBytes, err := ioutil.ReadAll(fd.fd)
fetchchan <- byteFetch{
content: manifestBytes,
err: err,
}
}()
_, err = self.callNoFd("FinishPipe", []interface{}{fd.id})
if err != nil {
return
}
select {
case fetchRes := <-fetchchan:
err = fetchRes.err
if err != nil {
return
}
buf = fetchRes.content
case <-time.After(5 * time.Minute):
err = fmt.Errorf("timed out during proxy fetch")
}
return
}
func newProxy() (*proxy, error) {
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_SEQPACKET, 0)
if err != nil {
return nil, err
}
myfd := os.NewFile(uintptr(fds[0]), "myfd")
defer myfd.Close()
theirfd := os.NewFile(uintptr(fds[1]), "theirfd")
defer theirfd.Close()
mysock, err := net.FileConn(myfd)
if err != nil {
return nil, err
}
// Note ExtraFiles starts at 3
proc := exec.Command("skopeo", "experimental-image-proxy", "--sockfd", "3")
proc.Stderr = os.Stderr
cmdLifecycleToParentIfPossible(proc)
proc.ExtraFiles = append(proc.ExtraFiles, theirfd)
if err = proc.Start(); err != nil {
return nil, err
}
p := &proxy{
c: mysock.(*net.UnixConn),
}
v, err := p.callNoFd("Initialize", nil)
if err != nil {
return nil, err
}
semver, ok := v.(string)
if !ok {
return nil, fmt.Errorf("proxy Initialize: Unexpected value %T", v)
}
if !strings.HasPrefix(semver, expectedProxySemverMajor) {
return nil, fmt.Errorf("Unexpected semver %s", semver)
}
return p, nil
}
func init() {
check.Suite(&ProxySuite{})
}
type ProxySuite struct {
}
func (s *ProxySuite) SetUpSuite(c *check.C) {
}
func (s *ProxySuite) TearDownSuite(c *check.C) {
}
type byteFetch struct {
content []byte
err error
}
func runTestGetManifestAndConfig(p *proxy, img string) error {
v, err := p.callNoFd("OpenImage", []interface{}{knownNotManifestListedImage_x8664})
if err != nil {
return err
}
imgidv, ok := v.(float64)
if !ok {
return fmt.Errorf("OpenImage return value is %T", v)
}
imgid := uint32(imgidv)
v, manifestBytes, err := p.callReadAllBytes("GetManifest", []interface{}{imgid})
if err != nil {
return err
}
_, err = manifest.OCI1FromManifest(manifestBytes)
if err != nil {
return err
}
v, configBytes, err := p.callReadAllBytes("GetConfig", []interface{}{imgid})
if err != nil {
return err
}
var config imgspecv1.ImageConfig
err = json.Unmarshal(configBytes, &config)
if err != nil {
return err
}
// Validate that the config seems sane
if len(config.Cmd) == 0 && len(config.Entrypoint) == 0 {
return fmt.Errorf("No CMD or ENTRYPOINT set")
}
_, err = p.callNoFd("CloseImage", []interface{}{imgid})
return nil
}
func (s *ProxySuite) TestProxy(c *check.C) {
p, err := newProxy()
c.Assert(err, check.IsNil)
err = runTestGetManifestAndConfig(p, knownNotManifestListedImage_x8664)
if err != nil {
err = fmt.Errorf("Testing image %s: %v", knownNotManifestListedImage_x8664, err)
}
c.Assert(err, check.IsNil)
err = runTestGetManifestAndConfig(p, knownListImage)
if err != nil {
err = fmt.Errorf("Testing image %s: %v", knownListImage, err)
}
c.Assert(err, check.IsNil)
}