From 5ee5aaa058c53bf881327164c323eadca85d0766 Mon Sep 17 00:00:00 2001 From: Thomas Way Date: Wed, 30 Oct 2024 21:46:36 +0000 Subject: [PATCH] fix(registry/storage/driver/s3-aws): use a consistent multipart chunk size Some S3 compatible object storage systems like R2 require that all multipart chunks are the same size. This was mostly true before, except the final chunk was larger than the requested chunk size which causes uploads to fail. In addition, the two byte slices have been replaced with a single *bytes.Buffer and the surrounding code simplified significantly. Fixes: #3873 Signed-off-by: Thomas Way --- docs/content/storage-drivers/s3.md | 3 - registry/storage/driver/s3-aws/s3.go | 341 +++++++----------- registry/storage/driver/s3-aws/s3_32bit.go | 10 + registry/storage/driver/s3-aws/s3_64bit.go | 7 + registry/storage/driver/s3-aws/s3_test.go | 50 +-- .../storage/driver/testsuites/testsuites.go | 23 +- 6 files changed, 179 insertions(+), 255 deletions(-) create mode 100644 registry/storage/driver/s3-aws/s3_32bit.go create mode 100644 registry/storage/driver/s3-aws/s3_64bit.go diff --git a/docs/content/storage-drivers/s3.md b/docs/content/storage-drivers/s3.md index a3a89213d..098550683 100644 --- a/docs/content/storage-drivers/s3.md +++ b/docs/content/storage-drivers/s3.md @@ -80,9 +80,6 @@ Amazon S3 or S3 compatible services for object storage. `loglevel`: (optional) Valid values are: `off` (default), `debug`, `debugwithsigning`, `debugwithhttpbody`, `debugwithrequestretries`, `debugwithrequesterrors` and `debugwitheventstreambody`. See the [AWS SDK for Go API reference](https://docs.aws.amazon.com/sdk-for-go/api/aws/#LogLevelType) for details. -**NOTE:** Currently the S3 storage driver only supports S3 API compatible storage that -allows parts of a multipart upload to vary in size. [Cloudflare R2 is not supported.](https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations) - ## S3 permission scopes The following AWS policy is required by the registry for push and pull. Make sure to replace `S3_BUCKET_NAME` with the name of your bucket. diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go index 11822a0b2..1550ddb4b 100644 --- a/registry/storage/driver/s3-aws/s3.go +++ b/registry/storage/driver/s3-aws/s3.go @@ -21,7 +21,7 @@ import ( "math" "net/http" "path/filepath" - "reflect" + "slices" "sort" "strconv" "strings" @@ -48,10 +48,6 @@ const driverName = "s3aws" // S3 API requires multipart upload chunks to be at least 5MB const minChunkSize = 5 * 1024 * 1024 -// maxChunkSize defines the maximum multipart upload chunk size allowed by S3. -// S3 API requires max upload chunk to be 5GB. -const maxChunkSize = 5 * 1024 * 1024 * 1024 - const defaultChunkSize = 2 * minChunkSize const ( @@ -107,7 +103,7 @@ type DriverParameters struct { Secure bool SkipVerify bool V4Auth bool - ChunkSize int64 + ChunkSize int MultipartCopyChunkSize int64 MultipartCopyMaxConcurrency int64 MultipartCopyThresholdSize int64 @@ -158,7 +154,7 @@ var _ storagedriver.StorageDriver = &driver{} type driver struct { S3 *s3.S3 Bucket string - ChunkSize int64 + ChunkSize int Encrypt bool KeyID string MultipartCopyChunkSize int64 @@ -313,22 +309,22 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Dr keyID = "" } - chunkSize, err := getParameterAsInt64(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize) + chunkSize, err := getParameterAsInteger(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize) if err != nil { return nil, err } - multipartCopyChunkSize, err := getParameterAsInt64(parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize) + multipartCopyChunkSize, err := getParameterAsInteger[int64](parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize) if err != nil { return nil, err } - multipartCopyMaxConcurrency, err := getParameterAsInt64(parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64) + multipartCopyMaxConcurrency, err := getParameterAsInteger[int64](parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64) if err != nil { return nil, err } - multipartCopyThresholdSize, err := getParameterAsInt64(parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize) + multipartCopyThresholdSize, err := getParameterAsInteger[int64](parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize) if err != nil { return nil, err } @@ -424,29 +420,29 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Dr } params := DriverParameters{ - fmt.Sprint(accessKey), - fmt.Sprint(secretKey), - fmt.Sprint(bucket), - region, - fmt.Sprint(regionEndpoint), - forcePathStyleBool, - encryptBool, - fmt.Sprint(keyID), - secureBool, - skipVerifyBool, - v4Bool, - chunkSize, - multipartCopyChunkSize, - multipartCopyMaxConcurrency, - multipartCopyThresholdSize, - fmt.Sprint(rootDirectory), - storageClass, - fmt.Sprint(userAgent), - objectACL, - fmt.Sprint(sessionToken), - useDualStackBool, - accelerateBool, - getS3LogLevelFromParam(parameters["loglevel"]), + AccessKey: fmt.Sprint(accessKey), + SecretKey: fmt.Sprint(secretKey), + Bucket: fmt.Sprint(bucket), + Region: region, + RegionEndpoint: fmt.Sprint(regionEndpoint), + ForcePathStyle: forcePathStyleBool, + Encrypt: encryptBool, + KeyID: fmt.Sprint(keyID), + Secure: secureBool, + SkipVerify: skipVerifyBool, + V4Auth: v4Bool, + ChunkSize: chunkSize, + MultipartCopyChunkSize: multipartCopyChunkSize, + MultipartCopyMaxConcurrency: multipartCopyMaxConcurrency, + MultipartCopyThresholdSize: multipartCopyThresholdSize, + RootDirectory: fmt.Sprint(rootDirectory), + StorageClass: storageClass, + UserAgent: fmt.Sprint(userAgent), + ObjectACL: objectACL, + SessionToken: fmt.Sprint(sessionToken), + UseDualStack: useDualStackBool, + Accelerate: accelerateBool, + LogLevel: getS3LogLevelFromParam(parameters["loglevel"]), } return New(ctx, params) @@ -479,33 +475,29 @@ func getS3LogLevelFromParam(param interface{}) aws.LogLevelType { return logLevel } -// getParameterAsInt64 converts parameters[name] to an int64 value (using -// defaultt if nil), verifies it is no smaller than min, and returns it. -func getParameterAsInt64(parameters map[string]interface{}, name string, defaultt int64, min int64, max int64) (int64, error) { - rv := defaultt - param := parameters[name] - switch v := param.(type) { - case string: - vv, err := strconv.ParseInt(v, 0, 64) - if err != nil { - return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, param) +type integer interface{ signed | unsigned } + +type signed interface { + ~int | ~int8 | ~int16 | ~int32 | ~int64 +} + +type unsigned interface { + ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr +} + +// getParameterAsInteger converts parameters[name] to T (using defaultValue if +// nil) and ensures it is in the range of min and max. +func getParameterAsInteger[T integer](parameters map[string]any, name string, defaultValue, min, max T) (T, error) { + v := defaultValue + if p := parameters[name]; p != nil { + if _, err := fmt.Sscanf(fmt.Sprint(p), "%d", &v); err != nil { + return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, p) } - rv = vv - case int64: - rv = v - case int, uint, int32, uint32, uint64: - rv = reflect.ValueOf(v).Convert(reflect.TypeOf(rv)).Int() - case nil: - // do nothing - default: - return 0, fmt.Errorf("invalid value for %s: %#v", name, param) } - - if rv < min || rv > max { - return 0, fmt.Errorf("the %s %#v parameter should be a number between %d and %d (inclusive)", name, rv, min, max) + if v < min || v > max { + return 0, fmt.Errorf("the %s %#v parameter should be a number between %d and %d (inclusive)", name, v, min, max) } - - return rv, nil + return v, nil } // New constructs a new Driver with the given AWS credentials, region, encryption flag, and @@ -592,11 +584,7 @@ func New(ctx context.Context, params DriverParameters) (*Driver, error) { StorageClass: params.StorageClass, ObjectACL: params.ObjectACL, pool: &sync.Pool{ - New: func() interface{} { - return &buffer{ - data: make([]byte, 0, params.ChunkSize), - } - }, + New: func() any { return &bytes.Buffer{} }, }, } @@ -903,7 +891,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the original // object. -func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { +func (d *driver) Move(ctx context.Context, sourcePath, destPath string) error { /* This is terrible, but aws doesn't have an actual move. */ if err := d.copy(ctx, sourcePath, destPath); err != nil { return err @@ -912,7 +900,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e } // copy copies an object stored at sourcePath to destPath. -func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) error { +func (d *driver) copy(ctx context.Context, sourcePath, destPath string) error { // S3 can copy objects up to 5 GB in size with a single PUT Object - Copy // operation. For larger objects, the multipart upload API must be used. // @@ -1121,7 +1109,7 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn, return nil } -func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from string, startAfter string, f storagedriver.WalkFn) error { +func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from, startAfter string, f storagedriver.WalkFn) error { var ( retError error // the most recent directory walked for de-duping @@ -1267,16 +1255,10 @@ func directoryDiff(prev, current string) []string { } paths = append(paths, parent) } - reverse(paths) + slices.Reverse(paths) return paths } -func reverse(s []string) { - for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { - s[i], s[j] = s[j], s[i] - } -} - func (d *driver) s3Path(path string) string { return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") } @@ -1326,53 +1308,11 @@ func (d *driver) getStorageClass() *string { return aws.String(d.StorageClass) } -// buffer is a static size bytes buffer. -type buffer struct { - data []byte -} - -// NewBuffer returns a new bytes buffer from driver's memory pool. -// The size of the buffer is static and set to params.ChunkSize. -func (d *driver) NewBuffer() *buffer { - return d.pool.Get().(*buffer) -} - -// ReadFrom reads as much data as it can fit in from r without growing its size. -// It returns the number of bytes successfully read from r or error. -func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) { - for len(b.data) < cap(b.data) && err == nil { - var n int - n, err = r.Read(b.data[len(b.data):cap(b.data)]) - offset += int64(n) - b.data = b.data[:len(b.data)+n] - } - // NOTE(milosgajdos): io.ReaderFrom "swallows" io.EOF - // See: https://pkg.go.dev/io#ReaderFrom - if err == io.EOF { - err = nil - } - return offset, err -} - -// Cap returns the capacity of the buffer's underlying byte slice. -func (b *buffer) Cap() int { - return cap(b.data) -} - -// Len returns the length of the data in the buffer -func (b *buffer) Len() int { - return len(b.data) -} - -// Clear the buffer data. -func (b *buffer) Clear() { - b.data = b.data[:0] -} - -// writer attempts to upload parts to S3 in a buffered fashion where the last -// part is at least as large as the chunksize, so the multipart upload could be -// cleanly resumed in the future. This is violated if Close is called after less -// than a full chunk is written. +// writer uploads parts to S3 in a buffered fashion where the length of each +// part is [writer.driver.ChunkSize], excluding the last part which may be +// smaller than the configured chunk size and never larger. This allows the +// multipart upload to be cleanly resumed in future. This is violated if +// [writer.Close] is called before at least one chunk is written. type writer struct { ctx context.Context driver *driver @@ -1380,8 +1320,7 @@ type writer struct { uploadID string parts []*s3.Part size int64 - ready *buffer - pending *buffer + buf *bytes.Buffer closed bool committed bool cancelled bool @@ -1399,8 +1338,7 @@ func (d *driver) newWriter(ctx context.Context, key, uploadID string, parts []*s uploadID: uploadID, parts: parts, size: size, - ready: d.NewBuffer(), - pending: d.NewBuffer(), + buf: d.pool.Get().(*bytes.Buffer), } } @@ -1411,12 +1349,8 @@ func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber } func (w *writer) Write(p []byte) (int, error) { - if w.closed { - return 0, fmt.Errorf("already closed") - } else if w.committed { - return 0, fmt.Errorf("already committed") - } else if w.cancelled { - return 0, fmt.Errorf("already cancelled") + if err := w.done(); err != nil { + return 0, err } // If the last written part is smaller than minChunkSize, we need to make a @@ -1476,17 +1410,11 @@ func (w *writer) Write(p []byte) (int, error) { } defer resp.Body.Close() - // reset uploaded parts - w.parts = nil - w.ready.Clear() + w.reset() - n, err := w.ready.ReadFrom(resp.Body) - if err != nil { + if _, err := io.Copy(w.buf, resp.Body); err != nil { return 0, err } - if resp.ContentLength != nil && n < *resp.ContentLength { - return 0, io.ErrShortBuffer - } } else { // Otherwise we can use the old file as the new first part copyPartResp, err := w.driver.S3.UploadPartCopyWithContext(w.ctx, &s3.UploadPartCopyInput{ @@ -1499,52 +1427,21 @@ func (w *writer) Write(p []byte) (int, error) { if err != nil { return 0, err } - w.parts = []*s3.Part{ - { - ETag: copyPartResp.CopyPartResult.ETag, - PartNumber: aws.Int64(1), - Size: aws.Int64(w.size), - }, - } + w.parts = []*s3.Part{{ + ETag: copyPartResp.CopyPartResult.ETag, + PartNumber: aws.Int64(1), + Size: aws.Int64(w.size), + }} } } - var n int + n, _ := w.buf.Write(p) - defer func() { w.size += int64(n) }() - - reader := bytes.NewReader(p) - - for reader.Len() > 0 { - // NOTE(milosgajdos): we do some seemingly unsafe conversions - // from int64 to int in this for loop. These are fine as the - // offset returned from buffer.ReadFrom can only ever be - // maxChunkSize large which fits in to int. The reason why - // we return int64 is to play nice with Go interfaces where - // the buffer implements io.ReaderFrom interface. - - // fill up the ready parts buffer - offset, err := w.ready.ReadFrom(reader) - n += int(offset) - if err != nil { - return n, err - } - - // try filling up the pending parts buffer - offset, err = w.pending.ReadFrom(reader) - n += int(offset) - if err != nil { - return n, err - } - - // we filled up pending buffer, flush - if w.pending.Len() == w.pending.Cap() { - if err := w.flush(); err != nil { - return n, err - } + for w.buf.Len() >= w.driver.ChunkSize { + if err := w.flush(); err != nil { + return 0, fmt.Errorf("flush: %w", err) } } - return n, nil } @@ -1552,28 +1449,38 @@ func (w *writer) Size() int64 { return w.size } +// Close flushes any remaining data in the buffer and releases the buffer back +// to the pool. func (w *writer) Close() error { if w.closed { return fmt.Errorf("already closed") } + w.closed = true - defer func() { - w.ready.Clear() - w.driver.pool.Put(w.ready) - w.pending.Clear() - w.driver.pool.Put(w.pending) - }() + defer w.releaseBuffer() return w.flush() } +func (w *writer) reset() { + w.buf.Reset() + w.parts = nil + w.size = 0 +} + +// releaseBuffer resets the buffer and returns it to the pool. +func (w *writer) releaseBuffer() { + w.buf.Reset() + w.driver.pool.Put(w.buf) +} + +// Cancel aborts the multipart upload and closes the writer. func (w *writer) Cancel(ctx context.Context) error { - if w.closed { - return fmt.Errorf("already closed") - } else if w.committed { - return fmt.Errorf("already committed") + if err := w.done(); err != nil { + return err } + w.cancelled = true _, err := w.driver.S3.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{ Bucket: aws.String(w.driver.Bucket), @@ -1583,17 +1490,14 @@ func (w *writer) Cancel(ctx context.Context) error { return err } +// Commit flushes any remaining data in the buffer and completes the multipart +// upload. func (w *writer) Commit(ctx context.Context) error { - if w.closed { - return fmt.Errorf("already closed") - } else if w.committed { - return fmt.Errorf("already committed") - } else if w.cancelled { - return fmt.Errorf("already cancelled") + if err := w.done(); err != nil { + return err } - err := w.flush() - if err != nil { + if err := w.flush(); err != nil { return err } @@ -1634,15 +1538,14 @@ func (w *writer) Commit(ctx context.Context) error { sort.Sort(completedUploadedParts) - _, err = w.driver.S3.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{ + if _, err := w.driver.S3.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), UploadId: aws.String(w.uploadID), MultipartUpload: &s3.CompletedMultipartUpload{ Parts: completedUploadedParts, }, - }) - if err != nil { + }); err != nil { if _, aErr := w.driver.S3.AbortMultipartUploadWithContext(w.ctx, &s3.AbortMultipartUploadInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), @@ -1655,33 +1558,28 @@ func (w *writer) Commit(ctx context.Context) error { return nil } -// flush flushes all buffers to write a part to S3. -// flush is only called by Write (with both buffers full) and Close/Commit (always) +// flush writes at most [w.driver.ChunkSize] of the buffer to S3. flush is only +// called by [writer.Write] if the buffer is full, and always by [writer.Close] +// and [writer.Commit]. func (w *writer) flush() error { - if w.ready.Len() == 0 && w.pending.Len() == 0 { + if w.buf.Len() == 0 { return nil } - buf := bytes.NewBuffer(w.ready.data) - if w.pending.Len() > 0 && w.pending.Len() < int(w.driver.ChunkSize) { - if _, err := buf.Write(w.pending.data); err != nil { - return err - } - w.pending.Clear() - } + r := bytes.NewReader(w.buf.Next(w.driver.ChunkSize)) - partSize := buf.Len() - partNumber := aws.Int64(int64(len(w.parts) + 1)) + partSize := r.Len() + partNumber := aws.Int64(int64(len(w.parts)) + 1) resp, err := w.driver.S3.UploadPartWithContext(w.ctx, &s3.UploadPartInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), PartNumber: partNumber, UploadId: aws.String(w.uploadID), - Body: bytes.NewReader(buf.Bytes()), + Body: r, }) if err != nil { - return err + return fmt.Errorf("upload part: %w", err) } w.parts = append(w.parts, &s3.Part{ @@ -1690,9 +1588,20 @@ func (w *writer) flush() error { Size: aws.Int64(int64(partSize)), }) - // reset the flushed buffer and swap buffers - w.ready.Clear() - w.ready, w.pending = w.pending, w.ready + w.size += int64(partSize) return nil } + +// done returns an error if the writer is in an invalid state. +func (w *writer) done() error { + switch { + case w.closed: + return fmt.Errorf("already closed") + case w.committed: + return fmt.Errorf("already committed") + case w.cancelled: + return fmt.Errorf("already cancelled") + } + return nil +} diff --git a/registry/storage/driver/s3-aws/s3_32bit.go b/registry/storage/driver/s3-aws/s3_32bit.go new file mode 100644 index 000000000..218e3eab2 --- /dev/null +++ b/registry/storage/driver/s3-aws/s3_32bit.go @@ -0,0 +1,10 @@ +//go:build arm + +package s3 + +import "math" + +// maxChunkSize defines the maximum multipart upload chunk size allowed by S3. +// S3 API requires max upload chunk to be 5GB, but this overflows on 32-bit +// platforms. +const maxChunkSize = math.MaxInt32 diff --git a/registry/storage/driver/s3-aws/s3_64bit.go b/registry/storage/driver/s3-aws/s3_64bit.go new file mode 100644 index 000000000..55254e497 --- /dev/null +++ b/registry/storage/driver/s3-aws/s3_64bit.go @@ -0,0 +1,7 @@ +//go:build !arm + +package s3 + +// maxChunkSize defines the maximum multipart upload chunk size allowed by S3. +// S3 API requires max upload chunk to be 5GB. +const maxChunkSize = 5 * 1024 * 1024 * 1024 diff --git a/registry/storage/driver/s3-aws/s3_test.go b/registry/storage/driver/s3-aws/s3_test.go index 9e1a875f6..ab7094708 100644 --- a/registry/storage/driver/s3-aws/s3_test.go +++ b/registry/storage/driver/s3-aws/s3_test.go @@ -101,30 +101,34 @@ func init() { } } + if objectACL == "" { + objectACL = s3.ObjectCannedACLPrivate + } + parameters := DriverParameters{ - accessKey, - secretKey, - bucket, - region, - regionEndpoint, - forcePathStyleBool, - encryptBool, - keyID, - secureBool, - skipVerifyBool, - v4Bool, - minChunkSize, - defaultMultipartCopyChunkSize, - defaultMultipartCopyMaxConcurrency, - defaultMultipartCopyThresholdSize, - rootDirectory, - storageClass, - driverName + "-test", - objectACL, - sessionToken, - useDualStackBool, - accelerateBool, - getS3LogLevelFromParam(logLevel), + AccessKey: accessKey, + SecretKey: secretKey, + Bucket: bucket, + Region: region, + RegionEndpoint: regionEndpoint, + ForcePathStyle: forcePathStyleBool, + Encrypt: encryptBool, + KeyID: keyID, + Secure: secureBool, + SkipVerify: skipVerifyBool, + V4Auth: v4Bool, + ChunkSize: minChunkSize, + MultipartCopyChunkSize: defaultMultipartCopyChunkSize, + MultipartCopyMaxConcurrency: defaultMultipartCopyMaxConcurrency, + MultipartCopyThresholdSize: defaultMultipartCopyThresholdSize, + RootDirectory: rootDirectory, + StorageClass: storageClass, + UserAgent: driverName + "-test", + ObjectACL: objectACL, + SessionToken: sessionToken, + UseDualStack: useDualStackBool, + Accelerate: accelerateBool, + LogLevel: getS3LogLevelFromParam(logLevel), } return New(context.Background(), parameters) diff --git a/registry/storage/driver/testsuites/testsuites.go b/registry/storage/driver/testsuites/testsuites.go index 55cbfcd97..b1221a64b 100644 --- a/registry/storage/driver/testsuites/testsuites.go +++ b/registry/storage/driver/testsuites/testsuites.go @@ -398,31 +398,28 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) { filename := randomPath(32) defer suite.deletePath(firstPart(filename)) - contentsChunk1 := randomContents(chunkSize) - contentsChunk2 := randomContents(chunkSize) - contentsChunk3 := randomContents(chunkSize) - - fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...) + var fullContents bytes.Buffer + contents := io.TeeReader(newRandReader(chunkSize*3), &fullContents) writer, err := suite.StorageDriver.Writer(suite.ctx, filename, false) suite.Require().NoError(err) - nn, err := io.Copy(writer, bytes.NewReader(contentsChunk1)) + nn, err := io.CopyN(writer, contents, chunkSize) suite.Require().NoError(err) - suite.Require().Equal(int64(len(contentsChunk1)), nn) + suite.Require().Equal(chunkSize, nn) err = writer.Close() suite.Require().NoError(err) curSize := writer.Size() - suite.Require().Equal(int64(len(contentsChunk1)), curSize) + suite.Require().Equal(chunkSize, curSize) writer, err = suite.StorageDriver.Writer(suite.ctx, filename, true) suite.Require().NoError(err) suite.Require().Equal(curSize, writer.Size()) - nn, err = io.Copy(writer, bytes.NewReader(contentsChunk2)) + nn, err = io.CopyN(writer, contents, chunkSize) suite.Require().NoError(err) - suite.Require().Equal(int64(len(contentsChunk2)), nn) + suite.Require().Equal(chunkSize, nn) err = writer.Close() suite.Require().NoError(err) @@ -434,9 +431,9 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) { suite.Require().NoError(err) suite.Require().Equal(curSize, writer.Size()) - nn, err = io.Copy(writer, bytes.NewReader(fullContents[curSize:])) + nn, err = io.CopyN(writer, contents, chunkSize) suite.Require().NoError(err) - suite.Require().Equal(int64(len(fullContents[curSize:])), nn) + suite.Require().Equal(chunkSize, nn) err = writer.Commit(context.Background()) suite.Require().NoError(err) @@ -445,7 +442,7 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) { received, err := suite.StorageDriver.GetContent(suite.ctx, filename) suite.Require().NoError(err) - suite.Require().Equal(fullContents, received) + suite.Require().Equal(fullContents.Bytes(), received) } // TestReadNonexistentStream tests that reading a stream for a nonexistent path