proxy: Add GetRawBlob

The original model the idea here is the proxy centralizes
verification of things like digest. However in practice,
this causes reading to be seriously awkward; ref
https://github.com/containers/containers-image-proxy-rs/issues/79
(Basically `FinishPipe` blocks the metadata channel)

Also, I have a project to implement a registry frontend to
`containers-storage:` and a core problem with `GetBlob` right
now is it *requires* the blob size up front even though the
underlying Go logic doesn't.

Moving to a "raw" interface solves that too. In this new
raw API, we return two file descriptors, one for the data
and one for the error channel, which contains a JSON
serialization of an error.

For the error type we reuse the existing "is error retryable"
and expose that back to the client.

We also (backwards compatibly) add this new error code
for the existing APIs.

Signed-off-by: Colin Walters <walters@verbum.org>
This commit is contained in:
Colin Walters
2025-05-10 15:46:38 -04:00
parent a477063650
commit 983e77d85f
2 changed files with 302 additions and 18 deletions

View File

@@ -70,6 +70,7 @@ import (
"sync"
"syscall"
"github.com/containers/common/pkg/retry"
"github.com/containers/image/v5/image"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/blobinfocache"
@@ -93,7 +94,8 @@ import (
// 0.2.5: Added LayerInfoJSON
// 0.2.6: Policy Verification before pulling OCI
// 0.2.7: Added GetLayerInfoPiped
const protocolVersion = "0.2.7"
// 0.2.8: Added GetRawBlob and reply.error_code
const protocolVersion = "0.2.8"
// maxMsgSize is the current limit on a packet size.
// Note that all non-metadata (i.e. payload data) is sent over a pipe.
@@ -116,6 +118,23 @@ type request struct {
Args []any `json:"args"`
}
type proxyErrorCode string
const (
// proxyErrPipe means we got EPIPE writing to a pipe owned by the client
proxyErrPipe proxyErrorCode = "EPIPE"
// proxyErrRetryable can be used by clients to automatically retry operations
proxyErrRetryable proxyErrorCode = "retryable"
// All other errors
proxyErrOther proxyErrorCode = "other"
)
// proxyError is serialized over the errfd channel for GetRawBlob
type proxyError struct {
Code proxyErrorCode `json:"code"`
Message string `json:"message"`
}
// reply is serialized to JSON as the return value from a function call.
type reply struct {
// Success is true if and only if the call succeeded.
@@ -124,6 +143,8 @@ type reply struct {
Value any `json:"value"`
// PipeID is an index into open pipes, and should be passed to FinishPipe
PipeID uint32 `json:"pipeid"`
// ErrorCode will be non-empty if error is set (new in 0.2.8)
ErrorCode proxyErrorCode `json:"error_code"`
// Error should be non-empty if Success == false
Error string `json:"error"`
}
@@ -132,8 +153,11 @@ type reply struct {
type replyBuf struct {
// value will be converted to a reply Value
value any
// fd is the read half of a pipe, passed back to the client
// fd is the read half of a pipe, passed back to the client for additional data
fd *os.File
// errfd will be a serialization of error state. This is optional and is currently
// only used by GetRawBlob.
errfd *os.File
// pipeid will be provided to the client as PipeID, an index into our open pipes
pipeid uint32
}
@@ -182,6 +206,30 @@ type convertedLayerInfo struct {
MediaType string `json:"media_type"`
}
// mapProxyErrorCode turns an error into a known string value.
func mapProxyErrorCode(err error) proxyErrorCode {
switch {
case err == nil:
return ""
case errors.Is(err, syscall.EPIPE):
return proxyErrPipe
case retry.IsErrorRetryable(err):
return proxyErrRetryable
default:
return proxyErrOther
}
}
// newProxyError creates a serializable structure for
// the client containing a mapped error code based
// on the error type, plus its value as a string.
func newProxyError(err error) proxyError {
return proxyError{
Code: mapProxyErrorCode(err),
Message: fmt.Sprintf("%v", err),
}
}
// Initialize performs one-time initialization, and returns the protocol version
func (h *proxyHandler) Initialize(args []any) (replyBuf, error) {
h.lock.Lock()
@@ -617,6 +665,91 @@ func (h *proxyHandler) GetBlob(args []any) (replyBuf, error) {
return ret, nil
}
// GetRawBlob can be viewed as a more general purpose successor
// to GetBlob. First, it does not verify the digest, which in
// some cases is unnecessary as the client would prefer to do it.
//
// It also does not use the "FinishPipe" API call, but instead
// returns *two* file descriptors, one for errors and one for data.
//
// On (initial) success, the return value provided to the client is the size of the blob.
func (h *proxyHandler) GetRawBlob(args []any) (replyBuf, error) {
h.lock.Lock()
defer h.lock.Unlock()
var ret replyBuf
if h.sysctx == nil {
return ret, fmt.Errorf("client error: must invoke Initialize")
}
if len(args) != 2 {
return ret, fmt.Errorf("found %d args, expecting (imgid, digest)", len(args))
}
imgref, err := h.parseImageFromID(args[0])
if err != nil {
return ret, err
}
digestStr, ok := args[1].(string)
if !ok {
return ret, fmt.Errorf("expecting string blobid")
}
ctx := context.TODO()
d, err := digest.Parse(digestStr)
if err != nil {
return ret, err
}
blobr, blobSize, err := imgref.src.GetBlob(ctx, types.BlobInfo{Digest: d, Size: int64(-1)}, h.cache)
if err != nil {
return ret, err
}
// Note this doesn't call allocPipe; we're not using the FinishPipe infrastructure.
piper, pipew, err := os.Pipe()
if err != nil {
blobr.Close()
return ret, err
}
errpipeR, errpipeW, err := os.Pipe()
if err != nil {
piper.Close()
pipew.Close()
blobr.Close()
return ret, err
}
// Asynchronous worker doing a copy
go func() {
// We own the read from registry, and write pipe objects
defer blobr.Close()
defer pipew.Close()
defer errpipeW.Close()
logrus.Debugf("Copying blob to client: %d bytes", blobSize)
_, err := io.Copy(pipew, blobr)
// Handle errors here by serializing a JSON error back over
// the error channel. In either case, both file descriptors
// will be closed, signaling the completion of the operation.
if err != nil {
logrus.Debugf("Sending error to client: %v", err)
serializedErr := newProxyError(err)
buf, err := json.Marshal(serializedErr)
if err != nil {
// Should never happen
panic(err)
}
_, writeErr := errpipeW.Write(buf)
if writeErr != nil && !errors.Is(err, syscall.EPIPE) {
logrus.Debugf("Writing to client: %v", err)
}
}
logrus.Debugf("Completed GetRawBlob operation")
}()
ret.value = blobSize
ret.fd = piper
ret.errfd = errpipeR
return ret, nil
}
// GetLayerInfo returns data about the layers of an image, useful for reading the layer contents.
//
// This is the same as GetLayerInfoPiped, but returns its contents inline. This is subject to
@@ -763,30 +896,37 @@ func (h *proxyHandler) close() {
// send writes a reply buffer to the socket
func (buf replyBuf) send(conn *net.UnixConn, err error) error {
logrus.Debugf("Sending reply: err=%v value=%v pipeid=%v", err, buf.value, buf.pipeid)
logrus.Debugf("Sending reply: err=%v value=%v pipeid=%v datafd=%v errfd=%v", err, buf.value, buf.pipeid, buf.fd, buf.errfd)
replyToSerialize := reply{
Success: err == nil,
Value: buf.value,
PipeID: buf.pipeid,
}
if err != nil {
replyToSerialize.ErrorCode = mapProxyErrorCode(err)
replyToSerialize.Error = err.Error()
}
serializedReply, err := json.Marshal(&replyToSerialize)
if err != nil {
return err
}
// We took ownership of the FD - close it when we're done.
// We took ownership of these FDs, so close when we're done sending them or on error
defer func() {
if buf.fd != nil {
buf.fd.Close()
}
if buf.errfd != nil {
buf.errfd.Close()
}
}()
// Copy the FD number to the socket ancillary buffer
// Copy the FD number(s) to the socket ancillary buffer
fds := make([]int, 0)
if buf.fd != nil {
fds = append(fds, int(buf.fd.Fd()))
}
if buf.errfd != nil {
fds = append(fds, int(buf.errfd.Fd()))
}
oob := syscall.UnixRights(fds...)
n, oobn, err := conn.WriteMsgUnix(serializedReply, oob, nil)
if err != nil {
@@ -858,6 +998,8 @@ func (h *proxyHandler) processRequest(readBytes []byte) (rb replyBuf, terminate
rb, err = h.GetFullConfig(req.Args)
case "GetBlob":
rb, err = h.GetBlob(req.Args)
case "GetRawBlob":
rb, err = h.GetRawBlob(req.Args)
case "GetLayerInfo":
rb, err = h.GetLayerInfo(req.Args)
case "GetLayerInfoPiped":

View File

@@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"strings"
"sync"
"syscall"
"testing"
"time"
@@ -61,7 +62,8 @@ type proxy struct {
type pipefd struct {
// id is the remote identifier "pipeid"
id uint
fd *os.File
datafd *os.File
errfd *os.File
}
func (p *proxy) call(method string, args []any) (rval any, fd *pipefd, err error) {
@@ -99,26 +101,41 @@ func (p *proxy) call(method string, args []any) (rval any, fd *pipefd, err error
return
}
if reply.PipeID > 0 {
var scms []syscall.SocketControlMessage
scms, err = syscall.ParseSocketControlMessage(oob[:oobn])
if err != nil {
err = fmt.Errorf("failed to parse control message: %w", err)
return
}
if reply.PipeID > 0 {
if len(scms) != 1 {
err = fmt.Errorf("Expected 1 received fd, found %d", len(scms))
err = fmt.Errorf("Expected 1 socket control message, found %d", len(scms))
return
}
}
if len(scms) > 2 {
err = fmt.Errorf("Expected 1 or 2 socket control message, found %d", len(scms))
return
}
if len(scms) != 0 {
var fds []int
fds, err = syscall.ParseUnixRights(&scms[0])
if err != nil {
err = fmt.Errorf("failed to parse unix rights: %w", err)
return
}
if len(fds) < 1 || len(fds) > 2 {
err = fmt.Errorf("expected 1 or 2 fds, found %d", len(fds))
return
}
var errfd *os.File
if len(fds) == 2 {
errfd = os.NewFile(uintptr(fds[1]), "errfd")
}
fd = &pipefd{
fd: os.NewFile(uintptr(fds[0]), "replyfd"),
datafd: os.NewFile(uintptr(fds[0]), "replyfd"),
id: uint(reply.PipeID),
errfd: errfd,
}
}
@@ -151,7 +168,7 @@ func (p *proxy) callReadAllBytes(method string, args []any) (rval any, buf []byt
}
fetchchan := make(chan byteFetch)
go func() {
manifestBytes, err := io.ReadAll(fd.fd)
manifestBytes, err := io.ReadAll(fd.datafd)
fetchchan <- byteFetch{
content: manifestBytes,
err: err,
@@ -175,6 +192,80 @@ func (p *proxy) callReadAllBytes(method string, args []any) (rval any, buf []byt
return
}
type proxyError struct {
Code string `json:"code"`
Message string `json:"message"`
}
func (p *proxy) callGetRawBlob(args []any) (rval any, buf []byte, err error) {
var fd *pipefd
rval, fd, err = p.call("GetRawBlob", args)
if err != nil {
return
}
if fd == nil {
err = fmt.Errorf("Expected fds from method GetRawBlob")
return
}
if fd.errfd == nil {
err = fmt.Errorf("Expected errfd from method GetRawBlob")
return
}
var wg sync.WaitGroup
fetchchan := make(chan byteFetch, 1)
errchan := make(chan proxyError, 1)
wg.Add(1)
go func() {
defer wg.Done()
defer close(fetchchan)
defer fd.datafd.Close()
buf, err := io.ReadAll(fd.datafd)
fetchchan <- byteFetch{
content: buf,
err: err,
}
}()
wg.Add(1)
go func() {
defer wg.Done()
defer fd.errfd.Close()
defer close(errchan)
buf, err := io.ReadAll(fd.errfd)
var proxyErr proxyError
if err != nil {
proxyErr.Code = "read-from-proxy"
proxyErr.Message = err.Error()
errchan <- proxyErr
return
}
// No error, leave code+message unset
if len(buf) == 0 {
return
}
unmarshalErr := json.Unmarshal(buf, &proxyErr)
// Shouldn't happen
if unmarshalErr != nil {
panic(unmarshalErr)
}
errchan <- proxyErr
}()
wg.Wait()
errMsg := <-errchan
if errMsg.Code != "" {
return nil, nil, fmt.Errorf("(%s) %s", errMsg.Code, errMsg.Message)
}
fetchRes := <-fetchchan
err = fetchRes.err
if err != nil {
return
}
buf = fetchRes.content
return
}
func newProxy() (*proxy, error) {
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_SEQPACKET, 0)
if err != nil {
@@ -348,7 +439,46 @@ func runTestOpenImageOptionalNotFound(p *proxy, img string) error {
return nil
}
func (s *proxySuite) TestProxy() {
func runTestGetBlob(p *proxy, img string) error {
imgid, err := p.callNoFd("OpenImage", []any{img})
if err != nil {
return err
}
_, manifestBytes, err := p.callReadAllBytes("GetManifest", []any{imgid})
if err != nil {
return err
}
mfest, err := manifest.OCI1FromManifest(manifestBytes)
if err != nil {
return err
}
for _, layer := range mfest.Layers {
_, blobBytes, err := p.callGetRawBlob([]any{imgid, layer.Digest})
if err != nil {
return err
}
if len(blobBytes) != int(layer.Size) {
panic(fmt.Sprintf("Expected %d bytes, got %d", layer.Size, len(blobBytes)))
}
}
// echo "not a valid layer" | sha256sum
invalidDigest := "sha256:21a9aab5a3494674d2b4d8e7381c236a799384dd10545531014606cf652c119f"
_, blobBytes, err := p.callGetRawBlob([]any{imgid, invalidDigest})
if err == nil {
panic("Expected error fetching invalid blob")
}
if blobBytes != nil {
panic("Expected no bytes fetching invalid blob")
}
return nil
}
func (s *proxySuite) TestProxyMetadata() {
t := s.T()
p, err := newProxy()
require.NoError(t, err)
@@ -371,3 +501,15 @@ func (s *proxySuite) TestProxy() {
}
assert.NoError(t, err)
}
func (s *proxySuite) TestProxyGetBlob() {
t := s.T()
p, err := newProxy()
require.NoError(t, err)
err = runTestGetBlob(p, knownListImage)
if err != nil {
err = fmt.Errorf("Testing GetBLob for %s: %v", knownListImage, err)
}
assert.NoError(t, err)
}