Merge pull request #13 from digitalocean/awg/quotas

Handle S3 QuotaExceeded errors appropriately
This commit is contained in:
Adam Wolfe Gordon
2020-10-26 11:12:58 -06:00
committed by GitHub
5 changed files with 47 additions and 11 deletions

View File

@@ -11,6 +11,7 @@ import (
"github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/api/errcode"
v2 "github.com/docker/distribution/registry/api/v2" v2 "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/storage" "github.com/docker/distribution/registry/storage"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/gorilla/handlers" "github.com/gorilla/handlers"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
) )
@@ -81,6 +82,8 @@ func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Req
if err := buh.writeBlobCreatedHeaders(w, ebm.Descriptor); err != nil { if err := buh.writeBlobCreatedHeaders(w, ebm.Descriptor); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
} }
} else if _, ok := err.(storagedriver.QuotaExceededError); ok {
buh.Errors = append(buh.Errors, errcode.ErrorCodeDenied.WithMessage("quota exceeded"))
} else if err == distribution.ErrUnsupported { } else if err == distribution.ErrUnsupported {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported) buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported)
} else { } else {
@@ -137,7 +140,12 @@ func (buh *blobUploadHandler) PatchBlobData(w http.ResponseWriter, r *http.Reque
// TODO(dmcgowan): support Content-Range header to seek and write range // TODO(dmcgowan): support Content-Range header to seek and write range
if err := copyFullPayload(buh, w, r, buh.Upload, -1, "blob PATCH"); err != nil { if err := copyFullPayload(buh, w, r, buh.Upload, -1, "blob PATCH"); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err.Error())) switch err := err.(type) {
case storagedriver.QuotaExceededError:
buh.Errors = append(buh.Errors, errcode.ErrorCodeDenied.WithMessage("quota exceeded"))
default:
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err.Error()))
}
return return
} }
@@ -185,7 +193,12 @@ func (buh *blobUploadHandler) BlobUploadComplete(w http.ResponseWriter, r *http.
} }
if err := copyFullPayload(buh, w, r, buh.Upload, -1, "blob PUT"); err != nil { if err := copyFullPayload(buh, w, r, buh.Upload, -1, "blob PUT"); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err.Error())) switch err := err.(type) {
case storagedriver.QuotaExceededError:
buh.Errors = append(buh.Errors, errcode.ErrorCodeDenied.WithMessage("quota exceeded"))
default:
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err.Error()))
}
return return
} }
@@ -201,6 +214,8 @@ func (buh *blobUploadHandler) BlobUploadComplete(w http.ResponseWriter, r *http.
switch err := err.(type) { switch err := err.(type) {
case distribution.ErrBlobInvalidDigest: case distribution.ErrBlobInvalidDigest:
buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err)) buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
case storagedriver.QuotaExceededError:
buh.Errors = append(buh.Errors, errcode.ErrorCodeDenied.WithMessage("quota exceeded"))
case errcode.Error: case errcode.Error:
buh.Errors = append(buh.Errors, err) buh.Errors = append(buh.Errors, err)
default: default:

View File

@@ -17,6 +17,7 @@ import (
"github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/api/errcode"
v2 "github.com/docker/distribution/registry/api/v2" v2 "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/auth" "github.com/docker/distribution/registry/auth"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/gorilla/handlers" "github.com/gorilla/handlers"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1" v1 "github.com/opencontainers/image-spec/specs-go/v1"
@@ -369,6 +370,8 @@ func (imh *manifestHandler) PutManifest(w http.ResponseWriter, r *http.Request)
} }
} }
} }
case storagedriver.QuotaExceededError:
imh.Errors = append(imh.Errors, errcode.ErrorCodeDenied.WithMessage("quota exceeded"))
case errcode.Error: case errcode.Error:
imh.Errors = append(imh.Errors, err) imh.Errors = append(imh.Errors, err)
default: default:

View File

@@ -80,6 +80,9 @@ func (base *Base) setDriverName(e error) error {
case storagedriver.InvalidOffsetError: case storagedriver.InvalidOffsetError:
actual.DriverName = base.StorageDriver.Name() actual.DriverName = base.StorageDriver.Name()
return actual return actual
case storagedriver.QuotaExceededError:
actual.DriverName = base.StorageDriver.Name()
return actual
default: default:
storageError := storagedriver.Error{ storageError := storagedriver.Error{
DriverName: base.StorageDriver.Name(), DriverName: base.StorageDriver.Name(),

View File

@@ -1188,8 +1188,13 @@ func (d *Driver) S3BucketKey(path string) string {
} }
func parseError(path string, err error) error { func parseError(path string, err error) error {
if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "NoSuchKey" { if s3Err, ok := err.(awserr.Error); ok {
return storagedriver.PathNotFoundError{Path: path} switch s3Err.Code() {
case "NoSuchKey":
return storagedriver.PathNotFoundError{Path: path}
case "QuotaExceeded":
return storagedriver.QuotaExceededError{}
}
} }
return err return err
@@ -1300,7 +1305,7 @@ func (w *writer) Write(p []byte) (int, error) {
Key: aws.String(w.key), Key: aws.String(w.key),
UploadId: aws.String(w.uploadID), UploadId: aws.String(w.uploadID),
}) })
return 0, err return 0, parseError(w.key, err)
} }
resp, err := w.driver.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ resp, err := w.driver.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
@@ -1312,7 +1317,7 @@ func (w *writer) Write(p []byte) (int, error) {
StorageClass: w.driver.getStorageClass(), StorageClass: w.driver.getStorageClass(),
}) })
if err != nil { if err != nil {
return 0, err return 0, parseError(w.key, err)
} }
w.uploadID = *resp.UploadId w.uploadID = *resp.UploadId
@@ -1324,7 +1329,7 @@ func (w *writer) Write(p []byte) (int, error) {
Key: aws.String(w.key), Key: aws.String(w.key),
}) })
if err != nil { if err != nil {
return 0, err return 0, parseError(w.key, err)
} }
defer resp.Body.Close() defer resp.Body.Close()
w.parts = nil w.parts = nil
@@ -1342,7 +1347,7 @@ func (w *writer) Write(p []byte) (int, error) {
UploadId: resp.UploadId, UploadId: resp.UploadId,
}) })
if err != nil { if err != nil {
return 0, err return 0, parseError(w.key, err)
} }
w.parts = []*s3.Part{ w.parts = []*s3.Part{
{ {
@@ -1415,7 +1420,7 @@ func (w *writer) Cancel() error {
Key: aws.String(w.key), Key: aws.String(w.key),
UploadId: aws.String(w.uploadID), UploadId: aws.String(w.uploadID),
}) })
return err return parseError(w.key, err)
} }
func (w *writer) Commit() error { func (w *writer) Commit() error {
@@ -1456,7 +1461,7 @@ func (w *writer) Commit() error {
Key: aws.String(w.key), Key: aws.String(w.key),
UploadId: aws.String(w.uploadID), UploadId: aws.String(w.uploadID),
}) })
return err return parseError(w.key, err)
} }
return nil return nil
} }
@@ -1484,7 +1489,7 @@ func (w *writer) flushPart() error {
Body: bytes.NewReader(w.readyPart), Body: bytes.NewReader(w.readyPart),
}) })
if err != nil { if err != nil {
return err return parseError(w.key, err)
} }
w.parts = append(w.parts, &s3.Part{ w.parts = append(w.parts, &s3.Part{
ETag: resp.ETag, ETag: resp.ETag,

View File

@@ -159,6 +159,16 @@ func (err InvalidOffsetError) Error() string {
return fmt.Sprintf("%s: invalid offset: %d for path: %s", err.DriverName, err.Offset, err.Path) return fmt.Sprintf("%s: invalid offset: %d for path: %s", err.DriverName, err.Offset, err.Path)
} }
// QuotaExceededError is returned when a storage quota is exceeded during a
// write.
type QuotaExceededError struct {
DriverName string
}
func (err QuotaExceededError) Error() string {
return fmt.Sprintf("%s: quota exceeded", err.DriverName)
}
// Error is a catch-all error type which captures an error string and // Error is a catch-all error type which captures an error string and
// the driver type on which it occurred. // the driver type on which it occurred.
type Error struct { type Error struct {