mirror of
				https://github.com/distribution/distribution.git
				synced 2025-10-25 05:39:33 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			381 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			381 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package storage
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"path"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/Sirupsen/logrus"
 | |
| 	"github.com/docker/distribution"
 | |
| 	"github.com/docker/distribution/context"
 | |
| 	"github.com/docker/distribution/digest"
 | |
| 	storagedriver "github.com/docker/distribution/registry/storage/driver"
 | |
| )
 | |
| 
 | |
// errResumableDigestNotAvailable is the sentinel reported when the hash
// state needed to resume a digest cannot be used. Callers in this file treat
// it as a soft condition rather than a hard failure.
var errResumableDigestNotAvailable = errors.New("resumable digest not available")
 | |
| 
 | |
| // layerWriter is used to control the various aspects of resumable
 | |
| // layer upload. It implements the LayerUpload interface.
 | |
| type blobWriter struct {
 | |
| 	blobStore *linkedBlobStore
 | |
| 
 | |
| 	id        string
 | |
| 	startedAt time.Time
 | |
| 	digester  digest.Digester
 | |
| 	written   int64 // track the contiguous write
 | |
| 
 | |
| 	// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
 | |
| 	// LayerUpload Interface
 | |
| 	fileWriter
 | |
| 
 | |
| 	resumableDigestEnabled bool
 | |
| }
 | |
| 
 | |
| var _ distribution.BlobWriter = &blobWriter{}
 | |
| 
 | |
| // ID returns the identifier for this upload.
 | |
| func (bw *blobWriter) ID() string {
 | |
| 	return bw.id
 | |
| }
 | |
| 
 | |
| func (bw *blobWriter) StartedAt() time.Time {
 | |
| 	return bw.startedAt
 | |
| }
 | |
| 
 | |
| // Commit marks the upload as completed, returning a valid descriptor. The
 | |
| // final size and digest are checked against the first descriptor provided.
 | |
| func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
 | |
| 	context.GetLogger(ctx).Debug("(*blobWriter).Commit")
 | |
| 
 | |
| 	if err := bw.fileWriter.Close(); err != nil {
 | |
| 		return distribution.Descriptor{}, err
 | |
| 	}
 | |
| 
 | |
| 	canonical, err := bw.validateBlob(ctx, desc)
 | |
| 	if err != nil {
 | |
| 		return distribution.Descriptor{}, err
 | |
| 	}
 | |
| 
 | |
| 	if err := bw.moveBlob(ctx, canonical); err != nil {
 | |
| 		return distribution.Descriptor{}, err
 | |
| 	}
 | |
| 
 | |
| 	if err := bw.blobStore.linkBlob(ctx, canonical, desc.Digest); err != nil {
 | |
| 		return distribution.Descriptor{}, err
 | |
| 	}
 | |
| 
 | |
| 	if err := bw.removeResources(ctx); err != nil {
 | |
| 		return distribution.Descriptor{}, err
 | |
| 	}
 | |
| 
 | |
| 	err = bw.blobStore.blobAccessController.SetDescriptor(ctx, canonical.Digest, canonical)
 | |
| 	if err != nil {
 | |
| 		return distribution.Descriptor{}, err
 | |
| 	}
 | |
| 
 | |
| 	return canonical, nil
 | |
| }
 | |
| 
 | |
| // Rollback the blob upload process, releasing any resources associated with
 | |
| // the writer and canceling the operation.
 | |
| func (bw *blobWriter) Cancel(ctx context.Context) error {
 | |
| 	context.GetLogger(ctx).Debug("(*blobWriter).Rollback")
 | |
| 	if err := bw.removeResources(ctx); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	bw.Close()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (bw *blobWriter) Write(p []byte) (int, error) {
 | |
| 	// Ensure that the current write offset matches how many bytes have been
 | |
| 	// written to the digester. If not, we need to update the digest state to
 | |
| 	// match the current write position.
 | |
| 	if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	n, err := io.MultiWriter(&bw.fileWriter, bw.digester.Hash()).Write(p)
 | |
| 	bw.written += int64(n)
 | |
| 
 | |
| 	return n, err
 | |
| }
 | |
| 
 | |
| func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
 | |
| 	// Ensure that the current write offset matches how many bytes have been
 | |
| 	// written to the digester. If not, we need to update the digest state to
 | |
| 	// match the current write position.
 | |
| 	if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	nn, err := bw.fileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash()))
 | |
| 	bw.written += nn
 | |
| 
 | |
| 	return nn, err
 | |
| }
 | |
| 
 | |
| func (bw *blobWriter) Close() error {
 | |
| 	if bw.err != nil {
 | |
| 		return bw.err
 | |
| 	}
 | |
| 
 | |
| 	if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return bw.fileWriter.Close()
 | |
| }
 | |
| 
 | |
| // validateBlob checks the data against the digest, returning an error if it
 | |
| // does not match. The canonical descriptor is returned.
 | |
| func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
 | |
| 	var (
 | |
| 		verified, fullHash bool
 | |
| 		canonical          digest.Digest
 | |
| 	)
 | |
| 
 | |
| 	if desc.Digest == "" {
 | |
| 		// if no descriptors are provided, we have nothing to validate
 | |
| 		// against. We don't really want to support this for the registry.
 | |
| 		return distribution.Descriptor{}, distribution.ErrBlobInvalidDigest{
 | |
| 			Reason: fmt.Errorf("cannot validate against empty digest"),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Stat the on disk file
 | |
| 	if fi, err := bw.fileWriter.driver.Stat(ctx, bw.path); err != nil {
 | |
| 		switch err := err.(type) {
 | |
| 		case storagedriver.PathNotFoundError:
 | |
| 			// NOTE(stevvooe): We really don't care if the file is
 | |
| 			// not actually present for the reader. We now assume
 | |
| 			// that the desc length is zero.
 | |
| 			desc.Size = 0
 | |
| 		default:
 | |
| 			// Any other error we want propagated up the stack.
 | |
| 			return distribution.Descriptor{}, err
 | |
| 		}
 | |
| 	} else {
 | |
| 		if fi.IsDir() {
 | |
| 			return distribution.Descriptor{}, fmt.Errorf("unexpected directory at upload location %q", bw.path)
 | |
| 		}
 | |
| 
 | |
| 		bw.size = fi.Size()
 | |
| 	}
 | |
| 
 | |
| 	if desc.Size > 0 {
 | |
| 		if desc.Size != bw.size {
 | |
| 			return distribution.Descriptor{}, distribution.ErrBlobInvalidLength
 | |
| 		}
 | |
| 	} else {
 | |
| 		// if provided 0 or negative length, we can assume caller doesn't know or
 | |
| 		// care about length.
 | |
| 		desc.Size = bw.size
 | |
| 	}
 | |
| 
 | |
| 	// TODO(stevvooe): This section is very meandering. Need to be broken down
 | |
| 	// to be a lot more clear.
 | |
| 
 | |
| 	if err := bw.resumeDigestAt(ctx, bw.size); err == nil {
 | |
| 		canonical = bw.digester.Digest()
 | |
| 
 | |
| 		if canonical.Algorithm() == desc.Digest.Algorithm() {
 | |
| 			// Common case: client and server prefer the same canonical digest
 | |
| 			// algorithm - currently SHA256.
 | |
| 			verified = desc.Digest == canonical
 | |
| 		} else {
 | |
| 			// The client wants to use a different digest algorithm. They'll just
 | |
| 			// have to be patient and wait for us to download and re-hash the
 | |
| 			// uploaded content using that digest algorithm.
 | |
| 			fullHash = true
 | |
| 		}
 | |
| 	} else if err == errResumableDigestNotAvailable {
 | |
| 		// Not using resumable digests, so we need to hash the entire layer.
 | |
| 		fullHash = true
 | |
| 	} else {
 | |
| 		return distribution.Descriptor{}, err
 | |
| 	}
 | |
| 
 | |
| 	if fullHash {
 | |
| 		// a fantastic optimization: if the the written data and the size are
 | |
| 		// the same, we don't need to read the data from the backend. This is
 | |
| 		// because we've written the entire file in the lifecycle of the
 | |
| 		// current instance.
 | |
| 		if bw.written == bw.size && digest.Canonical == desc.Digest.Algorithm() {
 | |
| 			canonical = bw.digester.Digest()
 | |
| 			verified = desc.Digest == canonical
 | |
| 		}
 | |
| 
 | |
| 		// If the check based on size fails, we fall back to the slowest of
 | |
| 		// paths. We may be able to make the size-based check a stronger
 | |
| 		// guarantee, so this may be defensive.
 | |
| 		if !verified {
 | |
| 			digester := digest.Canonical.New()
 | |
| 
 | |
| 			digestVerifier, err := digest.NewDigestVerifier(desc.Digest)
 | |
| 			if err != nil {
 | |
| 				return distribution.Descriptor{}, err
 | |
| 			}
 | |
| 
 | |
| 			// Read the file from the backend driver and validate it.
 | |
| 			fr, err := newFileReader(ctx, bw.fileWriter.driver, bw.path, desc.Size)
 | |
| 			if err != nil {
 | |
| 				return distribution.Descriptor{}, err
 | |
| 			}
 | |
| 			defer fr.Close()
 | |
| 
 | |
| 			tr := io.TeeReader(fr, digester.Hash())
 | |
| 
 | |
| 			if _, err := io.Copy(digestVerifier, tr); err != nil {
 | |
| 				return distribution.Descriptor{}, err
 | |
| 			}
 | |
| 
 | |
| 			canonical = digester.Digest()
 | |
| 			verified = digestVerifier.Verified()
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if !verified {
 | |
| 		context.GetLoggerWithFields(ctx,
 | |
| 			map[interface{}]interface{}{
 | |
| 				"canonical": canonical,
 | |
| 				"provided":  desc.Digest,
 | |
| 			}, "canonical", "provided").
 | |
| 			Errorf("canonical digest does match provided digest")
 | |
| 		return distribution.Descriptor{}, distribution.ErrBlobInvalidDigest{
 | |
| 			Digest: desc.Digest,
 | |
| 			Reason: fmt.Errorf("content does not match digest"),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// update desc with canonical hash
 | |
| 	desc.Digest = canonical
 | |
| 
 | |
| 	if desc.MediaType == "" {
 | |
| 		desc.MediaType = "application/octet-stream"
 | |
| 	}
 | |
| 
 | |
| 	return desc, nil
 | |
| }
 | |
| 
 | |
| // moveBlob moves the data into its final, hash-qualified destination,
 | |
| // identified by dgst. The layer should be validated before commencing the
 | |
| // move.
 | |
| func (bw *blobWriter) moveBlob(ctx context.Context, desc distribution.Descriptor) error {
 | |
| 	blobPath, err := pathFor(blobDataPathSpec{
 | |
| 		digest: desc.Digest,
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Check for existence
 | |
| 	if _, err := bw.blobStore.driver.Stat(ctx, blobPath); err != nil {
 | |
| 		switch err := err.(type) {
 | |
| 		case storagedriver.PathNotFoundError:
 | |
| 			break // ensure that it doesn't exist.
 | |
| 		default:
 | |
| 			return err
 | |
| 		}
 | |
| 	} else {
 | |
| 		// If the path exists, we can assume that the content has already
 | |
| 		// been uploaded, since the blob storage is content-addressable.
 | |
| 		// While it may be corrupted, detection of such corruption belongs
 | |
| 		// elsewhere.
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// If no data was received, we may not actually have a file on disk. Check
 | |
| 	// the size here and write a zero-length file to blobPath if this is the
 | |
| 	// case. For the most part, this should only ever happen with zero-length
 | |
| 	// tars.
 | |
| 	if _, err := bw.blobStore.driver.Stat(ctx, bw.path); err != nil {
 | |
| 		switch err := err.(type) {
 | |
| 		case storagedriver.PathNotFoundError:
 | |
| 			// HACK(stevvooe): This is slightly dangerous: if we verify above,
 | |
| 			// get a hash, then the underlying file is deleted, we risk moving
 | |
| 			// a zero-length blob into a nonzero-length blob location. To
 | |
| 			// prevent this horrid thing, we employ the hack of only allowing
 | |
| 			// to this happen for the digest of an empty tar.
 | |
| 			if desc.Digest == digest.DigestSha256EmptyTar {
 | |
| 				return bw.blobStore.driver.PutContent(ctx, blobPath, []byte{})
 | |
| 			}
 | |
| 
 | |
| 			// We let this fail during the move below.
 | |
| 			logrus.
 | |
| 				WithField("upload.id", bw.ID()).
 | |
| 				WithField("digest", desc.Digest).Warnf("attempted to move zero-length content with non-zero digest")
 | |
| 		default:
 | |
| 			return err // unrelated error
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// TODO(stevvooe): We should also write the mediatype when executing this move.
 | |
| 
 | |
| 	return bw.blobStore.driver.Move(ctx, bw.path, blobPath)
 | |
| }
 | |
| 
 | |
| // removeResources should clean up all resources associated with the upload
 | |
| // instance. An error will be returned if the clean up cannot proceed. If the
 | |
| // resources are already not present, no error will be returned.
 | |
| func (bw *blobWriter) removeResources(ctx context.Context) error {
 | |
| 	dataPath, err := pathFor(uploadDataPathSpec{
 | |
| 		name: bw.blobStore.repository.Named().Name(),
 | |
| 		id:   bw.id,
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Resolve and delete the containing directory, which should include any
 | |
| 	// upload related files.
 | |
| 	dirPath := path.Dir(dataPath)
 | |
| 	if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
 | |
| 		switch err := err.(type) {
 | |
| 		case storagedriver.PathNotFoundError:
 | |
| 			break // already gone!
 | |
| 		default:
 | |
| 			// This should be uncommon enough such that returning an error
 | |
| 			// should be okay. At this point, the upload should be mostly
 | |
| 			// complete, but perhaps the backend became unaccessible.
 | |
| 			context.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (bw *blobWriter) Reader() (io.ReadCloser, error) {
 | |
| 	// todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4
 | |
| 	try := 1
 | |
| 	for try <= 5 {
 | |
| 		_, err := bw.fileWriter.driver.Stat(bw.ctx, bw.path)
 | |
| 		if err == nil {
 | |
| 			break
 | |
| 		}
 | |
| 		switch err.(type) {
 | |
| 		case storagedriver.PathNotFoundError:
 | |
| 			context.GetLogger(bw.ctx).Debugf("Nothing found on try %d, sleeping...", try)
 | |
| 			time.Sleep(1 * time.Second)
 | |
| 			try++
 | |
| 		default:
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	readCloser, err := bw.fileWriter.driver.ReadStream(bw.ctx, bw.path, 0)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return readCloser, nil
 | |
| }
 |