fix(registry/storage/driver/s3-aws): use a consistent multipart chunk size

Some S3 compatible object storage systems like R2 require that all
multipart chunks are the same size. This was mostly true before, except
the final chunk was larger than the requested chunk size which causes
uploads to fail.

In addition, the two byte slices have been replaced with a single
*bytes.Buffer and the surrounding code simplified significantly.

Fixes: #3873

Signed-off-by: Thomas Way <thomas@6f.io>
This commit is contained in:
Thomas Way
2024-10-30 21:46:36 +00:00
parent c427f84503
commit 5ee5aaa058
6 changed files with 179 additions and 255 deletions

View File

@@ -80,9 +80,6 @@ Amazon S3 or S3 compatible services for object storage.
`loglevel`: (optional) Valid values are: `off` (default), `debug`, `debugwithsigning`, `debugwithhttpbody`, `debugwithrequestretries`, `debugwithrequesterrors` and `debugwitheventstreambody`. See the [AWS SDK for Go API reference](https://docs.aws.amazon.com/sdk-for-go/api/aws/#LogLevelType) for details. `loglevel`: (optional) Valid values are: `off` (default), `debug`, `debugwithsigning`, `debugwithhttpbody`, `debugwithrequestretries`, `debugwithrequesterrors` and `debugwitheventstreambody`. See the [AWS SDK for Go API reference](https://docs.aws.amazon.com/sdk-for-go/api/aws/#LogLevelType) for details.
**NOTE:** Currently the S3 storage driver only supports S3 API compatible storage that
allows parts of a multipart upload to vary in size. [Cloudflare R2 is not supported.](https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations)
## S3 permission scopes ## S3 permission scopes
The following AWS policy is required by the registry for push and pull. Make sure to replace `S3_BUCKET_NAME` with the name of your bucket. The following AWS policy is required by the registry for push and pull. Make sure to replace `S3_BUCKET_NAME` with the name of your bucket.

View File

@@ -21,7 +21,7 @@ import (
"math" "math"
"net/http" "net/http"
"path/filepath" "path/filepath"
"reflect" "slices"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@@ -48,10 +48,6 @@ const driverName = "s3aws"
// S3 API requires multipart upload chunks to be at least 5MB // S3 API requires multipart upload chunks to be at least 5MB
const minChunkSize = 5 * 1024 * 1024 const minChunkSize = 5 * 1024 * 1024
// maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
// S3 API requires max upload chunk to be 5GB.
const maxChunkSize = 5 * 1024 * 1024 * 1024
const defaultChunkSize = 2 * minChunkSize const defaultChunkSize = 2 * minChunkSize
const ( const (
@@ -107,7 +103,7 @@ type DriverParameters struct {
Secure bool Secure bool
SkipVerify bool SkipVerify bool
V4Auth bool V4Auth bool
ChunkSize int64 ChunkSize int
MultipartCopyChunkSize int64 MultipartCopyChunkSize int64
MultipartCopyMaxConcurrency int64 MultipartCopyMaxConcurrency int64
MultipartCopyThresholdSize int64 MultipartCopyThresholdSize int64
@@ -158,7 +154,7 @@ var _ storagedriver.StorageDriver = &driver{}
type driver struct { type driver struct {
S3 *s3.S3 S3 *s3.S3
Bucket string Bucket string
ChunkSize int64 ChunkSize int
Encrypt bool Encrypt bool
KeyID string KeyID string
MultipartCopyChunkSize int64 MultipartCopyChunkSize int64
@@ -313,22 +309,22 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Dr
keyID = "" keyID = ""
} }
chunkSize, err := getParameterAsInt64(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize) chunkSize, err := getParameterAsInteger(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
multipartCopyChunkSize, err := getParameterAsInt64(parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize) multipartCopyChunkSize, err := getParameterAsInteger[int64](parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
multipartCopyMaxConcurrency, err := getParameterAsInt64(parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64) multipartCopyMaxConcurrency, err := getParameterAsInteger[int64](parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64)
if err != nil { if err != nil {
return nil, err return nil, err
} }
multipartCopyThresholdSize, err := getParameterAsInt64(parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize) multipartCopyThresholdSize, err := getParameterAsInteger[int64](parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -424,29 +420,29 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Dr
} }
params := DriverParameters{ params := DriverParameters{
fmt.Sprint(accessKey), AccessKey: fmt.Sprint(accessKey),
fmt.Sprint(secretKey), SecretKey: fmt.Sprint(secretKey),
fmt.Sprint(bucket), Bucket: fmt.Sprint(bucket),
region, Region: region,
fmt.Sprint(regionEndpoint), RegionEndpoint: fmt.Sprint(regionEndpoint),
forcePathStyleBool, ForcePathStyle: forcePathStyleBool,
encryptBool, Encrypt: encryptBool,
fmt.Sprint(keyID), KeyID: fmt.Sprint(keyID),
secureBool, Secure: secureBool,
skipVerifyBool, SkipVerify: skipVerifyBool,
v4Bool, V4Auth: v4Bool,
chunkSize, ChunkSize: chunkSize,
multipartCopyChunkSize, MultipartCopyChunkSize: multipartCopyChunkSize,
multipartCopyMaxConcurrency, MultipartCopyMaxConcurrency: multipartCopyMaxConcurrency,
multipartCopyThresholdSize, MultipartCopyThresholdSize: multipartCopyThresholdSize,
fmt.Sprint(rootDirectory), RootDirectory: fmt.Sprint(rootDirectory),
storageClass, StorageClass: storageClass,
fmt.Sprint(userAgent), UserAgent: fmt.Sprint(userAgent),
objectACL, ObjectACL: objectACL,
fmt.Sprint(sessionToken), SessionToken: fmt.Sprint(sessionToken),
useDualStackBool, UseDualStack: useDualStackBool,
accelerateBool, Accelerate: accelerateBool,
getS3LogLevelFromParam(parameters["loglevel"]), LogLevel: getS3LogLevelFromParam(parameters["loglevel"]),
} }
return New(ctx, params) return New(ctx, params)
@@ -479,33 +475,29 @@ func getS3LogLevelFromParam(param interface{}) aws.LogLevelType {
return logLevel return logLevel
} }
// getParameterAsInt64 converts parameters[name] to an int64 value (using type integer interface{ signed | unsigned }
// defaultt if nil), verifies it is no smaller than min, and returns it.
func getParameterAsInt64(parameters map[string]interface{}, name string, defaultt int64, min int64, max int64) (int64, error) { type signed interface {
rv := defaultt ~int | ~int8 | ~int16 | ~int32 | ~int64
param := parameters[name]
switch v := param.(type) {
case string:
vv, err := strconv.ParseInt(v, 0, 64)
if err != nil {
return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, param)
}
rv = vv
case int64:
rv = v
case int, uint, int32, uint32, uint64:
rv = reflect.ValueOf(v).Convert(reflect.TypeOf(rv)).Int()
case nil:
// do nothing
default:
return 0, fmt.Errorf("invalid value for %s: %#v", name, param)
} }
if rv < min || rv > max { type unsigned interface {
return 0, fmt.Errorf("the %s %#v parameter should be a number between %d and %d (inclusive)", name, rv, min, max) ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
} }
return rv, nil // getParameterAsInteger converts parameters[name] to T (using defaultValue if
// nil) and ensures it is in the range of min and max.
func getParameterAsInteger[T integer](parameters map[string]any, name string, defaultValue, min, max T) (T, error) {
v := defaultValue
if p := parameters[name]; p != nil {
if _, err := fmt.Sscanf(fmt.Sprint(p), "%d", &v); err != nil {
return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, p)
}
}
if v < min || v > max {
return 0, fmt.Errorf("the %s %#v parameter should be a number between %d and %d (inclusive)", name, v, min, max)
}
return v, nil
} }
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and // New constructs a new Driver with the given AWS credentials, region, encryption flag, and
@@ -592,11 +584,7 @@ func New(ctx context.Context, params DriverParameters) (*Driver, error) {
StorageClass: params.StorageClass, StorageClass: params.StorageClass,
ObjectACL: params.ObjectACL, ObjectACL: params.ObjectACL,
pool: &sync.Pool{ pool: &sync.Pool{
New: func() interface{} { New: func() any { return &bytes.Buffer{} },
return &buffer{
data: make([]byte, 0, params.ChunkSize),
}
},
}, },
} }
@@ -903,7 +891,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the original // Move moves an object stored at sourcePath to destPath, removing the original
// object. // object.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { func (d *driver) Move(ctx context.Context, sourcePath, destPath string) error {
/* This is terrible, but aws doesn't have an actual move. */ /* This is terrible, but aws doesn't have an actual move. */
if err := d.copy(ctx, sourcePath, destPath); err != nil { if err := d.copy(ctx, sourcePath, destPath); err != nil {
return err return err
@@ -912,7 +900,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
} }
// copy copies an object stored at sourcePath to destPath. // copy copies an object stored at sourcePath to destPath.
func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) error { func (d *driver) copy(ctx context.Context, sourcePath, destPath string) error {
// S3 can copy objects up to 5 GB in size with a single PUT Object - Copy // S3 can copy objects up to 5 GB in size with a single PUT Object - Copy
// operation. For larger objects, the multipart upload API must be used. // operation. For larger objects, the multipart upload API must be used.
// //
@@ -1121,7 +1109,7 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn,
return nil return nil
} }
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from string, startAfter string, f storagedriver.WalkFn) error { func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from, startAfter string, f storagedriver.WalkFn) error {
var ( var (
retError error retError error
// the most recent directory walked for de-duping // the most recent directory walked for de-duping
@@ -1267,16 +1255,10 @@ func directoryDiff(prev, current string) []string {
} }
paths = append(paths, parent) paths = append(paths, parent)
} }
reverse(paths) slices.Reverse(paths)
return paths return paths
} }
func reverse(s []string) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
}
func (d *driver) s3Path(path string) string { func (d *driver) s3Path(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
} }
@@ -1326,53 +1308,11 @@ func (d *driver) getStorageClass() *string {
return aws.String(d.StorageClass) return aws.String(d.StorageClass)
} }
// buffer is a static size bytes buffer. // writer uploads parts to S3 in a buffered fashion where the length of each
type buffer struct { // part is [writer.driver.ChunkSize], excluding the last part which may be
data []byte // smaller than the configured chunk size and never larger. This allows the
} // multipart upload to be cleanly resumed in future. This is violated if
// [writer.Close] is called before at least one chunk is written.
// NewBuffer returns a new bytes buffer from driver's memory pool.
// The size of the buffer is static and set to params.ChunkSize.
func (d *driver) NewBuffer() *buffer {
return d.pool.Get().(*buffer)
}
// ReadFrom reads as much data as it can fit in from r without growing its size.
// It returns the number of bytes successfully read from r or error.
func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) {
for len(b.data) < cap(b.data) && err == nil {
var n int
n, err = r.Read(b.data[len(b.data):cap(b.data)])
offset += int64(n)
b.data = b.data[:len(b.data)+n]
}
// NOTE(milosgajdos): io.ReaderFrom "swallows" io.EOF
// See: https://pkg.go.dev/io#ReaderFrom
if err == io.EOF {
err = nil
}
return offset, err
}
// Cap returns the capacity of the buffer's underlying byte slice.
func (b *buffer) Cap() int {
return cap(b.data)
}
// Len returns the length of the data in the buffer
func (b *buffer) Len() int {
return len(b.data)
}
// Clear the buffer data.
func (b *buffer) Clear() {
b.data = b.data[:0]
}
// writer attempts to upload parts to S3 in a buffered fashion where the last
// part is at least as large as the chunksize, so the multipart upload could be
// cleanly resumed in the future. This is violated if Close is called after less
// than a full chunk is written.
type writer struct { type writer struct {
ctx context.Context ctx context.Context
driver *driver driver *driver
@@ -1380,8 +1320,7 @@ type writer struct {
uploadID string uploadID string
parts []*s3.Part parts []*s3.Part
size int64 size int64
ready *buffer buf *bytes.Buffer
pending *buffer
closed bool closed bool
committed bool committed bool
cancelled bool cancelled bool
@@ -1399,8 +1338,7 @@ func (d *driver) newWriter(ctx context.Context, key, uploadID string, parts []*s
uploadID: uploadID, uploadID: uploadID,
parts: parts, parts: parts,
size: size, size: size,
ready: d.NewBuffer(), buf: d.pool.Get().(*bytes.Buffer),
pending: d.NewBuffer(),
} }
} }
@@ -1411,12 +1349,8 @@ func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber } func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
func (w *writer) Write(p []byte) (int, error) { func (w *writer) Write(p []byte) (int, error) {
if w.closed { if err := w.done(); err != nil {
return 0, fmt.Errorf("already closed") return 0, err
} else if w.committed {
return 0, fmt.Errorf("already committed")
} else if w.cancelled {
return 0, fmt.Errorf("already cancelled")
} }
// If the last written part is smaller than minChunkSize, we need to make a // If the last written part is smaller than minChunkSize, we need to make a
@@ -1476,17 +1410,11 @@ func (w *writer) Write(p []byte) (int, error) {
} }
defer resp.Body.Close() defer resp.Body.Close()
// reset uploaded parts w.reset()
w.parts = nil
w.ready.Clear()
n, err := w.ready.ReadFrom(resp.Body) if _, err := io.Copy(w.buf, resp.Body); err != nil {
if err != nil {
return 0, err return 0, err
} }
if resp.ContentLength != nil && n < *resp.ContentLength {
return 0, io.ErrShortBuffer
}
} else { } else {
// Otherwise we can use the old file as the new first part // Otherwise we can use the old file as the new first part
copyPartResp, err := w.driver.S3.UploadPartCopyWithContext(w.ctx, &s3.UploadPartCopyInput{ copyPartResp, err := w.driver.S3.UploadPartCopyWithContext(w.ctx, &s3.UploadPartCopyInput{
@@ -1499,52 +1427,21 @@ func (w *writer) Write(p []byte) (int, error) {
if err != nil { if err != nil {
return 0, err return 0, err
} }
w.parts = []*s3.Part{ w.parts = []*s3.Part{{
{
ETag: copyPartResp.CopyPartResult.ETag, ETag: copyPartResp.CopyPartResult.ETag,
PartNumber: aws.Int64(1), PartNumber: aws.Int64(1),
Size: aws.Int64(w.size), Size: aws.Int64(w.size),
}, }}
}
} }
} }
var n int n, _ := w.buf.Write(p)
defer func() { w.size += int64(n) }() for w.buf.Len() >= w.driver.ChunkSize {
reader := bytes.NewReader(p)
for reader.Len() > 0 {
// NOTE(milosgajdos): we do some seemingly unsafe conversions
// from int64 to int in this for loop. These are fine as the
// offset returned from buffer.ReadFrom can only ever be
// maxChunkSize large which fits in to int. The reason why
// we return int64 is to play nice with Go interfaces where
// the buffer implements io.ReaderFrom interface.
// fill up the ready parts buffer
offset, err := w.ready.ReadFrom(reader)
n += int(offset)
if err != nil {
return n, err
}
// try filling up the pending parts buffer
offset, err = w.pending.ReadFrom(reader)
n += int(offset)
if err != nil {
return n, err
}
// we filled up pending buffer, flush
if w.pending.Len() == w.pending.Cap() {
if err := w.flush(); err != nil { if err := w.flush(); err != nil {
return n, err return 0, fmt.Errorf("flush: %w", err)
} }
} }
}
return n, nil return n, nil
} }
@@ -1552,28 +1449,38 @@ func (w *writer) Size() int64 {
return w.size return w.size
} }
// Close flushes any remaining data in the buffer and releases the buffer back
// to the pool.
func (w *writer) Close() error { func (w *writer) Close() error {
if w.closed { if w.closed {
return fmt.Errorf("already closed") return fmt.Errorf("already closed")
} }
w.closed = true w.closed = true
defer func() { defer w.releaseBuffer()
w.ready.Clear()
w.driver.pool.Put(w.ready)
w.pending.Clear()
w.driver.pool.Put(w.pending)
}()
return w.flush() return w.flush()
} }
func (w *writer) Cancel(ctx context.Context) error { func (w *writer) reset() {
if w.closed { w.buf.Reset()
return fmt.Errorf("already closed") w.parts = nil
} else if w.committed { w.size = 0
return fmt.Errorf("already committed")
} }
// releaseBuffer resets the buffer and returns it to the pool.
func (w *writer) releaseBuffer() {
w.buf.Reset()
w.driver.pool.Put(w.buf)
}
// Cancel aborts the multipart upload and closes the writer.
func (w *writer) Cancel(ctx context.Context) error {
if err := w.done(); err != nil {
return err
}
w.cancelled = true w.cancelled = true
_, err := w.driver.S3.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{ _, err := w.driver.S3.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket), Bucket: aws.String(w.driver.Bucket),
@@ -1583,17 +1490,14 @@ func (w *writer) Cancel(ctx context.Context) error {
return err return err
} }
// Commit flushes any remaining data in the buffer and completes the multipart
// upload.
func (w *writer) Commit(ctx context.Context) error { func (w *writer) Commit(ctx context.Context) error {
if w.closed { if err := w.done(); err != nil {
return fmt.Errorf("already closed") return err
} else if w.committed {
return fmt.Errorf("already committed")
} else if w.cancelled {
return fmt.Errorf("already cancelled")
} }
err := w.flush() if err := w.flush(); err != nil {
if err != nil {
return err return err
} }
@@ -1634,15 +1538,14 @@ func (w *writer) Commit(ctx context.Context) error {
sort.Sort(completedUploadedParts) sort.Sort(completedUploadedParts)
_, err = w.driver.S3.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{ if _, err := w.driver.S3.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket), Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key), Key: aws.String(w.key),
UploadId: aws.String(w.uploadID), UploadId: aws.String(w.uploadID),
MultipartUpload: &s3.CompletedMultipartUpload{ MultipartUpload: &s3.CompletedMultipartUpload{
Parts: completedUploadedParts, Parts: completedUploadedParts,
}, },
}) }); err != nil {
if err != nil {
if _, aErr := w.driver.S3.AbortMultipartUploadWithContext(w.ctx, &s3.AbortMultipartUploadInput{ if _, aErr := w.driver.S3.AbortMultipartUploadWithContext(w.ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket), Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key), Key: aws.String(w.key),
@@ -1655,33 +1558,28 @@ func (w *writer) Commit(ctx context.Context) error {
return nil return nil
} }
// flush flushes all buffers to write a part to S3. // flush writes at most [w.driver.ChunkSize] of the buffer to S3. flush is only
// flush is only called by Write (with both buffers full) and Close/Commit (always) // called by [writer.Write] if the buffer is full, and always by [writer.Close]
// and [writer.Commit].
func (w *writer) flush() error { func (w *writer) flush() error {
if w.ready.Len() == 0 && w.pending.Len() == 0 { if w.buf.Len() == 0 {
return nil return nil
} }
buf := bytes.NewBuffer(w.ready.data) r := bytes.NewReader(w.buf.Next(w.driver.ChunkSize))
if w.pending.Len() > 0 && w.pending.Len() < int(w.driver.ChunkSize) {
if _, err := buf.Write(w.pending.data); err != nil {
return err
}
w.pending.Clear()
}
partSize := buf.Len() partSize := r.Len()
partNumber := aws.Int64(int64(len(w.parts) + 1)) partNumber := aws.Int64(int64(len(w.parts)) + 1)
resp, err := w.driver.S3.UploadPartWithContext(w.ctx, &s3.UploadPartInput{ resp, err := w.driver.S3.UploadPartWithContext(w.ctx, &s3.UploadPartInput{
Bucket: aws.String(w.driver.Bucket), Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key), Key: aws.String(w.key),
PartNumber: partNumber, PartNumber: partNumber,
UploadId: aws.String(w.uploadID), UploadId: aws.String(w.uploadID),
Body: bytes.NewReader(buf.Bytes()), Body: r,
}) })
if err != nil { if err != nil {
return err return fmt.Errorf("upload part: %w", err)
} }
w.parts = append(w.parts, &s3.Part{ w.parts = append(w.parts, &s3.Part{
@@ -1690,9 +1588,20 @@ func (w *writer) flush() error {
Size: aws.Int64(int64(partSize)), Size: aws.Int64(int64(partSize)),
}) })
// reset the flushed buffer and swap buffers w.size += int64(partSize)
w.ready.Clear()
w.ready, w.pending = w.pending, w.ready
return nil return nil
} }
// done returns an error if the writer is in an invalid state.
func (w *writer) done() error {
switch {
case w.closed:
return fmt.Errorf("already closed")
case w.committed:
return fmt.Errorf("already committed")
case w.cancelled:
return fmt.Errorf("already cancelled")
}
return nil
}

View File

@@ -0,0 +1,10 @@
//go:build arm
package s3
import "math"
// maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
// S3 API requires max upload chunk to be 5GB, but this overflows on 32-bit
// platforms.
const maxChunkSize = math.MaxInt32

View File

@@ -0,0 +1,7 @@
//go:build !arm
package s3
// maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
// S3 API requires max upload chunk to be 5GB.
const maxChunkSize = 5 * 1024 * 1024 * 1024

View File

@@ -101,30 +101,34 @@ func init() {
} }
} }
if objectACL == "" {
objectACL = s3.ObjectCannedACLPrivate
}
parameters := DriverParameters{ parameters := DriverParameters{
accessKey, AccessKey: accessKey,
secretKey, SecretKey: secretKey,
bucket, Bucket: bucket,
region, Region: region,
regionEndpoint, RegionEndpoint: regionEndpoint,
forcePathStyleBool, ForcePathStyle: forcePathStyleBool,
encryptBool, Encrypt: encryptBool,
keyID, KeyID: keyID,
secureBool, Secure: secureBool,
skipVerifyBool, SkipVerify: skipVerifyBool,
v4Bool, V4Auth: v4Bool,
minChunkSize, ChunkSize: minChunkSize,
defaultMultipartCopyChunkSize, MultipartCopyChunkSize: defaultMultipartCopyChunkSize,
defaultMultipartCopyMaxConcurrency, MultipartCopyMaxConcurrency: defaultMultipartCopyMaxConcurrency,
defaultMultipartCopyThresholdSize, MultipartCopyThresholdSize: defaultMultipartCopyThresholdSize,
rootDirectory, RootDirectory: rootDirectory,
storageClass, StorageClass: storageClass,
driverName + "-test", UserAgent: driverName + "-test",
objectACL, ObjectACL: objectACL,
sessionToken, SessionToken: sessionToken,
useDualStackBool, UseDualStack: useDualStackBool,
accelerateBool, Accelerate: accelerateBool,
getS3LogLevelFromParam(logLevel), LogLevel: getS3LogLevelFromParam(logLevel),
} }
return New(context.Background(), parameters) return New(context.Background(), parameters)

View File

@@ -398,31 +398,28 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) {
filename := randomPath(32) filename := randomPath(32)
defer suite.deletePath(firstPart(filename)) defer suite.deletePath(firstPart(filename))
contentsChunk1 := randomContents(chunkSize) var fullContents bytes.Buffer
contentsChunk2 := randomContents(chunkSize) contents := io.TeeReader(newRandReader(chunkSize*3), &fullContents)
contentsChunk3 := randomContents(chunkSize)
fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)
writer, err := suite.StorageDriver.Writer(suite.ctx, filename, false) writer, err := suite.StorageDriver.Writer(suite.ctx, filename, false)
suite.Require().NoError(err) suite.Require().NoError(err)
nn, err := io.Copy(writer, bytes.NewReader(contentsChunk1)) nn, err := io.CopyN(writer, contents, chunkSize)
suite.Require().NoError(err) suite.Require().NoError(err)
suite.Require().Equal(int64(len(contentsChunk1)), nn) suite.Require().Equal(chunkSize, nn)
err = writer.Close() err = writer.Close()
suite.Require().NoError(err) suite.Require().NoError(err)
curSize := writer.Size() curSize := writer.Size()
suite.Require().Equal(int64(len(contentsChunk1)), curSize) suite.Require().Equal(chunkSize, curSize)
writer, err = suite.StorageDriver.Writer(suite.ctx, filename, true) writer, err = suite.StorageDriver.Writer(suite.ctx, filename, true)
suite.Require().NoError(err) suite.Require().NoError(err)
suite.Require().Equal(curSize, writer.Size()) suite.Require().Equal(curSize, writer.Size())
nn, err = io.Copy(writer, bytes.NewReader(contentsChunk2)) nn, err = io.CopyN(writer, contents, chunkSize)
suite.Require().NoError(err) suite.Require().NoError(err)
suite.Require().Equal(int64(len(contentsChunk2)), nn) suite.Require().Equal(chunkSize, nn)
err = writer.Close() err = writer.Close()
suite.Require().NoError(err) suite.Require().NoError(err)
@@ -434,9 +431,9 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) {
suite.Require().NoError(err) suite.Require().NoError(err)
suite.Require().Equal(curSize, writer.Size()) suite.Require().Equal(curSize, writer.Size())
nn, err = io.Copy(writer, bytes.NewReader(fullContents[curSize:])) nn, err = io.CopyN(writer, contents, chunkSize)
suite.Require().NoError(err) suite.Require().NoError(err)
suite.Require().Equal(int64(len(fullContents[curSize:])), nn) suite.Require().Equal(chunkSize, nn)
err = writer.Commit(context.Background()) err = writer.Commit(context.Background())
suite.Require().NoError(err) suite.Require().NoError(err)
@@ -445,7 +442,7 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) {
received, err := suite.StorageDriver.GetContent(suite.ctx, filename) received, err := suite.StorageDriver.GetContent(suite.ctx, filename)
suite.Require().NoError(err) suite.Require().NoError(err)
suite.Require().Equal(fullContents, received) suite.Require().Equal(fullContents.Bytes(), received)
} }
// TestReadNonexistentStream tests that reading a stream for a nonexistent path // TestReadNonexistentStream tests that reading a stream for a nonexistent path