This commit is contained in:
Gabi 2025-06-03 15:38:17 +08:00 committed by GitHub
commit fe02637e96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 337 additions and 0 deletions

View File

@ -24,6 +24,10 @@ type httpBlobUpload struct {
location string // always the last value of the location header.
offset int64
closed bool
// maxRange is a way to control the maximum size of the chunk that httpBlobUpload will send to the registry.
// Every ReadFrom and Write call won't read more bytes than the quantity specified in this field
maxRange int64
}
func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) {
@ -38,6 +42,10 @@ func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
}
func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
if hbu.maxRange != 0 {
r = io.LimitReader(r, hbu.maxRange)
}
req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPatch, hbu.location, io.NopCloser(r))
if err != nil {
return 0, err
@ -74,6 +82,10 @@ func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
}
func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
if hbu.maxRange != 0 && hbu.maxRange < int64(len(p)) {
p = p[:hbu.maxRange]
}
req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPatch, hbu.location, bytes.NewReader(p))
if err != nil {
return 0, err

View File

@ -3,7 +3,9 @@ package client
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"testing"
@ -500,3 +502,145 @@ func TestUploadWrite(t *testing.T) {
t.Fatalf("Unexpected response status: %s, expected %s", uploadErr.Status, expected)
}
}
// TestUploadLimitRange verifies that when maxRange is set on httpBlobUpload,
// both ReadFrom and Write send at most maxRange bytes per request.
func TestUploadLimitRange(t *testing.T) {
	const numberOfBlobs = 10
	const blobSize = 5
	const lastBlobOffset = 2
	// Size the blob as numberOfBlobs full chunks plus a trailing partial
	// chunk of lastBlobOffset bytes (previously written as 10*5+2 with
	// magic numbers duplicating the constants above).
	_, b := newRandomBlob(numberOfBlobs*blobSize + lastBlobOffset)
	repo := "test/upload/write"
	locationPath := fmt.Sprintf("/v2/%s/uploads/testid", repo)
	requests := []testutil.RequestResponseMapping{
		{
			Request: testutil.Request{
				Method: http.MethodGet,
				Route:  "/v2/",
			},
			Response: testutil.Response{
				StatusCode: http.StatusOK,
				Headers: http.Header(map[string][]string{
					"Docker-Distribution-API-Version": {"registry/2.0"},
				}),
			},
		},
	}
	// One PATCH mapping per full chunk; each expects exactly blobSize bytes.
	for blob := 0; blob < numberOfBlobs; blob++ {
		start := blob * blobSize
		end := ((blob + 1) * blobSize) - 1
		requests = append(requests, testutil.RequestResponseMapping{
			Request: testutil.Request{
				Method: http.MethodPatch,
				Route:  locationPath,
				Body:   b[start : end+1],
			},
			Response: testutil.Response{
				StatusCode: http.StatusAccepted,
				Headers: http.Header(map[string][]string{
					"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
					"Location":           {locationPath},
					"Range":              {fmt.Sprintf("%d-%d", start, end)},
				}),
			},
		})
	}
	// Final PATCH mapping for the trailing partial chunk.
	requests = append(requests, testutil.RequestResponseMapping{
		Request: testutil.Request{
			Method: http.MethodPatch,
			Route:  locationPath,
			Body:   b[numberOfBlobs*blobSize:],
		},
		Response: testutil.Response{
			StatusCode: http.StatusAccepted,
			Headers: http.Header(map[string][]string{
				"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
				"Location":           {locationPath},
				"Range":              {fmt.Sprintf("%d-%d", numberOfBlobs*blobSize, len(b)-1)},
			}),
		},
	})
	t.Run("reader chunked upload", func(t *testing.T) {
		m := testutil.RequestResponseMap(requests)
		e, c := testServer(m)
		defer c()
		blobUpload := &httpBlobUpload{
			ctx:      context.Background(),
			client:   &http.Client{},
			maxRange: int64(blobSize),
		}
		reader := bytes.NewBuffer(b)
		for i := 0; i < numberOfBlobs; i++ {
			blobUpload.location = e + locationPath
			n, err := blobUpload.ReadFrom(reader)
			if err != nil {
				t.Fatalf("Error calling ReadFrom: %s", err)
			}
			if n != blobSize {
				t.Fatalf("Unexpected n %v != %v blobSize", n, blobSize)
			}
		}
		n, err := blobUpload.ReadFrom(reader)
		if err != nil {
			t.Fatalf("Error calling ReadFrom: %s", err)
		}
		if n != lastBlobOffset {
			t.Fatalf("Expected last write to have written %v but wrote %v", lastBlobOffset, n)
		}
		// The reader must be fully drained; any leftover bytes mean the
		// limiter sent less than it should have.
		_, err = reader.Read([]byte{0, 0, 0})
		if !errors.Is(err, io.EOF) {
			t.Fatalf("Expected io.EOF when reading blob as the test should've read the whole thing")
		}
	})
	t.Run("buffer chunked upload", func(t *testing.T) {
		buff := b
		m := testutil.RequestResponseMap(requests)
		e, c := testServer(m)
		defer c()
		blobUpload := &httpBlobUpload{
			ctx:      context.Background(),
			client:   &http.Client{},
			maxRange: int64(blobSize),
		}
		for i := 0; i < numberOfBlobs; i++ {
			blobUpload.location = e + locationPath
			n, err := blobUpload.Write(buff)
			if err != nil {
				t.Fatalf("Error calling Write: %s", err)
			}
			if n != blobSize {
				t.Fatalf("Unexpected n %v != %v blobSize", n, blobSize)
			}
			buff = buff[n:]
		}
		n, err := blobUpload.Write(buff)
		if err != nil {
			t.Fatalf("Error calling Write: %s", err)
		}
		if n != lastBlobOffset {
			t.Fatalf("Expected last write to have written %v but wrote %v", lastBlobOffset, n)
		}
		buff = buff[n:]
		if len(buff) != 0 {
			t.Fatalf("Expected length 0 on the buffer body as we should've read the whole thing, but got %v", len(buff))
		}
	})
}

View File

@ -810,6 +810,11 @@ func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateO
return nil, err
}
maxRange, err := v2.GetOCIMaxRange(resp)
if err != nil {
return nil, err
}
return &httpBlobUpload{
ctx: ctx,
statter: bs.statter,
@ -817,6 +822,7 @@ func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateO
uuid: uuid,
startedAt: time.Now(),
location: location,
maxRange: maxRange,
}, nil
default:
return nil, HandleHTTPResponseError(resp)

View File

@ -551,6 +551,163 @@ func TestBlobUploadChunked(t *testing.T) {
}
}
// TestBlobUploadChunkedOCIChunkMaxLength verifies that a chunked blob upload
// honors the OCI-Chunk-Max-Length header advertised by the registry on upload
// initiation: every Write sends at most blobSize bytes, followed by a
// trailing partial chunk and a final commit.
func TestBlobUploadChunkedOCIChunkMaxLength(t *testing.T) {
	const numberOfBlobs = 10
	const blobSize = 5
	const lastBlobOffset = 2
	const totalSize = numberOfBlobs*blobSize + lastBlobOffset
	dgst, b1 := newRandomBlob(totalSize)
	originalBlob := b1
	var m testutil.RequestResponseMap
	repo, _ := reference.WithName("test.example.com/uploadrepo")
	uuids := []string{uuid.NewString()}
	// The initiating POST advertises OCI-Chunk-Max-Length so the client
	// must cap every subsequent chunk at blobSize bytes.
	m = append(m, testutil.RequestResponseMapping{
		Request: testutil.Request{
			Method: http.MethodPost,
			Route:  "/v2/" + repo.Name() + "/blobs/uploads/",
		},
		Response: testutil.Response{
			StatusCode: http.StatusAccepted,
			Headers: http.Header(map[string][]string{
				"Content-Length":       {"0"},
				"Location":             {"/v2/" + repo.Name() + "/blobs/uploads/" + uuids[0]},
				"Docker-Upload-UUID":   {uuids[0]},
				"Range":                {"0-0"},
				"OCI-Chunk-Max-Length": {fmt.Sprintf("%d", blobSize)},
			}),
		},
	})
	// One PATCH per full chunk; each response points at the next upload UUID.
	for blob := 0; blob < numberOfBlobs; blob++ {
		start := blob * blobSize
		end := ((blob + 1) * blobSize) - 1
		uuids = append(uuids, uuid.NewString())
		m = append(m, testutil.RequestResponseMapping{
			Request: testutil.Request{
				Method: http.MethodPatch,
				Route:  "/v2/" + repo.Name() + "/blobs/uploads/" + uuids[blob],
				Body:   b1[start : end+1],
			},
			Response: testutil.Response{
				StatusCode: http.StatusAccepted,
				Headers: http.Header(map[string][]string{
					"Content-Length":     {"0"},
					"Location":           {"/v2/" + repo.Name() + "/blobs/uploads/" + uuids[blob+1]},
					"Docker-Upload-UUID": {uuids[blob+1]},
					"Range":              {fmt.Sprintf("%d-%d", start, end)},
				}),
			},
		})
	}
	// Trailing partial chunk of lastBlobOffset bytes.
	uuids = append(uuids, uuid.NewString())
	m = append(m, testutil.RequestResponseMapping{
		Request: testutil.Request{
			Method: http.MethodPatch,
			Route:  "/v2/" + repo.Name() + "/blobs/uploads/" + uuids[len(uuids)-2],
			Body:   b1[numberOfBlobs*blobSize:],
		},
		Response: testutil.Response{
			StatusCode: http.StatusAccepted,
			Headers: http.Header(map[string][]string{
				"Content-Length":     {"0"},
				"Location":           {"/v2/" + repo.Name() + "/blobs/uploads/" + uuids[len(uuids)-1]},
				"Docker-Upload-UUID": {uuids[len(uuids)-1]},
				"Range":              {fmt.Sprintf("%d-%d", numberOfBlobs*blobSize, len(b1)-1)},
			}),
		},
	})
	// Final PUT commits the upload with the blob digest.
	m = append(m, testutil.RequestResponseMapping{
		Request: testutil.Request{
			Method: http.MethodPut,
			Route:  "/v2/" + repo.Name() + "/blobs/uploads/" + uuids[len(uuids)-1],
			QueryParams: map[string][]string{
				"digest": {dgst.String()},
			},
		},
		Response: testutil.Response{
			StatusCode: http.StatusCreated,
			Headers: http.Header(map[string][]string{
				"Content-Length":        {"0"},
				"Docker-Content-Digest": {dgst.String()},
				"Content-Range":         {fmt.Sprintf("0-%d", totalSize-1)},
			}),
		},
	})
	// HEAD mapping so the client can stat the finished blob after commit.
	m = append(m, testutil.RequestResponseMapping{
		Request: testutil.Request{
			Method: http.MethodHead,
			Route:  "/v2/" + repo.Name() + "/blobs/" + dgst.String(),
		},
		Response: testutil.Response{
			StatusCode: http.StatusOK,
			Headers: http.Header(map[string][]string{
				"Content-Length": {fmt.Sprint(totalSize)},
				"Last-Modified":  {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
			}),
		},
	})
	e, c := testServer(m)
	defer c()
	ctx := context.Background()
	r, err := NewRepository(repo, e, nil)
	if err != nil {
		t.Fatal(err)
	}
	l := r.Blobs(ctx)
	upload, err := l.Create(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if upload.ID() != uuids[0] {
		// Must be t.Fatalf, not log.Fatalf: log.Fatalf calls os.Exit,
		// skipping deferred cleanup and proper test failure reporting.
		t.Fatalf("Unexpected UUID %s; expected %s", upload.ID(), uuids[0])
	}
	for i := 0; i < numberOfBlobs; i++ {
		n, err := upload.Write(b1)
		if err != nil {
			t.Fatalf("unexpected error in blob %v: %+v", i, err)
		}
		if n != blobSize {
			t.Fatalf("Unexpected length returned from write: %d; expected: %d", n, blobSize)
		}
		b1 = b1[n:]
	}
	n, err := upload.Write(b1)
	if err != nil {
		t.Fatal(err)
	}
	if n != lastBlobOffset {
		t.Fatalf("Unexpected length returned from write: %d; expected: %d", n, lastBlobOffset)
	}
	b1 = b1[n:]
	// b1 has been fully consumed by the writes above, so describe the blob
	// with its total size rather than len(b1), which is 0 at this point.
	blob, err := upload.Commit(ctx, distribution.Descriptor{
		Digest: dgst,
		Size:   int64(totalSize),
	})
	if err != nil {
		t.Fatal(err)
	}
	if blob.Size != int64(len(originalBlob)) {
		t.Fatalf("Unexpected blob size: %d; expected: %d", blob.Size, len(originalBlob))
	}
	if len(b1) != 0 {
		t.Fatalf("Expected to have consumed to the entire buffer at the end, got size %v", len(b1))
	}
}
func TestBlobUploadMonolithic(t *testing.T) {
dgst, b1 := newRandomBlob(1024)
var m testutil.RequestResponseMap

View File

@ -2,7 +2,9 @@ package v2
import (
"fmt"
"net/http"
"regexp"
"strconv"
"strings"
"unicode"
)
@ -159,3 +161,19 @@ Loop:
return res, parse, nil
}
// GetOCIMaxRang gets the OCI-Chunk-Max-Length from the headers. If not set it will return 0
func GetOCIMaxRange(resp *http.Response) (int64, error) {
maxRangeStr := resp.Header.Get("OCI-Chunk-Max-Length")
maxRange := int64(0)
if maxRangeStr != "" {
maxRangeRes, err := strconv.ParseInt(maxRangeStr, 10, 64)
if err != nil {
return 0, fmt.Errorf("OCI-Chunk-Max-Length is malformed %q: %w", maxRangeStr, err)
}
maxRange = maxRangeRes
}
return maxRange, nil
}