Merge pull request #2601 from cgwalters/get-raw-blob

proxy: Add GetRawBlob
This commit is contained in:
Miloslav Trmač
2025-05-16 20:53:08 +02:00
committed by GitHub
2 changed files with 307 additions and 23 deletions

View File

@@ -70,6 +70,7 @@ import (
"sync"
"syscall"
"github.com/containers/common/pkg/retry"
"github.com/containers/image/v5/image"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/blobinfocache"
@@ -93,7 +94,8 @@ import (
// 0.2.5: Added LayerInfoJSON
// 0.2.6: Policy Verification before pulling OCI
// 0.2.7: Added GetLayerInfoPiped
const protocolVersion = "0.2.7"
// 0.2.8: Added GetRawBlob and reply.error_code
const protocolVersion = "0.2.8"
// maxMsgSize is the current limit on a packet size.
// Note that all non-metadata (i.e. payload data) is sent over a pipe.
@@ -116,6 +118,23 @@ type request struct {
Args []any `json:"args"`
}
type proxyErrorCode string
const (
// proxyErrPipe means we got EPIPE writing to a pipe owned by the client
proxyErrPipe proxyErrorCode = "EPIPE"
// proxyErrRetryable can be used by clients to automatically retry operations
proxyErrRetryable proxyErrorCode = "retryable"
// All other errors
proxyErrOther proxyErrorCode = "other"
)
// proxyError is serialized over the errfd channel for GetRawBlob
type proxyError struct {
Code proxyErrorCode `json:"code"`
Message string `json:"message"`
}
// reply is serialized to JSON as the return value from a function call.
type reply struct {
// Success is true if and only if the call succeeded.
@@ -124,6 +143,8 @@ type reply struct {
Value any `json:"value"`
// PipeID is an index into open pipes, and should be passed to FinishPipe
PipeID uint32 `json:"pipeid"`
// ErrorCode will be non-empty if error is set (new in 0.2.8)
ErrorCode proxyErrorCode `json:"error_code"`
// Error should be non-empty if Success == false
Error string `json:"error"`
}
@@ -132,8 +153,11 @@ type reply struct {
type replyBuf struct {
// value will be converted to a reply Value
value any
// fd is the read half of a pipe, passed back to the client
// fd is the read half of a pipe, passed back to the client for additional data
fd *os.File
// errfd will be a serialization of error state. This is optional and is currently
// only used by GetRawBlob.
errfd *os.File
// pipeid will be provided to the client as PipeID, an index into our open pipes
pipeid uint32
}
@@ -182,6 +206,30 @@ type convertedLayerInfo struct {
MediaType string `json:"media_type"`
}
// mapProxyErrorCode turns an error into a known string value.
func mapProxyErrorCode(err error) proxyErrorCode {
switch {
case err == nil:
return ""
case errors.Is(err, syscall.EPIPE):
return proxyErrPipe
case retry.IsErrorRetryable(err):
return proxyErrRetryable
default:
return proxyErrOther
}
}
// newProxyError creates a serializable structure for
// the client containing a mapped error code based
// on the error type, plus its value as a string.
func newProxyError(err error) proxyError {
return proxyError{
Code: mapProxyErrorCode(err),
Message: fmt.Sprintf("%v", err),
}
}
// Initialize performs one-time initialization, and returns the protocol version
func (h *proxyHandler) Initialize(args []any) (replyBuf, error) {
h.lock.Lock()
@@ -617,6 +665,91 @@ func (h *proxyHandler) GetBlob(args []any) (replyBuf, error) {
return ret, nil
}
// GetRawBlob can be viewed as a more general purpose successor
// to GetBlob. First, it does not verify the digest, which in
// some cases is unnecessary as the client would prefer to do it.
//
// It also does not use the "FinishPipe" API call, but instead
// returns *two* file descriptors, one for errors and one for data.
//
// On (initial) success, the return value provided to the client is the size of the blob.
func (h *proxyHandler) GetRawBlob(args []any) (replyBuf, error) {
h.lock.Lock()
defer h.lock.Unlock()
var ret replyBuf
if h.sysctx == nil {
return ret, fmt.Errorf("client error: must invoke Initialize")
}
if len(args) != 2 {
return ret, fmt.Errorf("found %d args, expecting (imgid, digest)", len(args))
}
imgref, err := h.parseImageFromID(args[0])
if err != nil {
return ret, err
}
digestStr, ok := args[1].(string)
if !ok {
return ret, fmt.Errorf("expecting string blobid")
}
ctx := context.TODO()
d, err := digest.Parse(digestStr)
if err != nil {
return ret, err
}
blobr, blobSize, err := imgref.src.GetBlob(ctx, types.BlobInfo{Digest: d, Size: int64(-1)}, h.cache)
if err != nil {
return ret, err
}
// Note this doesn't call allocPipe; we're not using the FinishPipe infrastructure.
piper, pipew, err := os.Pipe()
if err != nil {
blobr.Close()
return ret, err
}
errpipeR, errpipeW, err := os.Pipe()
if err != nil {
piper.Close()
pipew.Close()
blobr.Close()
return ret, err
}
// Asynchronous worker doing a copy
go func() {
// We own the read from registry, and write pipe objects
defer blobr.Close()
defer pipew.Close()
defer errpipeW.Close()
logrus.Debugf("Copying blob to client: %d bytes", blobSize)
_, err := io.Copy(pipew, blobr)
// Handle errors here by serializing a JSON error back over
// the error channel. In either case, both file descriptors
// will be closed, signaling the completion of the operation.
if err != nil {
logrus.Debugf("Sending error to client: %v", err)
serializedErr := newProxyError(err)
buf, err := json.Marshal(serializedErr)
if err != nil {
// Should never happen
panic(err)
}
_, writeErr := errpipeW.Write(buf)
if writeErr != nil && !errors.Is(err, syscall.EPIPE) {
logrus.Debugf("Writing to client: %v", err)
}
}
logrus.Debugf("Completed GetRawBlob operation")
}()
ret.value = blobSize
ret.fd = piper
ret.errfd = errpipeR
return ret, nil
}
// GetLayerInfo returns data about the layers of an image, useful for reading the layer contents.
//
// This is the same as GetLayerInfoPiped, but returns its contents inline. This is subject to
@@ -763,30 +896,37 @@ func (h *proxyHandler) close() {
// send writes a reply buffer to the socket
func (buf replyBuf) send(conn *net.UnixConn, err error) error {
logrus.Debugf("Sending reply: err=%v value=%v pipeid=%v", err, buf.value, buf.pipeid)
logrus.Debugf("Sending reply: err=%v value=%v pipeid=%v datafd=%v errfd=%v", err, buf.value, buf.pipeid, buf.fd, buf.errfd)
// We took ownership of these FDs, so close when we're done sending them or on error
defer func() {
if buf.fd != nil {
buf.fd.Close()
}
if buf.errfd != nil {
buf.errfd.Close()
}
}()
replyToSerialize := reply{
Success: err == nil,
Value: buf.value,
PipeID: buf.pipeid,
}
if err != nil {
replyToSerialize.ErrorCode = mapProxyErrorCode(err)
replyToSerialize.Error = err.Error()
}
serializedReply, err := json.Marshal(&replyToSerialize)
if err != nil {
return err
}
// We took ownership of the FD - close it when we're done.
defer func() {
if buf.fd != nil {
buf.fd.Close()
}
}()
// Copy the FD number to the socket ancillary buffer
// Copy the FD number(s) to the socket ancillary buffer
fds := make([]int, 0)
if buf.fd != nil {
fds = append(fds, int(buf.fd.Fd()))
}
if buf.errfd != nil {
fds = append(fds, int(buf.errfd.Fd()))
}
oob := syscall.UnixRights(fds...)
n, oobn, err := conn.WriteMsgUnix(serializedReply, oob, nil)
if err != nil {
@@ -858,6 +998,8 @@ func (h *proxyHandler) processRequest(readBytes []byte) (rb replyBuf, terminate
rb, err = h.GetFullConfig(req.Args)
case "GetBlob":
rb, err = h.GetBlob(req.Args)
case "GetRawBlob":
rb, err = h.GetRawBlob(req.Args)
case "GetLayerInfo":
rb, err = h.GetLayerInfo(req.Args)
case "GetLayerInfoPiped":

View File

@@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"strings"
"sync"
"syscall"
"testing"
"time"
@@ -60,8 +61,9 @@ type proxy struct {
type pipefd struct {
// id is the remote identifier "pipeid"
id uint
fd *os.File
id uint
datafd *os.File
errfd *os.File
}
func (p *proxy) call(method string, args []any) (rval any, fd *pipefd, err error) {
@@ -99,26 +101,41 @@ func (p *proxy) call(method string, args []any) (rval any, fd *pipefd, err error
return
}
var scms []syscall.SocketControlMessage
scms, err = syscall.ParseSocketControlMessage(oob[:oobn])
if err != nil {
err = fmt.Errorf("failed to parse control message: %w", err)
return
}
if reply.PipeID > 0 {
var scms []syscall.SocketControlMessage
scms, err = syscall.ParseSocketControlMessage(oob[:oobn])
if err != nil {
err = fmt.Errorf("failed to parse control message: %w", err)
return
}
if len(scms) != 1 {
err = fmt.Errorf("Expected 1 received fd, found %d", len(scms))
err = fmt.Errorf("Expected 1 socket control message, found %d", len(scms))
return
}
}
if len(scms) > 2 {
err = fmt.Errorf("Expected 1 or 2 socket control message, found %d", len(scms))
return
}
if len(scms) != 0 {
var fds []int
fds, err = syscall.ParseUnixRights(&scms[0])
if err != nil {
err = fmt.Errorf("failed to parse unix rights: %w", err)
return
}
if len(fds) < 1 || len(fds) > 2 {
err = fmt.Errorf("expected 1 or 2 fds, found %d", len(fds))
return
}
var errfd *os.File
if len(fds) == 2 {
errfd = os.NewFile(uintptr(fds[1]), "errfd")
}
fd = &pipefd{
fd: os.NewFile(uintptr(fds[0]), "replyfd"),
id: uint(reply.PipeID),
datafd: os.NewFile(uintptr(fds[0]), "replyfd"),
id: uint(reply.PipeID),
errfd: errfd,
}
}
@@ -151,7 +168,7 @@ func (p *proxy) callReadAllBytes(method string, args []any) (rval any, buf []byt
}
fetchchan := make(chan byteFetch)
go func() {
manifestBytes, err := io.ReadAll(fd.fd)
manifestBytes, err := io.ReadAll(fd.datafd)
fetchchan <- byteFetch{
content: manifestBytes,
err: err,
@@ -175,6 +192,80 @@ func (p *proxy) callReadAllBytes(method string, args []any) (rval any, buf []byt
return
}
type proxyError struct {
Code string `json:"code"`
Message string `json:"message"`
}
func (p *proxy) callGetRawBlob(args []any) (rval any, buf []byte, err error) {
var fd *pipefd
rval, fd, err = p.call("GetRawBlob", args)
if err != nil {
return
}
if fd == nil {
err = fmt.Errorf("Expected fds from method GetRawBlob")
return
}
if fd.errfd == nil {
err = fmt.Errorf("Expected errfd from method GetRawBlob")
return
}
var wg sync.WaitGroup
fetchchan := make(chan byteFetch, 1)
errchan := make(chan proxyError, 1)
wg.Add(1)
go func() {
defer wg.Done()
defer close(fetchchan)
defer fd.datafd.Close()
buf, err := io.ReadAll(fd.datafd)
fetchchan <- byteFetch{
content: buf,
err: err,
}
}()
wg.Add(1)
go func() {
defer wg.Done()
defer fd.errfd.Close()
defer close(errchan)
buf, err := io.ReadAll(fd.errfd)
var proxyErr proxyError
if err != nil {
proxyErr.Code = "read-from-proxy"
proxyErr.Message = err.Error()
errchan <- proxyErr
return
}
// No error, leave code+message unset
if len(buf) == 0 {
return
}
unmarshalErr := json.Unmarshal(buf, &proxyErr)
// Shouldn't happen
if unmarshalErr != nil {
panic(unmarshalErr)
}
errchan <- proxyErr
}()
wg.Wait()
errMsg := <-errchan
if errMsg.Code != "" {
return nil, nil, fmt.Errorf("(%s) %s", errMsg.Code, errMsg.Message)
}
fetchRes := <-fetchchan
err = fetchRes.err
if err != nil {
return
}
buf = fetchRes.content
return
}
func newProxy() (*proxy, error) {
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_SEQPACKET, 0)
if err != nil {
@@ -348,7 +439,46 @@ func runTestOpenImageOptionalNotFound(p *proxy, img string) error {
return nil
}
func (s *proxySuite) TestProxy() {
func runTestGetBlob(p *proxy, img string) error {
imgid, err := p.callNoFd("OpenImage", []any{img})
if err != nil {
return err
}
_, manifestBytes, err := p.callReadAllBytes("GetManifest", []any{imgid})
if err != nil {
return err
}
mfest, err := manifest.OCI1FromManifest(manifestBytes)
if err != nil {
return err
}
for _, layer := range mfest.Layers {
_, blobBytes, err := p.callGetRawBlob([]any{imgid, layer.Digest})
if err != nil {
return err
}
if len(blobBytes) != int(layer.Size) {
panic(fmt.Sprintf("Expected %d bytes, got %d", layer.Size, len(blobBytes)))
}
}
// echo "not a valid layer" | sha256sum
invalidDigest := "sha256:21a9aab5a3494674d2b4d8e7381c236a799384dd10545531014606cf652c119f"
_, blobBytes, err := p.callGetRawBlob([]any{imgid, invalidDigest})
if err == nil {
panic("Expected error fetching invalid blob")
}
if blobBytes != nil {
panic("Expected no bytes fetching invalid blob")
}
return nil
}
func (s *proxySuite) TestProxyMetadata() {
t := s.T()
p, err := newProxy()
require.NoError(t, err)
@@ -371,3 +501,15 @@ func (s *proxySuite) TestProxy() {
}
assert.NoError(t, err)
}
func (s *proxySuite) TestProxyGetBlob() {
t := s.T()
p, err := newProxy()
require.NoError(t, err)
err = runTestGetBlob(p, knownListImage)
if err != nil {
err = fmt.Errorf("Testing GetBLob for %s: %v", knownListImage, err)
}
assert.NoError(t, err)
}