From 09522d8535d6a3b0f2d7c1bd446ceec919e71da0 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 8 Jan 2015 11:40:41 -0800 Subject: [PATCH 1/6] Implement a remote file writer for use with StorageDriver This changeset implements a fileWriter type that can be used to managed writes to remote files in a StorageDriver. Basically, it manages a local seek position for a remote path. An efficient use of this implementation will write data in large blocks. Signed-off-by: Stephen J Day --- storage/filewriter.go | 153 +++++++++++++++++++++++++++++++++++++ storage/filewriter_test.go | 148 +++++++++++++++++++++++++++++++++++ 2 files changed, 301 insertions(+) create mode 100644 storage/filewriter.go create mode 100644 storage/filewriter_test.go diff --git a/storage/filewriter.go b/storage/filewriter.go new file mode 100644 index 000000000..cfa7c93de --- /dev/null +++ b/storage/filewriter.go @@ -0,0 +1,153 @@ +package storage + +import ( + "bytes" + "fmt" + "io" + "os" + + "github.com/docker/distribution/storagedriver" +) + +// fileWriter implements a remote file writer backed by a storage driver. +type fileWriter struct { + driver storagedriver.StorageDriver + + // identifying fields + path string + + // mutable fields + size int64 // size of the file, aka the current end + offset int64 // offset is the current write offset + err error // terminal error, if set, reader is closed +} + +// fileWriterInterface makes the desired io compliant interface that the +// filewriter should implement. +type fileWriterInterface interface { + io.WriteSeeker + io.WriterAt + io.ReaderFrom + io.Closer +} + +var _ fileWriterInterface = &fileWriter{} + +// newFileWriter returns a prepared fileWriter for the driver and path. This +// could be considered similar to an "open" call on a regular filesystem. +func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter, error) { + fw := fileWriter{ + driver: driver, + path: path, + } + + if fi, err := driver.Stat(path); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + // ignore, offset is zero + default: + return nil, err + } + } else { + if fi.IsDir() { + return nil, fmt.Errorf("cannot write to a directory") + } + + fw.size = fi.Size() + } + + return &fw, nil +} + +// Write writes the buffer p at the current write offset. +func (fw *fileWriter) Write(p []byte) (n int, err error) { + nn, err := fw.readFromAt(bytes.NewReader(p), -1) + return int(nn), err +} + +// WriteAt writes p at the specified offset. The underlying offset does not +// change. +func (fw *fileWriter) WriteAt(p []byte, offset int64) (n int, err error) { + nn, err := fw.readFromAt(bytes.NewReader(p), offset) + return int(nn), err +} + +// ReadFrom reads reader r until io.EOF writing the contents at the current +// offset. +func (fw *fileWriter) ReadFrom(r io.Reader) (n int64, err error) { + return fw.readFromAt(r, -1) +} + +// Seek moves the write position do the requested offest based on the whence +// argument, which can be os.SEEK_CUR, os.SEEK_END, or os.SEEK_SET. +func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) { + if fw.err != nil { + return 0, fw.err + } + + var err error + newOffset := fw.offset + + switch whence { + case os.SEEK_CUR: + newOffset += int64(offset) + case os.SEEK_END: + newOffset = fw.size + int64(offset) + case os.SEEK_SET: + newOffset = int64(offset) + } + + if newOffset < 0 { + err = fmt.Errorf("cannot seek to negative position") + } else if newOffset > fw.size { + fw.offset = newOffset + fw.size = newOffset + } else { + // No problems, set the offset. + fw.offset = newOffset + } + + return fw.offset, err +} + +// Close closes the fileWriter for writing. +func (fw *fileWriter) Close() error { + if fw.err != nil { + return fw.err + } + + fw.err = fmt.Errorf("filewriter@%v: closed", fw.path) + + return fw.err +} + +// readFromAt writes to fw from r at the specified offset. If offset is less +// than zero, the value of fw.offset is used and updated after the operation. +func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error) { + if fw.err != nil { + return 0, fw.err + } + + var updateOffset bool + if offset < 0 { + offset = fw.offset + updateOffset = true + } + + nn, err := fw.driver.WriteStream(fw.path, offset, r) + + if updateOffset { + // We should forward the offset, whether or not there was an error. + // Basically, we keep the filewriter in sync with the reader's head. If an + // error is encountered, the whole thing should be retried but we proceed + // from an expected offset, even if the data didn't make it to the + // backend. + fw.offset += nn + + if fw.offset > fw.size { + fw.size = fw.offset + } + } + + return nn, err +} diff --git a/storage/filewriter_test.go b/storage/filewriter_test.go new file mode 100644 index 000000000..2235462f8 --- /dev/null +++ b/storage/filewriter_test.go @@ -0,0 +1,148 @@ +package storage + +import ( + "bytes" + "crypto/rand" + "io" + "os" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storagedriver/inmemory" +) + +// TestSimpleWrite takes the fileWriter through common write operations +// ensuring data integrity. +func TestSimpleWrite(t *testing.T) { + content := make([]byte, 1<<20) + n, err := rand.Read(content) + if err != nil { + t.Fatalf("unexpected error building random data: %v", err) + } + + if n != len(content) { + t.Fatalf("random read did't fill buffer") + } + + dgst, err := digest.FromReader(bytes.NewReader(content)) + if err != nil { + t.Fatalf("unexpected error digesting random content: %v", err) + } + + driver := inmemory.New() + path := "/random" + + fw, err := newFileWriter(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileWriter: %v", err) + } + defer fw.Close() + + n, err = fw.Write(content) + if err != nil { + t.Fatalf("unexpected error writing content: %v", err) + } + + if n != len(content) { + t.Fatalf("unexpected write length: %d != %d", n, len(content)) + } + + fr, err := newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier := digest.NewDigestVerifier(dgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } + + // Check the seek position is equal to the content length + end, err := fw.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("write did not advance offset: %d != %d", end, len(content)) + } + + // Double the content, but use the WriteAt method + doubled := append(content, content...) + doubledgst, err := digest.FromReader(bytes.NewReader(doubled)) + if err != nil { + t.Fatalf("unexpected error digesting doubled content: %v", err) + } + + n, err = fw.WriteAt(content, end) + if err != nil { + t.Fatalf("unexpected error writing content at %d: %v", end, err) + } + + if n != len(content) { + t.Fatalf("writeat was short: %d != %d", n, len(content)) + } + + fr, err = newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier = digest.NewDigestVerifier(doubledgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } + + // Check that WriteAt didn't update the offset. + end, err = fw.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("write did not advance offset: %d != %d", end, len(content)) + } + + // Now, we copy from one path to another, running the data through the + // fileReader to fileWriter, rather than the driver.Move command to ensure + // everything is working correctly. + fr, err = newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + fw, err = newFileWriter(driver, "/copied") + if err != nil { + t.Fatalf("unexpected error creating fileWriter: %v", err) + } + defer fw.Close() + + nn, err := io.Copy(fw, fr) + if err != nil { + t.Fatalf("unexpected error copying data: %v", err) + } + + if nn != int64(len(doubled)) { + t.Fatalf("unexpected copy length: %d != %d", nn, len(doubled)) + } + + fr, err = newFileReader(driver, "/copied") + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier = digest.NewDigestVerifier(doubledgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } +} From 75c5916ddef8987e853992483493af7088d05040 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 8 Jan 2015 12:03:21 -0800 Subject: [PATCH 2/6] Release lock during WriteStream for inmemory driver While reading from the input in WriteStream, the inmemory driver can deadlock if the reader is from the same instance. To fix this, the write lock is released before reading into a local buffer. The lock is re-acquired to finish the actual write. Signed-off-by: Stephen J Day --- storagedriver/inmemory/driver.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/storagedriver/inmemory/driver.go b/storagedriver/inmemory/driver.go index e3c63f741..c2be1913b 100644 --- a/storagedriver/inmemory/driver.go +++ b/storagedriver/inmemory/driver.go @@ -133,6 +133,11 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn in return 0, fmt.Errorf("not a file") } + // Unlock while we are reading from the source, in case we are reading + // from the same mfs instance. This can be fixed by a more granular + // locking model. + d.mutex.Unlock() + d.mutex.RLock() // Take the readlock to block other writers. var buf bytes.Buffer nn, err = buf.ReadFrom(reader) @@ -142,9 +147,13 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn in // backend. What is the correct return value? Really, the caller needs // to know that the reader has been advanced and reattempting the // operation is incorrect. + d.mutex.RUnlock() + d.mutex.Lock() return nn, err } + d.mutex.RUnlock() + d.mutex.Lock() f.WriteAt(buf.Bytes(), offset) return nn, err } From 219bd48c24557f35d06011f9643d48212520c377 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 8 Jan 2015 14:10:08 -0800 Subject: [PATCH 3/6] Add path mapper definitions for upload locations This change updates the path mapper to be able to specify upload management locations. This includes a startedat file, which contains the RFC3339 formatted start time of the upload and the actual data file. Signed-off-by: Stephen J Day --- storage/paths.go | 39 ++++++++++++++++++++++++++++++++++++--- storage/paths_test.go | 14 +++++++++++--- 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/storage/paths.go b/storage/paths.go index c5d6c90fc..0724b2865 100644 --- a/storage/paths.go +++ b/storage/paths.go @@ -23,20 +23,26 @@ const storagePathVersion = "v2" // // -> layers/ // +// -> uploads/ +// data +// startedat // -> blob/ // // // There are few important components to this path layout. First, we have the // repository store identified by name. This contains the image manifests and -// a layer store with links to CAS blob ids. Outside of the named repo area, -// we have the the blob store. It contains the actual layer data and any other -// data that can be referenced by a CAS id. +// a layer store with links to CAS blob ids. Upload coordination data is also +// stored here. Outside of the named repo area, we have the the blob store. It +// contains the actual layer data and any other data that can be referenced by +// a CAS id. // // We cover the path formats implemented by this path mapper below. // // manifestPathSpec: /v2/repositories//manifests/ // layerLinkPathSpec: /v2/repositories//layers/tarsum/// // blobPathSpec: /v2/blob/// +// uploadDataPathSpec: /v2/repositories//uploads//data +// uploadStartedAtPathSpec: /v2/repositories//uploads//startedat // // For more information on the semantic meaning of each path and their // contents, please see the path spec documentation. @@ -103,6 +109,10 @@ func (pm *pathMapper) path(spec pathSpec) (string, error) { blobPathPrefix := append(rootPrefix, "blob") return path.Join(append(blobPathPrefix, components...)...), nil + case uploadDataPathSpec: + return path.Join(append(repoPrefix, v.name, "uploads", v.uuid, "data")...), nil + case uploadStartedAtPathSpec: + return path.Join(append(repoPrefix, v.name, "uploads", v.uuid, "startedat")...), nil default: // TODO(sday): This is an internal error. Ensure it doesn't escape (panic?). return "", fmt.Errorf("unknown path spec: %#v", v) @@ -170,6 +180,29 @@ type blobPathSpec struct { func (blobPathSpec) pathSpec() {} +// uploadDataPathSpec defines the path parameters of the data file for +// uploads. +type uploadDataPathSpec struct { + name string + uuid string +} + +func (uploadDataPathSpec) pathSpec() {} + +// uploadDataPathSpec defines the path parameters for the file that stores the +// start time of an uploads. If it is missing, the upload is considered +// unknown. Admittedly, the presence of this file is an ugly hack to make sure +// we have a way to cleanup old or stalled uploads that doesn't rely on driver +// FileInfo behavior. If we come up with a more clever way to do this, we +// should remove this file immediately and rely on the startetAt field from +// the client to enforce time out policies. +type uploadStartedAtPathSpec struct { + name string + uuid string +} + +func (uploadStartedAtPathSpec) pathSpec() {} + // digestPathComoponents provides a consistent path breakdown for a given // digest. For a generic digest, it will be as follows: // diff --git a/storage/paths_test.go b/storage/paths_test.go index 7b91865f7..3a5ea899d 100644 --- a/storage/paths_test.go +++ b/storage/paths_test.go @@ -43,10 +43,18 @@ func TestPathMapper(t *testing.T) { expected: "/pathmapper-test/blob/tarsum/v1/sha256/ab/abcdefabcdefabcdef908909909", }, { - spec: blobPathSpec{ - digest: digest.Digest("tarsum+sha256:abcdefabcdefabcdef908909909"), + spec: uploadDataPathSpec{ + name: "foo/bar", + uuid: "asdf-asdf-asdf-adsf", }, - expected: "/pathmapper-test/blob/tarsum/v0/sha256/ab/abcdefabcdefabcdef908909909", + expected: "/pathmapper-test/repositories/foo/bar/uploads/asdf-asdf-asdf-adsf/data", + }, + { + spec: uploadStartedAtPathSpec{ + name: "foo/bar", + uuid: "asdf-asdf-asdf-adsf", + }, + expected: "/pathmapper-test/repositories/foo/bar/uploads/asdf-asdf-asdf-adsf/startedat", }, } { p, err := pm.path(testcase.spec) From ba6b774aea917c900a729310539ab29cf841fc39 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 8 Jan 2015 14:24:02 -0800 Subject: [PATCH 4/6] Spool layer uploads to remote storage To smooth initial implementation, uploads were spooled to local file storage, validated, then pushed to remote storage. That approach was flawed in that it present easy clustering of registry services that share a remote storage backend. The original plan was to implement resumable hashes then implement remote upload storage. After some thought, it was found to be better to get remote spooling working, then optimize with resumable hashes. Moving to this approach has tradeoffs: after storing the complete upload remotely, the node must fetch the content and validate it before moving it to the final location. This can double bandwidth usage to the remote backend. Modifying the verification and upload code to store intermediate hashes should be trivial once the layer digest format has settled. The largest changes for users of the storage package (mostly the registry app) are the LayerService interface and the LayerUpload interface. The LayerService now takes qualified repository names to start and resume uploads. In corallry, the concept of LayerUploadState has been complete removed, exposing all aspects of that state as part of the LayerUpload object. The LayerUpload object has been modified to work as an io.WriteSeeker and includes a StartedAt time, to allow for upload timeout policies. Finish now only requires a digest, eliding the requirement for a size parameter. Resource cleanup has taken a turn for the better. Resources are cleaned up after successful uploads and during a cancel call. Admittedly, this is probably not completely where we want to be. It's recommend that we bolster this with a periodic driver utility script that scans for partial uploads and deletes the underlying data. As a small benefit, we can leave these around to better understand how and why these uploads are failing, at the cost of some extra disk space. Many other changes follow from the changes above. The webapp needs to be updated to meet the new interface requirements. Signed-off-by: Stephen J Day --- storage/layer.go | 29 ++- storage/layer_test.go | 22 +- storage/layerstore.go | 83 ++++++-- storage/layerupload.go | 373 +++++++--------------------------- storage/manifeststore_test.go | 2 +- storage/services.go | 25 +-- 6 files changed, 181 insertions(+), 353 deletions(-) diff --git a/storage/layer.go b/storage/layer.go index ec5f0f9de..24736c701 100644 --- a/storage/layer.go +++ b/storage/layer.go @@ -24,8 +24,7 @@ type Layer interface { // layers. Digest() digest.Digest - // CreatedAt returns the time this layer was created. Until we implement - // Stat call on storagedriver, this just returns the zero time. + // CreatedAt returns the time this layer was created. CreatedAt() time.Time } @@ -33,26 +32,22 @@ type Layer interface { // Instances can be obtained from the LayerService.Upload and // LayerService.Resume. type LayerUpload interface { - io.WriteCloser - - // UUID returns the identifier for this upload. - UUID() string + io.WriteSeeker + io.Closer // Name of the repository under which the layer will be linked. Name() string - // Offset returns the position of the last byte written to this layer. - Offset() int64 + // UUID returns the identifier for this upload. + UUID() string - // TODO(stevvooe): Consider completely removing the size check from this - // interface. The digest check may be adequate and we are making it - // optional in the HTTP API. + // StartedAt returns the time this layer upload was started. + StartedAt() time.Time // Finish marks the upload as completed, returning a valid handle to the - // uploaded layer. The final size and digest are validated against the - // contents of the uploaded layer. If the size is negative, only the - // digest will be checked. - Finish(size int64, digest digest.Digest) (Layer, error) + // uploaded layer. The digest is validated against the contents of the + // uploaded layer. + Finish(digest digest.Digest) (Layer, error) // Cancel the layer upload process. Cancel() error @@ -84,11 +79,11 @@ func (err ErrUnknownLayer) Error() string { // ErrLayerInvalidDigest returned when tarsum check fails. type ErrLayerInvalidDigest struct { - FSLayer manifest.FSLayer + Digest digest.Digest } func (err ErrLayerInvalidDigest) Error() string { - return fmt.Sprintf("invalid digest for referenced layer: %v", err.FSLayer.BlobSum) + return fmt.Sprintf("invalid digest for referenced layer: %v", err.Digest) } // ErrLayerInvalidSize returned when length check fails. diff --git a/storage/layer_test.go b/storage/layer_test.go index ec5b7406a..d6f4718aa 100644 --- a/storage/layer_test.go +++ b/storage/layer_test.go @@ -26,21 +26,18 @@ func TestSimpleLayerUpload(t *testing.T) { dgst := digest.Digest(tarSumStr) - uploadStore, err := newTemporaryLocalFSLayerUploadStore() if err != nil { t.Fatalf("error allocating upload store: %v", err) } imageName := "foo/bar" - driver := inmemory.New() ls := &layerStore{ - driver: driver, + driver: inmemory.New(), pathMapper: &pathMapper{ root: "/storage/testing", version: storagePathVersion, }, - uploadStore: uploadStore, } h := sha256.New() @@ -58,7 +55,7 @@ func TestSimpleLayerUpload(t *testing.T) { } // Do a resume, get unknown upload - layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}) + layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID()) if err != ErrLayerUploadUnknown { t.Fatalf("unexpected error resuming upload, should be unkown: %v", err) } @@ -84,26 +81,31 @@ func TestSimpleLayerUpload(t *testing.T) { t.Fatalf("layer data write incomplete") } - if layerUpload.Offset() != nn { - t.Fatalf("layerUpload not updated with correct offset: %v != %v", layerUpload.Offset(), nn) + offset, err := layerUpload.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("unexpected error seeking layer upload: %v", err) + } + + if offset != nn { + t.Fatalf("layerUpload not updated with correct offset: %v != %v", offset, nn) } layerUpload.Close() // Do a resume, for good fun - layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}) + layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID()) if err != nil { t.Fatalf("unexpected error resuming upload: %v", err) } sha256Digest := digest.NewDigest("sha256", h) - layer, err := layerUpload.Finish(randomDataSize, dgst) + layer, err := layerUpload.Finish(dgst) if err != nil { t.Fatalf("unexpected error finishing layer upload: %v", err) } // After finishing an upload, it should no longer exist. - if _, err := ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}); err != ErrLayerUploadUnknown { + if _, err := ls.Resume(layerUpload.Name(), layerUpload.UUID()); err != ErrLayerUploadUnknown { t.Fatalf("expected layer upload to be unknown, got %v", err) } diff --git a/storage/layerstore.go b/storage/layerstore.go index f73bef6d2..41227cc5b 100644 --- a/storage/layerstore.go +++ b/storage/layerstore.go @@ -1,15 +1,17 @@ package storage import ( + "time" + + "code.google.com/p/go-uuid/uuid" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" "github.com/docker/distribution/storagedriver" ) type layerStore struct { - driver storagedriver.StorageDriver - pathMapper *pathMapper - uploadStore layerUploadStore + driver storagedriver.StorageDriver + pathMapper *pathMapper } func (ls *layerStore) Exists(name string, digest digest.Digest) (bool, error) { @@ -66,31 +68,86 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) { // the same two layers. Should it be disallowed? For now, we allow both // parties to proceed and the the first one uploads the layer. - lus, err := ls.uploadStore.New(name) + uuid := uuid.New() + startedAt := time.Now().UTC() + + path, err := ls.pathMapper.path(uploadDataPathSpec{ + name: name, + uuid: uuid, + }) + if err != nil { return nil, err } - return ls.newLayerUpload(lus), nil + startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{ + name: name, + uuid: uuid, + }) + + if err != nil { + return nil, err + } + + // Write a startedat file for this upload + if err := ls.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { + return nil, err + } + + return ls.newLayerUpload(name, uuid, path, startedAt) } // Resume continues an in progress layer upload, returning the current // state of the upload. -func (ls *layerStore) Resume(lus LayerUploadState) (LayerUpload, error) { - _, err := ls.uploadStore.GetState(lus.UUID) +func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) { + startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{ + name: name, + uuid: uuid, + }) if err != nil { return nil, err } - return ls.newLayerUpload(lus), nil + startedAtBytes, err := ls.driver.GetContent(startedAtPath) + if err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + return nil, ErrLayerUploadUnknown + default: + return nil, err + } + } + + startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes)) + if err != nil { + return nil, err + } + + path, err := ls.pathMapper.path(uploadDataPathSpec{ + name: name, + uuid: uuid, + }) + + if err != nil { + return nil, err + } + + return ls.newLayerUpload(name, uuid, path, startedAt) } // newLayerUpload allocates a new upload controller with the given state. -func (ls *layerStore) newLayerUpload(lus LayerUploadState) LayerUpload { - return &layerUploadController{ - LayerUploadState: lus, - layerStore: ls, - uploadStore: ls.uploadStore, +func (ls *layerStore) newLayerUpload(name, uuid, path string, startedAt time.Time) (LayerUpload, error) { + fw, err := newFileWriter(ls.driver, path) + if err != nil { + return nil, err } + + return &layerUploadController{ + layerStore: ls, + name: name, + uuid: uuid, + startedAt: startedAt, + fileWriter: *fw, + }, nil } diff --git a/storage/layerupload.go b/storage/layerupload.go index 3175a09ef..b9953b236 100644 --- a/storage/layerupload.go +++ b/storage/layerupload.go @@ -1,229 +1,84 @@ package storage import ( - "fmt" "io" - "io/ioutil" - "os" - "path/filepath" - - "code.google.com/p/go-uuid/uuid" + "path" + "time" + "github.com/Sirupsen/logrus" "github.com/docker/distribution/digest" - "github.com/docker/distribution/manifest" "github.com/docker/distribution/storagedriver" "github.com/docker/docker/pkg/tarsum" ) -// LayerUploadState captures the state serializable state of the layer upload. -type LayerUploadState struct { - // name is the primary repository under which the layer will be linked. - Name string - - // UUID identifies the upload. - UUID string - - // offset contains the current progress of the upload. - Offset int64 -} - // layerUploadController is used to control the various aspects of resumable // layer upload. It implements the LayerUpload interface. type layerUploadController struct { - LayerUploadState + layerStore *layerStore - layerStore *layerStore - uploadStore layerUploadStore - fp layerFile - err error // terminal error, if set, controller is closed -} + name string + uuid string + startedAt time.Time -// layerFile documents the interface used while writing layer files, similar -// to *os.File. This is separate from layerReader, for now, because we want to -// store uploads on the local file system until we have write-through hashing -// support. They should be combined once this is worked out. -type layerFile interface { - io.WriteSeeker - io.Reader - io.Closer - - // Sync commits the contents of the writer to storage. - Sync() (err error) -} - -// layerUploadStore provides storage for temporary files and upload state of -// layers. This is be used by the LayerService to manage the state of ongoing -// uploads. This interface will definitely change and will most likely end up -// being exported to the app layer. Move the layer.go when it's ready to go. -type layerUploadStore interface { - New(name string) (LayerUploadState, error) - Open(uuid string) (layerFile, error) - GetState(uuid string) (LayerUploadState, error) - // TODO: factor this method back in - // SaveState(lus LayerUploadState) error - DeleteState(uuid string) error + fileWriter } var _ LayerUpload = &layerUploadController{} // Name of the repository under which the layer will be linked. func (luc *layerUploadController) Name() string { - return luc.LayerUploadState.Name + return luc.name } // UUID returns the identifier for this upload. func (luc *layerUploadController) UUID() string { - return luc.LayerUploadState.UUID + return luc.uuid } -// Offset returns the position of the last byte written to this layer. -func (luc *layerUploadController) Offset() int64 { - return luc.LayerUploadState.Offset +func (luc *layerUploadController) StartedAt() time.Time { + return luc.startedAt } // Finish marks the upload as completed, returning a valid handle to the // uploaded layer. The final size and checksum are validated against the // contents of the uploaded layer. The checksum should be provided in the // format :. -func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) { - - // This section is going to be pretty ugly now. We will have to read the - // file twice. First, to get the tarsum and checksum. When those are - // available, and validated, we will upload it to the blob store and link - // it into the repository. In the future, we need to use resumable hash - // calculations for tarsum and checksum that can be calculated during the - // upload. This will allow us to cut the data directly into a temporary - // directory in the storage backend. - - fp, err := luc.file() - - if err != nil { - // Cleanup? - return nil, err - } - - digest, err = luc.validateLayer(fp, size, digest) +func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) { + canonical, err := luc.validateLayer(digest) if err != nil { return nil, err } - if nn, err := luc.writeLayer(fp, digest); err != nil { - // Cleanup? - return nil, err - } else if size >= 0 && nn != size { - // TODO(stevvooe): Short write. Will have to delete the location and - // report an error. This error needs to be reported to the client. - return nil, fmt.Errorf("short write writing layer") - } - - // Yes! We have written some layer data. Let's make it visible. Link the - // layer blob into the repository. - if err := luc.linkLayer(digest); err != nil { + if err := luc.moveLayer(canonical); err != nil { + // TODO(stevvooe): Cleanup? return nil, err } - // Ok, the upload has completed and finished. Delete the state. - if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil { - // Can we ignore this error? + // Link the layer blob into the repository. + if err := luc.linkLayer(canonical); err != nil { return nil, err } - return luc.layerStore.Fetch(luc.Name(), digest) + if err := luc.removeResources(); err != nil { + return nil, err + } + + return luc.layerStore.Fetch(luc.Name(), canonical) } // Cancel the layer upload process. func (luc *layerUploadController) Cancel() error { - if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil { + if err := luc.removeResources(); err != nil { return err } - return luc.Close() + luc.Close() + return nil } -func (luc *layerUploadController) Write(p []byte) (int, error) { - wr, err := luc.file() - if err != nil { - return 0, err - } - - n, err := wr.Write(p) - - // Because we expect the reported offset to be consistent with the storage - // state, unfortunately, we need to Sync on every call to write. - if err := wr.Sync(); err != nil { - // Effectively, ignore the write state if the Sync fails. Report that - // no bytes were written and seek back to the starting offset. - offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET) - if seekErr != nil { - // What do we do here? Quite disasterous. - luc.reset() - - return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", err, seekErr) - } - - if offset != luc.Offset() { - return 0, fmt.Errorf("unexpected offset after seek") - } - - return 0, err - } - - luc.LayerUploadState.Offset += int64(n) - - return n, err -} - -func (luc *layerUploadController) Close() error { - if luc.err != nil { - return luc.err - } - - if luc.fp != nil { - luc.err = luc.fp.Close() - } - - return luc.err -} - -func (luc *layerUploadController) file() (layerFile, error) { - if luc.fp != nil { - return luc.fp, nil - } - - fp, err := luc.uploadStore.Open(luc.UUID()) - - if err != nil { - return nil, err - } - - // TODO(stevvooe): We may need a more aggressive check here to ensure that - // the file length is equal to the current offset. We may want to sync the - // offset before return the layer upload to the client so it can be - // validated before proceeding with any writes. - - // Seek to the current layer offset for good measure. - if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil { - return nil, err - } - - luc.fp = fp - - return luc.fp, nil -} - -// reset closes and drops the current writer. -func (luc *layerUploadController) reset() { - if luc.fp != nil { - luc.fp.Close() - luc.fp = nil - } -} - -// validateLayer runs several checks on the layer file to ensure its validity. -// This is currently very expensive and relies on fast io and fast seek on the -// local host. If successful, the latest digest is returned, which should be -// used over the passed in value. -func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) { +// validateLayer checks the layer data against the digest, returning an error +// if it does not match. The canonical digest is returned. +func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) { // First, check the incoming tarsum version of the digest. version, err := tarsum.GetVersionFromTarsum(dgst.String()) if err != nil { @@ -239,87 +94,65 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d } digestVerifier := digest.NewDigestVerifier(dgst) - lengthVerifier := digest.NewLengthVerifier(size) - // First, seek to the end of the file, checking the size is as expected. - end, err := fp.Seek(0, os.SEEK_END) + // TODO(stevvooe): Store resumable hash calculations in upload directory + // in driver. Something like a file at path /resumablehash/ + // with the hash state up to that point would be perfect. The hasher would + // then only have to fetch the difference. + + // Read the file from the backend driver and validate it. + fr, err := newFileReader(luc.fileWriter.driver, luc.path) if err != nil { return "", err } - // Only check size if it is greater than - if size >= 0 && end != size { - // Fast path length check. - return "", ErrLayerInvalidSize{Size: size} - } - - // Now seek back to start and take care of the digest. - if _, err := fp.Seek(0, os.SEEK_SET); err != nil { - return "", err - } - - tr := io.TeeReader(fp, digestVerifier) - - // Only verify the size if a positive size argument has been passed. - if size >= 0 { - tr = io.TeeReader(tr, lengthVerifier) - } + tr := io.TeeReader(fr, digestVerifier) // TODO(stevvooe): This is one of the places we need a Digester write - // sink. Instead, its read driven. This migth be okay. + // sink. Instead, its read driven. This might be okay. // Calculate an updated digest with the latest version. - dgst, err = digest.FromReader(tr) + canonical, err := digest.FromReader(tr) if err != nil { return "", err } - if size >= 0 && !lengthVerifier.Verified() { - return "", ErrLayerInvalidSize{Size: size} - } - if !digestVerifier.Verified() { - return "", ErrLayerInvalidDigest{manifest.FSLayer{BlobSum: dgst}} + return "", ErrLayerInvalidDigest{Digest: dgst} } - return dgst, nil + return canonical, nil } -// writeLayer actually writes the the layer file into its final destination, +// moveLayer moves the data into its final, hash-qualified destination, // identified by dgst. The layer should be validated before commencing the -// write. -func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) { +// move. +func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{ digest: dgst, }) if err != nil { - return 0, err + return err } // Check for existence if _, err := luc.layerStore.driver.Stat(blobPath); err != nil { - // TODO(stevvooe): This check is kind of problematic and very racy. switch err := err.(type) { case storagedriver.PathNotFoundError: break // ensure that it doesn't exist. default: - // TODO(stevvooe): This isn't actually an error: the blob store is - // content addressable and we should just use this to ensure we - // have it written. Although, we do need to verify that the - // content that is there is the correct length. - return 0, err + return err } + } else { + // If the path exists, we can assume that the content has already + // been uploaded, since the blob storage is content-addressable. + // While it may be corrupted, detection of such corruption belongs + // elsewhere. + return nil } - // Seek our local layer file back now. - if _, err := fp.Seek(0, os.SEEK_SET); err != nil { - // Cleanup? - return 0, err - } - - // Okay: we can write the file to the blob store. - return luc.layerStore.driver.WriteStream(blobPath, 0, fp) + return luc.driver.Move(luc.path, blobPath) } // linkLayer links a valid, written layer blob into the registry under the @@ -337,85 +170,35 @@ func (luc *layerUploadController) linkLayer(digest digest.Digest) error { return luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)) } -// localFSLayerUploadStore implements a local layerUploadStore. There are some -// complexities around hashsums that make round tripping to the storage -// backend problematic, so we'll store and read locally for now. By GO-beta, -// this should be fully implemented on top of the backend storagedriver. -// -// For now, the directory layout is as follows: -// -// //registry-layer-upload/ -// / -// -> state.json -// -> data -// -// Each upload, identified by uuid, has its own directory with a state file -// and a data file. The state file has a json representation of the current -// state. The data file is the in-progress upload data. -type localFSLayerUploadStore struct { - root string -} - -func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) { - path, err := ioutil.TempDir("", "registry-layer-upload") +// removeResources should clean up all resources associated with the upload +// instance. An error will be returned if the clean up cannot proceed. If the +// resources are already not present, no error will be returned. +func (luc *layerUploadController) removeResources() error { + dataPath, err := luc.layerStore.pathMapper.path(uploadDataPathSpec{ + name: luc.name, + uuid: luc.uuid, + }) if err != nil { - return nil, err - } - - return &localFSLayerUploadStore{ - root: path, - }, nil -} - -func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) { - lus := LayerUploadState{ - Name: name, - UUID: uuid.New(), - } - - if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil { - return lus, err - } - - return lus, nil -} - -func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) { - fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644) - - if err != nil { - return nil, err - } - - return fp, nil -} - -func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) { - var lus LayerUploadState - - if _, err := os.Stat(llufs.path(uuid, "")); err != nil { - if os.IsNotExist(err) { - return lus, ErrLayerUploadUnknown - } - - return lus, err - } - return lus, nil -} - -func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error { - if err := os.RemoveAll(llufs.path(uuid, "")); err != nil { - if os.IsNotExist(err) { - return ErrLayerUploadUnknown - } - return err } + // Resolve and delete the containing directory, which should include any + // upload related files. + dirPath := path.Dir(dataPath) + + if err := luc.driver.Delete(dirPath); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + break // already gone! + default: + // This should be uncommon enough such that returning an error + // should be okay. At this point, the upload should be mostly + // complete, but perhaps the backend became unaccessible. + logrus.Errorf("unable to delete layer upload resources %q: %v", dirPath, err) + return err + } + } + return nil } - -func (llufs *localFSLayerUploadStore) path(uuid, file string) string { - return filepath.Join(llufs.root, uuid, file) -} diff --git a/storage/manifeststore_test.go b/storage/manifeststore_test.go index 991028e56..a6cca9627 100644 --- a/storage/manifeststore_test.go +++ b/storage/manifeststore_test.go @@ -153,6 +153,6 @@ func (mockedExistenceLayerService) Upload(name string) (LayerUpload, error) { panic("not implemented") } -func (mockedExistenceLayerService) Resume(lus LayerUploadState) (LayerUpload, error) { +func (mockedExistenceLayerService) Resume(name, uuid string) (LayerUpload, error) { panic("not implemented") } diff --git a/storage/services.go b/storage/services.go index 5507faebd..97edca3fc 100644 --- a/storage/services.go +++ b/storage/services.go @@ -9,28 +9,18 @@ import ( // Services provides various services with application-level operations for // use across backend storage drivers. type Services struct { - driver storagedriver.StorageDriver - pathMapper *pathMapper - layerUploadStore layerUploadStore + driver storagedriver.StorageDriver + pathMapper *pathMapper } // NewServices creates a new Services object to access docker objects stored // in the underlying driver. func NewServices(driver storagedriver.StorageDriver) *Services { - layerUploadStore, err := newTemporaryLocalFSLayerUploadStore() - - if err != nil { - // TODO(stevvooe): This failure needs to be understood in the context - // of the lifecycle of the services object, which is uncertain at this - // point. - panic("unable to allocate layerUploadStore: " + err.Error()) - } return &Services{ driver: driver, // TODO(sday): This should be configurable. - pathMapper: defaultPathMapper, - layerUploadStore: layerUploadStore, + pathMapper: defaultPathMapper, } } @@ -38,7 +28,7 @@ func NewServices(driver storagedriver.StorageDriver) *Services { // may be context sensitive in the future. The instance should be used similar // to a request local. func (ss *Services) Layers() LayerService { - return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper, uploadStore: ss.layerUploadStore} + return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper} } // Manifests returns an instance of ManifestService. Instantiation is cheap and @@ -78,7 +68,8 @@ type LayerService interface { // returning a handle. Upload(name string) (LayerUpload, error) - // Resume continues an in progress layer upload, returning the current - // state of the upload. - Resume(layerUploadState LayerUploadState) (LayerUpload, error) + // Resume continues an in progress layer upload, returning a handle to the + // upload. The caller should seek to the latest desired upload location + // before proceeding. + Resume(name, uuid string) (LayerUpload, error) } From 2f2445a335f0c5e9c52ceebd2cf2c938bf23b252 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 8 Jan 2015 14:59:15 -0800 Subject: [PATCH 5/6] Refactor handling of hmac state packing This refactors the hmac state token to take control of the layerUploadState json message, which has been removed from the storage backend. It also moves away from the concept of a LayerUploadStateStore callback object, which was short-lived. This allows for upload offset to be managed by the web application logic in the face of an inconsistent backend. By controlling the upload offset externally, we reduce the possibility of misreporting upload state to a client. We may still want to modify the way this works after getting production experience. Signed-off-by: Stephen J Day --- registry/hmac.go | 72 +++++++++++++++++++++++ registry/{tokens_test.go => hmac_test.go} | 34 +++++------ registry/tokens.go | 65 -------------------- 3 files changed, 87 insertions(+), 84 deletions(-) create mode 100644 registry/hmac.go rename registry/{tokens_test.go => hmac_test.go} (69%) delete mode 100644 registry/tokens.go diff --git a/registry/hmac.go b/registry/hmac.go new file mode 100644 index 000000000..d24700875 --- /dev/null +++ b/registry/hmac.go @@ -0,0 +1,72 @@ +package registry + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "time" +) + +// layerUploadState captures the state serializable state of the layer upload. +type layerUploadState struct { + // name is the primary repository under which the layer will be linked. + Name string + + // UUID identifies the upload. + UUID string + + // offset contains the current progress of the upload. + Offset int64 + + // StartedAt is the original start time of the upload. + StartedAt time.Time +} + +type hmacKey string + +// unpackUploadState unpacks and validates the layer upload state from the +// token, using the hmacKey secret. +func (secret hmacKey) unpackUploadState(token string) (layerUploadState, error) { + var state layerUploadState + + tokenBytes, err := base64.URLEncoding.DecodeString(token) + if err != nil { + return state, err + } + mac := hmac.New(sha256.New, []byte(secret)) + + if len(tokenBytes) < mac.Size() { + return state, fmt.Errorf("Invalid token") + } + + macBytes := tokenBytes[:mac.Size()] + messageBytes := tokenBytes[mac.Size():] + + mac.Write(messageBytes) + if !hmac.Equal(mac.Sum(nil), macBytes) { + return state, fmt.Errorf("Invalid token") + } + + if err := json.Unmarshal(messageBytes, &state); err != nil { + return state, err + } + + return state, nil +} + +// packUploadState packs the upload state signed with and hmac digest using +// the hmacKey secret, encoding to url safe base64. The resulting token can be +// used to share data with minimized risk of external tampering. +func (secret hmacKey) packUploadState(lus layerUploadState) (string, error) { + mac := hmac.New(sha256.New, []byte(secret)) + p, err := json.Marshal(lus) + if err != nil { + return "", err + } + + mac.Write(p) + + return base64.URLEncoding.EncodeToString(append(mac.Sum(nil), p...)), nil +} diff --git a/registry/tokens_test.go b/registry/hmac_test.go similarity index 69% rename from registry/tokens_test.go rename to registry/hmac_test.go index a447438a0..5ad60f61d 100644 --- a/registry/tokens_test.go +++ b/registry/hmac_test.go @@ -1,12 +1,8 @@ package registry -import ( - "testing" +import "testing" - "github.com/docker/distribution/storage" -) - -var layerUploadStates = []storage.LayerUploadState{ +var layerUploadStates = []layerUploadState{ { Name: "hello", UUID: "abcd-1234-qwer-0987", @@ -47,15 +43,15 @@ var secrets = []string{ // TestLayerUploadTokens constructs stateTokens from LayerUploadStates and // validates that the tokens can be used to reconstruct the proper upload state. func TestLayerUploadTokens(t *testing.T) { - tokenProvider := newHMACTokenProvider("supersecret") + secret := hmacKey("supersecret") for _, testcase := range layerUploadStates { - token, err := tokenProvider.layerUploadStateToToken(testcase) + token, err := secret.packUploadState(testcase) if err != nil { t.Fatal(err) } - lus, err := tokenProvider.layerUploadStateFromToken(token) + lus, err := secret.unpackUploadState(token) if err != nil { t.Fatal(err) } @@ -68,39 +64,39 @@ func TestLayerUploadTokens(t *testing.T) { // only if they share the same secret. func TestHMACValidation(t *testing.T) { for _, secret := range secrets { - tokenProvider1 := newHMACTokenProvider(secret) - tokenProvider2 := newHMACTokenProvider(secret) - badTokenProvider := newHMACTokenProvider("DifferentSecret") + secret1 := hmacKey(secret) + secret2 := hmacKey(secret) + badSecret := hmacKey("DifferentSecret") for _, testcase := range layerUploadStates { - token, err := tokenProvider1.layerUploadStateToToken(testcase) + token, err := secret1.packUploadState(testcase) if err != nil { t.Fatal(err) } - lus, err := tokenProvider2.layerUploadStateFromToken(token) + lus, err := secret2.unpackUploadState(token) if err != nil { t.Fatal(err) } assertLayerUploadStateEquals(t, testcase, lus) - _, err = badTokenProvider.layerUploadStateFromToken(token) + _, err = badSecret.unpackUploadState(token) if err == nil { t.Fatalf("Expected token provider to fail at retrieving state from token: %s", token) } - badToken, err := badTokenProvider.layerUploadStateToToken(testcase) + badToken, err := badSecret.packUploadState(lus) if err != nil { t.Fatal(err) } - _, err = tokenProvider1.layerUploadStateFromToken(badToken) + _, err = secret1.unpackUploadState(badToken) if err == nil { t.Fatalf("Expected token provider to fail at retrieving state from token: %s", badToken) } - _, err = tokenProvider2.layerUploadStateFromToken(badToken) + _, err = secret2.unpackUploadState(badToken) if err == nil { t.Fatalf("Expected token provider to fail at retrieving state from token: %s", badToken) } @@ -108,7 +104,7 @@ func TestHMACValidation(t *testing.T) { } } -func assertLayerUploadStateEquals(t *testing.T, expected storage.LayerUploadState, received storage.LayerUploadState) { +func assertLayerUploadStateEquals(t *testing.T, expected layerUploadState, received layerUploadState) { if expected.Name != received.Name { t.Fatalf("Expected Name=%q, Received Name=%q", expected.Name, received.Name) } diff --git a/registry/tokens.go b/registry/tokens.go deleted file mode 100644 index 276b896e8..000000000 --- a/registry/tokens.go +++ /dev/null @@ -1,65 +0,0 @@ -package registry - -import ( - "crypto/hmac" - "crypto/sha256" - "encoding/base64" - "encoding/json" - "fmt" - - "github.com/docker/distribution/storage" -) - -// tokenProvider contains methods for serializing and deserializing state from token strings. -type tokenProvider interface { - // layerUploadStateFromToken retrieves the LayerUploadState for a given state token. - layerUploadStateFromToken(stateToken string) (storage.LayerUploadState, error) - - // layerUploadStateToToken returns a token string representing the given LayerUploadState. - layerUploadStateToToken(layerUploadState storage.LayerUploadState) (string, error) -} - -type hmacTokenProvider struct { - secret string -} - -func newHMACTokenProvider(secret string) tokenProvider { - return &hmacTokenProvider{secret: secret} -} - -// layerUploadStateFromToken deserializes the given HMAC stateToken and validates the prefix HMAC -func (ts *hmacTokenProvider) layerUploadStateFromToken(stateToken string) (storage.LayerUploadState, error) { - var lus storage.LayerUploadState - - tokenBytes, err := base64.URLEncoding.DecodeString(stateToken) - if err != nil { - return lus, err - } - mac := hmac.New(sha256.New, []byte(ts.secret)) - - if len(tokenBytes) < mac.Size() { - return lus, fmt.Errorf("Invalid token") - } - - macBytes := tokenBytes[:mac.Size()] - messageBytes := tokenBytes[mac.Size():] - - mac.Write(messageBytes) - if !hmac.Equal(mac.Sum(nil), macBytes) { - return lus, fmt.Errorf("Invalid token") - } - - if err := json.Unmarshal(messageBytes, &lus); err != nil { - return lus, err - } - - return lus, nil -} - -// layerUploadStateToToken serializes the given LayerUploadState to JSON with an HMAC prepended -func (ts *hmacTokenProvider) layerUploadStateToToken(lus storage.LayerUploadState) (string, error) { - mac := hmac.New(sha256.New, []byte(ts.secret)) - stateJSON := fmt.Sprintf("{\"Name\": \"%s\", \"UUID\": \"%s\", \"Offset\": %d}", lus.Name, lus.UUID, lus.Offset) - mac.Write([]byte(stateJSON)) - return base64.URLEncoding.EncodeToString(append(mac.Sum(nil), stateJSON...)), nil -} From 4aa7837f83b709cd84ad7672de4ba8e99a918b7c Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 8 Jan 2015 15:04:00 -0800 Subject: [PATCH 6/6] Directly manage layerUploadState in webapp Most of this change follows from the modifications to the storage api. The driving factor is the separation of layerUploadState from the storage backend, leaving it to the web application to store and update it. As part of the updates to meet changes in the storage api, support for the size parameter has been completely removed. Signed-off-by: Stephen J Day --- registry/app.go | 4 -- registry/layerupload.go | 96 ++++++++++++++++++++++++++++------------- 2 files changed, 67 insertions(+), 33 deletions(-) diff --git a/registry/app.go b/registry/app.go index 72ac4f065..6a79cdfab 100644 --- a/registry/app.go +++ b/registry/app.go @@ -29,8 +29,6 @@ type App struct { // services contains the main services instance for the application. services *storage.Services - tokenProvider tokenProvider - layerHandler storage.LayerHandler accessController auth.AccessController @@ -66,8 +64,6 @@ func NewApp(configuration configuration.Configuration) *App { app.driver = driver app.services = storage.NewServices(app.driver) - app.tokenProvider = newHMACTokenProvider(configuration.HTTP.Secret) - authType := configuration.Auth.Type() if authType != "" { diff --git a/registry/layerupload.go b/registry/layerupload.go index b694a6773..158bf7b4f 100644 --- a/registry/layerupload.go +++ b/registry/layerupload.go @@ -5,7 +5,7 @@ import ( "io" "net/http" "net/url" - "strconv" + "os" "github.com/Sirupsen/logrus" "github.com/docker/distribution/api/v2" @@ -33,26 +33,57 @@ func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler { if luh.UUID != "" { luh.log = luh.log.WithField("uuid", luh.UUID) - state, err := ctx.tokenProvider.layerUploadStateFromToken(r.FormValue("_state")) + state, err := hmacKey(ctx.Config.HTTP.Secret).unpackUploadState(r.FormValue("_state")) if err != nil { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - logrus.Infof("error resolving upload: %v", err) - w.WriteHeader(http.StatusInternalServerError) - luh.Errors.Push(v2.ErrorCodeUnknown, err) + ctx.log.Infof("error resolving upload: %v", err) + w.WriteHeader(http.StatusBadRequest) + luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err) + }) + } + luh.State = state + + if state.UUID != luh.UUID { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx.log.Infof("mismatched uuid in upload state: %q != %q", state.UUID, luh.UUID) + w.WriteHeader(http.StatusBadRequest) + luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err) }) } layers := ctx.services.Layers() - upload, err := layers.Resume(state) + upload, err := layers.Resume(luh.Name, luh.UUID) if err != nil && err != storage.ErrLayerUploadUnknown { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - logrus.Infof("error resolving upload: %v", err) - w.WriteHeader(http.StatusInternalServerError) - luh.Errors.Push(v2.ErrorCodeUnknown, err) + ctx.log.Errorf("error resolving upload: %v", err) + w.WriteHeader(http.StatusBadRequest) + luh.Errors.Push(v2.ErrorCodeBlobUploadUnknown, err) }) } - luh.Upload = upload + + if state.Offset > 0 { + // Seek the layer upload to the correct spot if it's non-zero. + // These error conditions should be rare and demonstrate really + // problems. We basically cancel the upload and tell the client to + // start over. + if nn, err := upload.Seek(luh.State.Offset, os.SEEK_SET); err != nil { + ctx.log.Infof("error seeking layer upload: %v", err) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err) + upload.Cancel() + }) + } else if nn != luh.State.Offset { + ctx.log.Infof("seek to wrong offest: %d != %d", nn, luh.State.Offset) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err) + upload.Cancel() + }) + } + } + handler = closeResources(handler, luh.Upload) } @@ -67,6 +98,8 @@ type layerUploadHandler struct { UUID string Upload storage.LayerUpload + + State layerUploadState } // StartLayerUpload begins the layer upload process and allocates a server- @@ -171,14 +204,30 @@ func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http. // chunk responses. This sets the correct headers but the response status is // left to the caller. func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error { - values := make(url.Values) - stateToken, err := luh.Context.tokenProvider.layerUploadStateToToken(storage.LayerUploadState{Name: luh.Upload.Name(), UUID: luh.Upload.UUID(), Offset: luh.Upload.Offset()}) + + offset, err := luh.Upload.Seek(0, os.SEEK_CUR) + if err != nil { + luh.log.Errorf("unable get current offset of layer upload: %v", err) + return err + } + + // TODO(stevvooe): Need a better way to manage the upload state automatically. + luh.State.Name = luh.Name + luh.State.UUID = luh.Upload.UUID() + luh.State.Offset = offset + luh.State.StartedAt = luh.Upload.StartedAt() + + token, err := hmacKey(luh.Config.HTTP.Secret).packUploadState(luh.State) if err != nil { logrus.Infof("error building upload state token: %s", err) return err } - values.Set("_state", stateToken) - uploadURL, err := luh.urlBuilder.BuildBlobUploadChunkURL(luh.Upload.Name(), luh.Upload.UUID(), values) + + uploadURL, err := luh.urlBuilder.BuildBlobUploadChunkURL( + luh.Upload.Name(), luh.Upload.UUID(), + url.Values{ + "_state": []string{token}, + }) if err != nil { logrus.Infof("error building upload url: %s", err) return err @@ -186,7 +235,7 @@ func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *htt w.Header().Set("Location", uploadURL) w.Header().Set("Content-Length", "0") - w.Header().Set("Range", fmt.Sprintf("0-%d", luh.Upload.Offset())) + w.Header().Set("Range", fmt.Sprintf("0-%d", luh.State.Offset)) return nil } @@ -198,7 +247,6 @@ var errNotReadyToComplete = fmt.Errorf("not ready to complete upload") func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *http.Request) error { // If we get a digest and length, we can finish the upload. dgstStr := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters! - sizeStr := r.FormValue("size") if dgstStr == "" { return errNotReadyToComplete @@ -209,23 +257,13 @@ func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *htt return err } - var size int64 - if sizeStr != "" { - size, err = strconv.ParseInt(sizeStr, 10, 64) - if err != nil { - return err - } - } else { - size = -1 - } - - luh.completeUpload(w, r, size, dgst) + luh.completeUpload(w, r, dgst) return nil } // completeUpload finishes out the upload with the correct response. -func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, size int64, dgst digest.Digest) { - layer, err := luh.Upload.Finish(size, dgst) +func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, dgst digest.Digest) { + layer, err := luh.Upload.Finish(dgst) if err != nil { luh.Errors.Push(v2.ErrorCodeUnknown, err) w.WriteHeader(http.StatusInternalServerError)