diff --git a/internal/client/blob_writer.go b/internal/client/blob_writer.go index a4a9cdec5..4a230652a 100644 --- a/internal/client/blob_writer.go +++ b/internal/client/blob_writer.go @@ -24,6 +24,10 @@ type httpBlobUpload struct { location string // always the last value of the location header. offset int64 closed bool + + // maxRange is a way to control the maximum size of the chunk that httpBlobUpload will send to the registry. + // Every ReadFrom and Write call won't read more bytes than the quantity specified in this field + maxRange int64 } func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) { @@ -38,6 +42,10 @@ func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error { } func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) { + if hbu.maxRange != 0 { + r = io.LimitReader(r, hbu.maxRange) + } + req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPatch, hbu.location, io.NopCloser(r)) if err != nil { return 0, err @@ -74,6 +82,10 @@ func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) { } func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) { + if hbu.maxRange != 0 && hbu.maxRange < int64(len(p)) { + p = p[:hbu.maxRange] + } + req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPatch, hbu.location, bytes.NewReader(p)) if err != nil { return 0, err diff --git a/internal/client/blob_writer_test.go b/internal/client/blob_writer_test.go index 2c9d87d2a..108632d10 100644 --- a/internal/client/blob_writer_test.go +++ b/internal/client/blob_writer_test.go @@ -3,7 +3,9 @@ package client import ( "bytes" "context" + "errors" "fmt" + "io" "net/http" "testing" @@ -500,3 +502,145 @@ func TestUploadWrite(t *testing.T) { t.Fatalf("Unexpected response status: %s, expected %s", uploadErr.Status, expected) } } + +// tests the case of sending only the bytes that we're limiting on +func TestUploadLimitRange(t *testing.T) { + const numberOfBlobs = 10 + const blobSize = 5 + const lastBlobOffset = 2 + + _, b := newRandomBlob(numberOfBlobs*5 + 2) + repo := "test/upload/write" + locationPath := fmt.Sprintf("/v2/%s/uploads/testid", repo) + requests := []testutil.RequestResponseMapping{ + { + Request: testutil.Request{ + Method: http.MethodGet, + Route: "/v2/", + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Headers: http.Header(map[string][]string{ + "Docker-Distribution-API-Version": {"registry/2.0"}, + }), + }, + }, + } + + for blob := 0; blob < numberOfBlobs; blob++ { + start := blob * blobSize + end := ((blob + 1) * blobSize) - 1 + + requests = append(requests, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: http.MethodPatch, + Route: locationPath, + Body: b[start : end+1], + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"}, + "Location": {locationPath}, + "Range": {fmt.Sprintf("%d-%d", start, end)}, + }), + }, + }) + } + + requests = append(requests, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: http.MethodPatch, + Route: locationPath, + Body: b[numberOfBlobs*blobSize:], + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"}, + "Location": {locationPath}, + "Range": {fmt.Sprintf("%d-%d", numberOfBlobs*blobSize, len(b)-1)}, + }), + }, + }) + + t.Run("reader chunked upload", func(t *testing.T) { + m := testutil.RequestResponseMap(requests) + e, c := testServer(m) + defer c() + + blobUpload := &httpBlobUpload{ + ctx: context.Background(), + client: &http.Client{}, + maxRange: int64(blobSize), + } + + reader := bytes.NewBuffer(b) + for i := 0; i < numberOfBlobs; i++ { + blobUpload.location = e + locationPath + n, err := blobUpload.ReadFrom(reader) + if err != nil { + t.Fatalf("Error calling Write: %s", err) + } + + if n != blobSize { + t.Fatalf("Unexpected n %v != %v blobSize", n, blobSize) + } + } + + n, err := blobUpload.ReadFrom(reader) + if err != nil { + t.Fatalf("Error calling Write: %s", err) + } + + if n != lastBlobOffset { + t.Fatalf("Expected last write to have written %v but wrote %v", lastBlobOffset, n) + } + + _, err = reader.Read([]byte{0, 0, 0}) + if !errors.Is(err, io.EOF) { + t.Fatalf("Expected io.EOF when reading blob as the test should've read the whole thing") + } + }) + + t.Run("buffer chunked upload", func(t *testing.T) { + buff := b + m := testutil.RequestResponseMap(requests) + e, c := testServer(m) + defer c() + + blobUpload := &httpBlobUpload{ + ctx: context.Background(), + client: &http.Client{}, + maxRange: int64(blobSize), + } + + for i := 0; i < numberOfBlobs; i++ { + blobUpload.location = e + locationPath + n, err := blobUpload.Write(buff) + if err != nil { + t.Fatalf("Error calling Write: %s", err) + } + + if n != blobSize { + t.Fatalf("Unexpected n %v != %v blobSize", n, blobSize) + } + + buff = buff[n:] + } + + n, err := blobUpload.Write(buff) + if err != nil { + t.Fatalf("Error calling Write: %s", err) + } + + if n != lastBlobOffset { + t.Fatalf("Expected last write to have written %v but wrote %v", lastBlobOffset, n) + } + + buff = buff[n:] + if len(buff) != 0 { + t.Fatalf("Expected length 0 on the buffer body as we should've read the whole thing, but got %v", len(buff)) + } + }) +} diff --git a/internal/client/repository.go b/internal/client/repository.go index 4f8659306..98691c10b 100644 --- a/internal/client/repository.go +++ b/internal/client/repository.go @@ -810,6 +810,11 @@ func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateO return nil, err } + maxRange, err := v2.GetOCIMaxRange(resp) + if err != nil { + return nil, err + } + return &httpBlobUpload{ ctx: ctx, statter: bs.statter, @@ -817,6 +822,7 @@ func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateO uuid: uuid, startedAt: time.Now(), location: location, + maxRange: maxRange, }, nil default: return nil, HandleHTTPResponseError(resp) diff --git a/internal/client/repository_test.go b/internal/client/repository_test.go index 2135121f3..1ea9f6149 100644 --- a/internal/client/repository_test.go +++ b/internal/client/repository_test.go @@ -551,6 +551,163 @@ func TestBlobUploadChunked(t *testing.T) { } } +func TestBlobUploadChunkedOCIChunkMaxLength(t *testing.T) { + const numberOfBlobs = 10 + const blobSize = 5 + const lastBlobOffset = 2 + const totalSize = numberOfBlobs*blobSize + lastBlobOffset + dgst, b1 := newRandomBlob(totalSize) + originalBlob := b1 + var m testutil.RequestResponseMap + repo, _ := reference.WithName("test.example.com/uploadrepo") + uuids := []string{uuid.NewString()} + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: http.MethodPost, + Route: "/v2/" + repo.Name() + "/blobs/uploads/", + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Location": {"/v2/" + repo.Name() + "/blobs/uploads/" + uuids[0]}, + "Docker-Upload-UUID": {uuids[0]}, + "Range": {"0-0"}, + "OCI-Chunk-Max-Length": {fmt.Sprintf("%d", blobSize)}, + }), + }, + }) + for blob := 0; blob < numberOfBlobs; blob++ { + start := blob * blobSize + end := ((blob + 1) * blobSize) - 1 + uuids = append(uuids, uuid.NewString()) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: http.MethodPatch, + Route: "/v2/" + repo.Name() + "/blobs/uploads/" + uuids[blob], + Body: b1[start : end+1], + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Location": {"/v2/" + repo.Name() + "/blobs/uploads/" + uuids[blob+1]}, + "Docker-Upload-UUID": {uuids[blob+1]}, + "Range": {fmt.Sprintf("%d-%d", start, end)}, + }), + }, + }) + } + + uuids = append(uuids, uuid.NewString()) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: http.MethodPatch, + Route: "/v2/" + repo.Name() + "/blobs/uploads/" + uuids[len(uuids)-2], + Body: b1[numberOfBlobs*blobSize:], + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Location": {"/v2/" + repo.Name() + "/blobs/uploads/" + uuids[len(uuids)-1]}, + "Docker-Upload-UUID": {uuids[len(uuids)-1]}, + "Range": {fmt.Sprintf("%d-%d", numberOfBlobs*blobSize, len(b1)-1)}, + }), + }, + }) + + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: http.MethodPut, + Route: "/v2/" + repo.Name() + "/blobs/uploads/" + uuids[len(uuids)-1], + QueryParams: map[string][]string{ + "digest": {dgst.String()}, + }, + }, + Response: testutil.Response{ + StatusCode: http.StatusCreated, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Docker-Content-Digest": {dgst.String()}, + "Content-Range": {fmt.Sprintf("0-%d", totalSize-1)}, + }), + }, + }) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: http.MethodHead, + Route: "/v2/" + repo.Name() + "/blobs/" + dgst.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Headers: http.Header(map[string][]string{ + "Content-Length": {fmt.Sprint(totalSize)}, + "Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)}, + }), + }, + }) + + e, c := testServer(m) + defer c() + + ctx := context.Background() + r, err := NewRepository(repo, e, nil) + if err != nil { + t.Fatal(err) + } + l := r.Blobs(ctx) + + upload, err := l.Create(ctx) + if err != nil { + t.Fatal(err) + } + + if upload.ID() != uuids[0] { + log.Fatalf("Unexpected UUID %s; expected %s", upload.ID(), uuids[0]) + } + + for i := 0; i < numberOfBlobs; i++ { + n, err := upload.Write(b1) + if err != nil { + t.Fatalf("unexpected error in blob %v: %+v", i, err) + } + + if n != blobSize { + t.Fatalf("Unexpected length returned from write: %d; expected: %d", n, blobSize) + } + + b1 = b1[n:] + } + + n, err := upload.Write(b1) + if err != nil { + t.Fatal(err) + } + + if n != lastBlobOffset { + t.Fatalf("Unexpected length returned from write: %d; expected: %d", n, lastBlobOffset) + } + + b1 = b1[n:] + + blob, err := upload.Commit(ctx, distribution.Descriptor{ + Digest: dgst, + Size: int64(len(b1)), + }) + if err != nil { + t.Fatal(err) + } + + if blob.Size != int64(len(originalBlob)) { + t.Fatalf("Unexpected blob size: %d; expected: %d", blob.Size, len(originalBlob)) + } + + if len(b1) != 0 { + t.Fatalf("Expected to have consumed to the entire buffer at the end, got size %v", len(b1)) + } +} + func TestBlobUploadMonolithic(t *testing.T) { dgst, b1 := newRandomBlob(1024) var m testutil.RequestResponseMap diff --git a/registry/api/v2/headerparser.go b/registry/api/v2/headerparser.go index 9bc41a3a6..1d849d34e 100644 --- a/registry/api/v2/headerparser.go +++ b/registry/api/v2/headerparser.go @@ -2,7 +2,9 @@ package v2 import ( "fmt" + "net/http" "regexp" + "strconv" "strings" "unicode" ) @@ -159,3 +161,19 @@ Loop: return res, parse, nil } + +// GetOCIMaxRang gets the OCI-Chunk-Max-Length from the headers. If not set it will return 0 +func GetOCIMaxRange(resp *http.Response) (int64, error) { + maxRangeStr := resp.Header.Get("OCI-Chunk-Max-Length") + maxRange := int64(0) + if maxRangeStr != "" { + maxRangeRes, err := strconv.ParseInt(maxRangeStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("OCI-Chunk-Max-Length is malformed %q: %w", maxRangeStr, err) + } + + maxRange = maxRangeRes + } + + return maxRange, nil +}