diff --git a/docs/content/storage-drivers/s3.md b/docs/content/storage-drivers/s3.md index a3a89213d..098550683 100644 --- a/docs/content/storage-drivers/s3.md +++ b/docs/content/storage-drivers/s3.md @@ -80,9 +80,6 @@ Amazon S3 or S3 compatible services for object storage. `loglevel`: (optional) Valid values are: `off` (default), `debug`, `debugwithsigning`, `debugwithhttpbody`, `debugwithrequestretries`, `debugwithrequesterrors` and `debugwitheventstreambody`. See the [AWS SDK for Go API reference](https://docs.aws.amazon.com/sdk-for-go/api/aws/#LogLevelType) for details. -**NOTE:** Currently the S3 storage driver only supports S3 API compatible storage that -allows parts of a multipart upload to vary in size. [Cloudflare R2 is not supported.](https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations) - ## S3 permission scopes The following AWS policy is required by the registry for push and pull. Make sure to replace `S3_BUCKET_NAME` with the name of your bucket. diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go index 11822a0b2..1550ddb4b 100644 --- a/registry/storage/driver/s3-aws/s3.go +++ b/registry/storage/driver/s3-aws/s3.go @@ -21,7 +21,7 @@ import ( "math" "net/http" "path/filepath" - "reflect" + "slices" "sort" "strconv" "strings" @@ -48,10 +48,6 @@ const driverName = "s3aws" // S3 API requires multipart upload chunks to be at least 5MB const minChunkSize = 5 * 1024 * 1024 -// maxChunkSize defines the maximum multipart upload chunk size allowed by S3. -// S3 API requires max upload chunk to be 5GB. -const maxChunkSize = 5 * 1024 * 1024 * 1024 - const defaultChunkSize = 2 * minChunkSize const ( @@ -107,7 +103,7 @@ type DriverParameters struct { Secure bool SkipVerify bool V4Auth bool - ChunkSize int64 + ChunkSize int MultipartCopyChunkSize int64 MultipartCopyMaxConcurrency int64 MultipartCopyThresholdSize int64 @@ -158,7 +154,7 @@ var _ storagedriver.StorageDriver = &driver{} type driver struct { S3 *s3.S3 Bucket string - ChunkSize int64 + ChunkSize int Encrypt bool KeyID string MultipartCopyChunkSize int64 @@ -313,22 +309,22 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Dr keyID = "" } - chunkSize, err := getParameterAsInt64(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize) + chunkSize, err := getParameterAsInteger(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize) if err != nil { return nil, err } - multipartCopyChunkSize, err := getParameterAsInt64(parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize) + multipartCopyChunkSize, err := getParameterAsInteger[int64](parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize) if err != nil { return nil, err } - multipartCopyMaxConcurrency, err := getParameterAsInt64(parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64) + multipartCopyMaxConcurrency, err := getParameterAsInteger[int64](parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64) if err != nil { return nil, err } - multipartCopyThresholdSize, err := getParameterAsInt64(parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize) + multipartCopyThresholdSize, err := getParameterAsInteger[int64](parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize) if err != nil { return nil, err } @@ -424,29 +420,29 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Dr } params := DriverParameters{ - fmt.Sprint(accessKey), - fmt.Sprint(secretKey), - fmt.Sprint(bucket), - region, - fmt.Sprint(regionEndpoint), - forcePathStyleBool, - encryptBool, - fmt.Sprint(keyID), - secureBool, - skipVerifyBool, - v4Bool, - chunkSize, - multipartCopyChunkSize, - multipartCopyMaxConcurrency, - multipartCopyThresholdSize, - fmt.Sprint(rootDirectory), - storageClass, - fmt.Sprint(userAgent), - objectACL, - fmt.Sprint(sessionToken), - useDualStackBool, - accelerateBool, - getS3LogLevelFromParam(parameters["loglevel"]), + AccessKey: fmt.Sprint(accessKey), + SecretKey: fmt.Sprint(secretKey), + Bucket: fmt.Sprint(bucket), + Region: region, + RegionEndpoint: fmt.Sprint(regionEndpoint), + ForcePathStyle: forcePathStyleBool, + Encrypt: encryptBool, + KeyID: fmt.Sprint(keyID), + Secure: secureBool, + SkipVerify: skipVerifyBool, + V4Auth: v4Bool, + ChunkSize: chunkSize, + MultipartCopyChunkSize: multipartCopyChunkSize, + MultipartCopyMaxConcurrency: multipartCopyMaxConcurrency, + MultipartCopyThresholdSize: multipartCopyThresholdSize, + RootDirectory: fmt.Sprint(rootDirectory), + StorageClass: storageClass, + UserAgent: fmt.Sprint(userAgent), + ObjectACL: objectACL, + SessionToken: fmt.Sprint(sessionToken), + UseDualStack: useDualStackBool, + Accelerate: accelerateBool, + LogLevel: getS3LogLevelFromParam(parameters["loglevel"]), } return New(ctx, params) @@ -479,33 +475,29 @@ func getS3LogLevelFromParam(param interface{}) aws.LogLevelType { return logLevel } -// getParameterAsInt64 converts parameters[name] to an int64 value (using -// defaultt if nil), verifies it is no smaller than min, and returns it. -func getParameterAsInt64(parameters map[string]interface{}, name string, defaultt int64, min int64, max int64) (int64, error) { - rv := defaultt - param := parameters[name] - switch v := param.(type) { - case string: - vv, err := strconv.ParseInt(v, 0, 64) - if err != nil { - return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, param) +type integer interface{ signed | unsigned } + +type signed interface { + ~int | ~int8 | ~int16 | ~int32 | ~int64 +} + +type unsigned interface { + ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr +} + +// getParameterAsInteger converts parameters[name] to T (using defaultValue if +// nil) and ensures it is in the range of min and max. +func getParameterAsInteger[T integer](parameters map[string]any, name string, defaultValue, min, max T) (T, error) { + v := defaultValue + if p := parameters[name]; p != nil { + if _, err := fmt.Sscanf(fmt.Sprint(p), "%d", &v); err != nil { + return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, p) } - rv = vv - case int64: - rv = v - case int, uint, int32, uint32, uint64: - rv = reflect.ValueOf(v).Convert(reflect.TypeOf(rv)).Int() - case nil: - // do nothing - default: - return 0, fmt.Errorf("invalid value for %s: %#v", name, param) } - - if rv < min || rv > max { - return 0, fmt.Errorf("the %s %#v parameter should be a number between %d and %d (inclusive)", name, rv, min, max) + if v < min || v > max { + return 0, fmt.Errorf("the %s %#v parameter should be a number between %d and %d (inclusive)", name, v, min, max) } - - return rv, nil + return v, nil } // New constructs a new Driver with the given AWS credentials, region, encryption flag, and @@ -592,11 +584,7 @@ func New(ctx context.Context, params DriverParameters) (*Driver, error) { StorageClass: params.StorageClass, ObjectACL: params.ObjectACL, pool: &sync.Pool{ - New: func() interface{} { - return &buffer{ - data: make([]byte, 0, params.ChunkSize), - } - }, + New: func() any { return &bytes.Buffer{} }, }, } @@ -903,7 +891,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the original // object. -func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { +func (d *driver) Move(ctx context.Context, sourcePath, destPath string) error { /* This is terrible, but aws doesn't have an actual move. */ if err := d.copy(ctx, sourcePath, destPath); err != nil { return err @@ -912,7 +900,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e } // copy copies an object stored at sourcePath to destPath. -func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) error { +func (d *driver) copy(ctx context.Context, sourcePath, destPath string) error { // S3 can copy objects up to 5 GB in size with a single PUT Object - Copy // operation. For larger objects, the multipart upload API must be used. // @@ -1121,7 +1109,7 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn, return nil } -func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from string, startAfter string, f storagedriver.WalkFn) error { +func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from, startAfter string, f storagedriver.WalkFn) error { var ( retError error // the most recent directory walked for de-duping @@ -1267,16 +1255,10 @@ func directoryDiff(prev, current string) []string { } paths = append(paths, parent) } - reverse(paths) + slices.Reverse(paths) return paths } -func reverse(s []string) { - for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { - s[i], s[j] = s[j], s[i] - } -} - func (d *driver) s3Path(path string) string { return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") } @@ -1326,53 +1308,11 @@ func (d *driver) getStorageClass() *string { return aws.String(d.StorageClass) } -// buffer is a static size bytes buffer. -type buffer struct { - data []byte -} - -// NewBuffer returns a new bytes buffer from driver's memory pool. -// The size of the buffer is static and set to params.ChunkSize. -func (d *driver) NewBuffer() *buffer { - return d.pool.Get().(*buffer) -} - -// ReadFrom reads as much data as it can fit in from r without growing its size. -// It returns the number of bytes successfully read from r or error. -func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) { - for len(b.data) < cap(b.data) && err == nil { - var n int - n, err = r.Read(b.data[len(b.data):cap(b.data)]) - offset += int64(n) - b.data = b.data[:len(b.data)+n] - } - // NOTE(milosgajdos): io.ReaderFrom "swallows" io.EOF - // See: https://pkg.go.dev/io#ReaderFrom - if err == io.EOF { - err = nil - } - return offset, err -} - -// Cap returns the capacity of the buffer's underlying byte slice. -func (b *buffer) Cap() int { - return cap(b.data) -} - -// Len returns the length of the data in the buffer -func (b *buffer) Len() int { - return len(b.data) -} - -// Clear the buffer data. -func (b *buffer) Clear() { - b.data = b.data[:0] -} - -// writer attempts to upload parts to S3 in a buffered fashion where the last -// part is at least as large as the chunksize, so the multipart upload could be -// cleanly resumed in the future. This is violated if Close is called after less -// than a full chunk is written. +// writer uploads parts to S3 in a buffered fashion where the length of each +// part is [writer.driver.ChunkSize], excluding the last part which may be +// smaller than the configured chunk size and never larger. This allows the +// multipart upload to be cleanly resumed in future. This is violated if +// [writer.Close] is called before at least one chunk is written. type writer struct { ctx context.Context driver *driver @@ -1380,8 +1320,7 @@ type writer struct { uploadID string parts []*s3.Part size int64 - ready *buffer - pending *buffer + buf *bytes.Buffer closed bool committed bool cancelled bool @@ -1399,8 +1338,7 @@ func (d *driver) newWriter(ctx context.Context, key, uploadID string, parts []*s uploadID: uploadID, parts: parts, size: size, - ready: d.NewBuffer(), - pending: d.NewBuffer(), + buf: d.pool.Get().(*bytes.Buffer), } } @@ -1411,12 +1349,8 @@ func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber } func (w *writer) Write(p []byte) (int, error) { - if w.closed { - return 0, fmt.Errorf("already closed") - } else if w.committed { - return 0, fmt.Errorf("already committed") - } else if w.cancelled { - return 0, fmt.Errorf("already cancelled") + if err := w.done(); err != nil { + return 0, err } // If the last written part is smaller than minChunkSize, we need to make a @@ -1476,17 +1410,11 @@ func (w *writer) Write(p []byte) (int, error) { } defer resp.Body.Close() - // reset uploaded parts - w.parts = nil - w.ready.Clear() + w.reset() - n, err := w.ready.ReadFrom(resp.Body) - if err != nil { + if _, err := io.Copy(w.buf, resp.Body); err != nil { return 0, err } - if resp.ContentLength != nil && n < *resp.ContentLength { - return 0, io.ErrShortBuffer - } } else { // Otherwise we can use the old file as the new first part copyPartResp, err := w.driver.S3.UploadPartCopyWithContext(w.ctx, &s3.UploadPartCopyInput{ @@ -1499,52 +1427,21 @@ func (w *writer) Write(p []byte) (int, error) { if err != nil { return 0, err } - w.parts = []*s3.Part{ - { - ETag: copyPartResp.CopyPartResult.ETag, - PartNumber: aws.Int64(1), - Size: aws.Int64(w.size), - }, - } + w.parts = []*s3.Part{{ + ETag: copyPartResp.CopyPartResult.ETag, + PartNumber: aws.Int64(1), + Size: aws.Int64(w.size), + }} } } - var n int + n, _ := w.buf.Write(p) - defer func() { w.size += int64(n) }() - - reader := bytes.NewReader(p) - - for reader.Len() > 0 { - // NOTE(milosgajdos): we do some seemingly unsafe conversions - // from int64 to int in this for loop. These are fine as the - // offset returned from buffer.ReadFrom can only ever be - // maxChunkSize large which fits in to int. The reason why - // we return int64 is to play nice with Go interfaces where - // the buffer implements io.ReaderFrom interface. - - // fill up the ready parts buffer - offset, err := w.ready.ReadFrom(reader) - n += int(offset) - if err != nil { - return n, err - } - - // try filling up the pending parts buffer - offset, err = w.pending.ReadFrom(reader) - n += int(offset) - if err != nil { - return n, err - } - - // we filled up pending buffer, flush - if w.pending.Len() == w.pending.Cap() { - if err := w.flush(); err != nil { - return n, err - } + for w.buf.Len() >= w.driver.ChunkSize { + if err := w.flush(); err != nil { + return 0, fmt.Errorf("flush: %w", err) } } - return n, nil } @@ -1552,28 +1449,38 @@ func (w *writer) Size() int64 { return w.size } +// Close flushes any remaining data in the buffer and releases the buffer back +// to the pool. func (w *writer) Close() error { if w.closed { return fmt.Errorf("already closed") } + w.closed = true - defer func() { - w.ready.Clear() - w.driver.pool.Put(w.ready) - w.pending.Clear() - w.driver.pool.Put(w.pending) - }() + defer w.releaseBuffer() return w.flush() } +func (w *writer) reset() { + w.buf.Reset() + w.parts = nil + w.size = 0 +} + +// releaseBuffer resets the buffer and returns it to the pool. +func (w *writer) releaseBuffer() { + w.buf.Reset() + w.driver.pool.Put(w.buf) +} + +// Cancel aborts the multipart upload and closes the writer. func (w *writer) Cancel(ctx context.Context) error { - if w.closed { - return fmt.Errorf("already closed") - } else if w.committed { - return fmt.Errorf("already committed") + if err := w.done(); err != nil { + return err } + w.cancelled = true _, err := w.driver.S3.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{ Bucket: aws.String(w.driver.Bucket), @@ -1583,17 +1490,14 @@ func (w *writer) Cancel(ctx context.Context) error { return err } +// Commit flushes any remaining data in the buffer and completes the multipart +// upload. func (w *writer) Commit(ctx context.Context) error { - if w.closed { - return fmt.Errorf("already closed") - } else if w.committed { - return fmt.Errorf("already committed") - } else if w.cancelled { - return fmt.Errorf("already cancelled") + if err := w.done(); err != nil { + return err } - err := w.flush() - if err != nil { + if err := w.flush(); err != nil { return err } @@ -1634,15 +1538,14 @@ func (w *writer) Commit(ctx context.Context) error { sort.Sort(completedUploadedParts) - _, err = w.driver.S3.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{ + if _, err := w.driver.S3.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), UploadId: aws.String(w.uploadID), MultipartUpload: &s3.CompletedMultipartUpload{ Parts: completedUploadedParts, }, - }) - if err != nil { + }); err != nil { if _, aErr := w.driver.S3.AbortMultipartUploadWithContext(w.ctx, &s3.AbortMultipartUploadInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), @@ -1655,33 +1558,28 @@ func (w *writer) Commit(ctx context.Context) error { return nil } -// flush flushes all buffers to write a part to S3. -// flush is only called by Write (with both buffers full) and Close/Commit (always) +// flush writes at most [w.driver.ChunkSize] of the buffer to S3. flush is only +// called by [writer.Write] if the buffer is full, and always by [writer.Close] +// and [writer.Commit]. func (w *writer) flush() error { - if w.ready.Len() == 0 && w.pending.Len() == 0 { + if w.buf.Len() == 0 { return nil } - buf := bytes.NewBuffer(w.ready.data) - if w.pending.Len() > 0 && w.pending.Len() < int(w.driver.ChunkSize) { - if _, err := buf.Write(w.pending.data); err != nil { - return err - } - w.pending.Clear() - } + r := bytes.NewReader(w.buf.Next(w.driver.ChunkSize)) - partSize := buf.Len() - partNumber := aws.Int64(int64(len(w.parts) + 1)) + partSize := r.Len() + partNumber := aws.Int64(int64(len(w.parts)) + 1) resp, err := w.driver.S3.UploadPartWithContext(w.ctx, &s3.UploadPartInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), PartNumber: partNumber, UploadId: aws.String(w.uploadID), - Body: bytes.NewReader(buf.Bytes()), + Body: r, }) if err != nil { - return err + return fmt.Errorf("upload part: %w", err) } w.parts = append(w.parts, &s3.Part{ @@ -1690,9 +1588,20 @@ func (w *writer) flush() error { Size: aws.Int64(int64(partSize)), }) - // reset the flushed buffer and swap buffers - w.ready.Clear() - w.ready, w.pending = w.pending, w.ready + w.size += int64(partSize) return nil } + +// done returns an error if the writer is in an invalid state. +func (w *writer) done() error { + switch { + case w.closed: + return fmt.Errorf("already closed") + case w.committed: + return fmt.Errorf("already committed") + case w.cancelled: + return fmt.Errorf("already cancelled") + } + return nil +} diff --git a/registry/storage/driver/s3-aws/s3_32bit.go b/registry/storage/driver/s3-aws/s3_32bit.go new file mode 100644 index 000000000..218e3eab2 --- /dev/null +++ b/registry/storage/driver/s3-aws/s3_32bit.go @@ -0,0 +1,10 @@ +//go:build arm + +package s3 + +import "math" + +// maxChunkSize defines the maximum multipart upload chunk size allowed by S3. +// S3 API requires max upload chunk to be 5GB, but this overflows on 32-bit +// platforms. +const maxChunkSize = math.MaxInt32 diff --git a/registry/storage/driver/s3-aws/s3_64bit.go b/registry/storage/driver/s3-aws/s3_64bit.go new file mode 100644 index 000000000..55254e497 --- /dev/null +++ b/registry/storage/driver/s3-aws/s3_64bit.go @@ -0,0 +1,7 @@ +//go:build !arm + +package s3 + +// maxChunkSize defines the maximum multipart upload chunk size allowed by S3. +// S3 API requires max upload chunk to be 5GB. +const maxChunkSize = 5 * 1024 * 1024 * 1024 diff --git a/registry/storage/driver/s3-aws/s3_test.go b/registry/storage/driver/s3-aws/s3_test.go index 9e1a875f6..ab7094708 100644 --- a/registry/storage/driver/s3-aws/s3_test.go +++ b/registry/storage/driver/s3-aws/s3_test.go @@ -101,30 +101,34 @@ func init() { } } + if objectACL == "" { + objectACL = s3.ObjectCannedACLPrivate + } + parameters := DriverParameters{ - accessKey, - secretKey, - bucket, - region, - regionEndpoint, - forcePathStyleBool, - encryptBool, - keyID, - secureBool, - skipVerifyBool, - v4Bool, - minChunkSize, - defaultMultipartCopyChunkSize, - defaultMultipartCopyMaxConcurrency, - defaultMultipartCopyThresholdSize, - rootDirectory, - storageClass, - driverName + "-test", - objectACL, - sessionToken, - useDualStackBool, - accelerateBool, - getS3LogLevelFromParam(logLevel), + AccessKey: accessKey, + SecretKey: secretKey, + Bucket: bucket, + Region: region, + RegionEndpoint: regionEndpoint, + ForcePathStyle: forcePathStyleBool, + Encrypt: encryptBool, + KeyID: keyID, + Secure: secureBool, + SkipVerify: skipVerifyBool, + V4Auth: v4Bool, + ChunkSize: minChunkSize, + MultipartCopyChunkSize: defaultMultipartCopyChunkSize, + MultipartCopyMaxConcurrency: defaultMultipartCopyMaxConcurrency, + MultipartCopyThresholdSize: defaultMultipartCopyThresholdSize, + RootDirectory: rootDirectory, + StorageClass: storageClass, + UserAgent: driverName + "-test", + ObjectACL: objectACL, + SessionToken: sessionToken, + UseDualStack: useDualStackBool, + Accelerate: accelerateBool, + LogLevel: getS3LogLevelFromParam(logLevel), } return New(context.Background(), parameters) diff --git a/registry/storage/driver/testsuites/testsuites.go b/registry/storage/driver/testsuites/testsuites.go index 55cbfcd97..b1221a64b 100644 --- a/registry/storage/driver/testsuites/testsuites.go +++ b/registry/storage/driver/testsuites/testsuites.go @@ -398,31 +398,28 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) { filename := randomPath(32) defer suite.deletePath(firstPart(filename)) - contentsChunk1 := randomContents(chunkSize) - contentsChunk2 := randomContents(chunkSize) - contentsChunk3 := randomContents(chunkSize) - - fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...) + var fullContents bytes.Buffer + contents := io.TeeReader(newRandReader(chunkSize*3), &fullContents) writer, err := suite.StorageDriver.Writer(suite.ctx, filename, false) suite.Require().NoError(err) - nn, err := io.Copy(writer, bytes.NewReader(contentsChunk1)) + nn, err := io.CopyN(writer, contents, chunkSize) suite.Require().NoError(err) - suite.Require().Equal(int64(len(contentsChunk1)), nn) + suite.Require().Equal(chunkSize, nn) err = writer.Close() suite.Require().NoError(err) curSize := writer.Size() - suite.Require().Equal(int64(len(contentsChunk1)), curSize) + suite.Require().Equal(chunkSize, curSize) writer, err = suite.StorageDriver.Writer(suite.ctx, filename, true) suite.Require().NoError(err) suite.Require().Equal(curSize, writer.Size()) - nn, err = io.Copy(writer, bytes.NewReader(contentsChunk2)) + nn, err = io.CopyN(writer, contents, chunkSize) suite.Require().NoError(err) - suite.Require().Equal(int64(len(contentsChunk2)), nn) + suite.Require().Equal(chunkSize, nn) err = writer.Close() suite.Require().NoError(err) @@ -434,9 +431,9 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) { suite.Require().NoError(err) suite.Require().Equal(curSize, writer.Size()) - nn, err = io.Copy(writer, bytes.NewReader(fullContents[curSize:])) + nn, err = io.CopyN(writer, contents, chunkSize) suite.Require().NoError(err) - suite.Require().Equal(int64(len(fullContents[curSize:])), nn) + suite.Require().Equal(chunkSize, nn) err = writer.Commit(context.Background()) suite.Require().NoError(err) @@ -445,7 +442,7 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) { received, err := suite.StorageDriver.GetContent(suite.ctx, filename) suite.Require().NoError(err) - suite.Require().Equal(fullContents, received) + suite.Require().Equal(fullContents.Bytes(), received) } // TestReadNonexistentStream tests that reading a stream for a nonexistent path