diff --git a/docs/configuration.md b/docs/configuration.md index f2c7d9546..a339900ed 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -128,6 +128,9 @@ storage: multipartcopymaxconcurrency: 100 multipartcopythresholdsize: 33554432 rootdirectory: /s3/object/name/prefix + logs3apirequests: true + logs3apiresponseheaders: + s3_http_response_header_x-do-spaces-error: x-do-spaces-error swift: username: username password: password @@ -429,6 +432,9 @@ storage: multipartcopymaxconcurrency: 100 multipartcopythresholdsize: 33554432 rootdirectory: /s3/object/name/prefix + logs3apirequests: true + logs3apiresponseheaders: + s3_http_response_header_x-do-spaces-error: x-do-spaces-error swift: username: username password: password diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go index 449fcad4a..c3608f076 100644 --- a/registry/storage/driver/s3-aws/s3.go +++ b/registry/storage/driver/s3-aws/s3.go @@ -28,6 +28,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/client" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" "github.com/aws/aws-sdk-go/aws/ec2metadata" @@ -107,6 +108,8 @@ type DriverParameters struct { UserAgent string ObjectACL string SessionToken string + LogS3APIRequests bool + LogS3APIResponseHeaders map[string]string } func init() { @@ -153,6 +156,8 @@ type driver struct { RootDirectory string StorageClass string ObjectACL string + LogS3APIRequests bool + LogS3APIResponseHeaders map[string]string } type baseEmbed struct { @@ -329,6 +334,34 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { userAgent = "" } + logS3APIRequestsBool := false + logS3APIRequests := parameters["logs3apirequests"] + switch logS3APIRequests := logS3APIRequests.(type) { + case string: + b, err := strconv.ParseBool(logS3APIRequests) + if err != nil { + return nil, fmt.Errorf("the logS3APIRequests parameter should be a boolean") + } + logS3APIRequestsBool = b + case bool: + logS3APIRequestsBool = logS3APIRequests + case nil: + // do nothing + default: + return nil, fmt.Errorf("the logS3APIRequests parameter should be a boolean") + } + + logS3APIResponseHeadersMap := map[string]string{} + logS3APIResponseHeaders := parameters["logs3apiresponseheaders"] + switch logS3APIResponseHeaders := logS3APIResponseHeaders.(type) { + case map[string]string: + logS3APIResponseHeadersMap = logS3APIResponseHeaders + case nil: + // do nothing + default: + return nil, fmt.Errorf("the logS3APIResponseHeaders parameter should be a map[string]string") + } + objectACL := s3.ObjectCannedACLPrivate objectACLParam := parameters["objectacl"] if objectACLParam != nil { @@ -366,6 +399,8 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { fmt.Sprint(userAgent), objectACL, fmt.Sprint(sessionToken), + logS3APIRequestsBool, + logS3APIResponseHeadersMap, } return New(params) @@ -495,6 +530,8 @@ func New(params DriverParameters) (*Driver, error) { RootDirectory: params.RootDirectory, StorageClass: params.StorageClass, ObjectACL: params.ObjectACL, + LogS3APIRequests: params.LogS3APIRequests, + LogS3APIResponseHeaders: params.LogS3APIResponseHeaders, } return &Driver{ @@ -506,6 +543,60 @@ func New(params DriverParameters) (*Driver, error) { }, nil } +// logS3OperationHandlerName is used to identify the handler used to log S3 API +// requests +const logS3OperationHandlerName = "docker.storage-driver.s3.operation-logger" + +// logS3Operation logs each S3 operation, including request and response info, +// as it completes +func (d *driver) logS3Operation(ctx context.Context) request.NamedHandler { + return request.NamedHandler{ + Name: logS3OperationHandlerName, + Fn: func(r *request.Request) { + req := r.HTTPRequest + resp := r.HTTPResponse + op := r.Operation + fields := map[interface{}]interface{}{ + "s3_operation_name": op.Name, + "s3_http_request_method": req.Method, + "s3_http_request_host": req.Host, + "s3_http_request_path": req.URL.Path, + "s3_http_request_content-length": req.ContentLength, + "s3_http_request_remote-addr": req.RemoteAddr, + "s3_http_response_header_x-amz-request-id": resp.Header.Values("x-amz-request-id"), + "s3_http_response_status": resp.StatusCode, + "s3_http_response_content-length": resp.ContentLength, + } + + for logKey, headerKey := range d.LogS3APIResponseHeaders { + values := resp.Header.Values(headerKey) + if len(values) == 1 { + fields[logKey] = values[0] + continue + } + if len(values) > 0 { + fields[logKey] = values + } + } + + ll := dcontext.GetLoggerWithFields(ctx, fields) + ll.Info("S3 operation completed") + }, + } +} + +func (d *driver) s3Client(ctx context.Context) *s3.S3 { + s := d.S3 + + if d.LogS3APIRequests { + s = &s3.S3{Client: client.New(d.S3.Client.Config, d.S3.Client.ClientInfo, d.S3.Client.Handlers.Copy())} + r := d.logS3Operation(ctx) + s.Client.Handlers.Complete.PushBackNamed(r) + } + + return s +} + // Implement the storagedriver.StorageDriver interface func (d *driver) Name() string { @@ -523,7 +614,9 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { // PutContent stores the []byte content at a location designated by "path". func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { - _, err := d.S3.PutObject(&s3.PutObjectInput{ + s := d.s3Client(ctx) + + _, err := s.PutObject(&s3.PutObjectInput{ Bucket: aws.String(d.Bucket), Key: aws.String(d.s3Path(path)), ContentType: d.getContentType(), @@ -539,7 +632,9 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e // Reader retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { - resp, err := d.S3.GetObject(&s3.GetObjectInput{ + s := d.s3Client(ctx) + + resp, err := s.GetObject(&s3.GetObjectInput{ Bucket: aws.String(d.Bucket), Key: aws.String(d.s3Path(path)), Range: aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"), @@ -558,10 +653,12 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read // Writer returns a FileWriter which will store the content written to it // at the location designated by "path" after the call to Commit. func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { + s := d.s3Client(ctx) + key := d.s3Path(path) if !append { // TODO (brianbland): cancel other uploads at this path - resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + resp, err := s.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ Bucket: aws.String(d.Bucket), Key: aws.String(key), ContentType: d.getContentType(), @@ -575,7 +672,7 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged } return d.newWriter(key, *resp.UploadId, nil), nil } - resp, err := d.S3.ListMultipartUploads(&s3.ListMultipartUploadsInput{ + resp, err := s.ListMultipartUploads(&s3.ListMultipartUploadsInput{ Bucket: aws.String(d.Bucket), Prefix: aws.String(key), }) @@ -587,7 +684,7 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged if key != *multi.Key { continue } - resp, err := d.S3.ListParts(&s3.ListPartsInput{ + resp, err := s.ListParts(&s3.ListPartsInput{ Bucket: aws.String(d.Bucket), Key: aws.String(key), UploadId: multi.UploadId, @@ -607,11 +704,13 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged // Stat retrieves the FileInfo for the given path, including the current size // in bytes and the creation time. func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { + s := d.s3Client(ctx) + fi := storagedriver.FileInfoFields{ Path: path, } - headResp, err := d.S3.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ + headResp, err := s.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ Bucket: aws.String(d.Bucket), Key: aws.String(d.s3Path(path)), }) @@ -626,7 +725,8 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil } - resp, err := d.S3.ListObjectsWithContext(ctx, &s3.ListObjectsInput{ + resp, err := s.ListObjectsWithContext(ctx, &s3.ListObjectsInput{ + Bucket: aws.String(d.Bucket), Prefix: aws.String(d.s3Path(path)), MaxKeys: aws.Int64(1), @@ -654,6 +754,8 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, // List returns a list of the objects that are direct descendants of the given path. func (d *driver) List(ctx context.Context, opath string) ([]string, error) { + s := d.s3Client(ctx) + path := opath if path != "/" && path[len(path)-1] != '/' { path = path + "/" @@ -667,7 +769,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) { prefix = "/" } - resp, err := d.S3.ListObjects(&s3.ListObjectsInput{ + resp, err := s.ListObjects(&s3.ListObjectsInput{ Bucket: aws.String(d.Bucket), Prefix: aws.String(d.s3Path(path)), Delimiter: aws.String("/"), @@ -691,7 +793,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) { } if *resp.IsTruncated { - resp, err = d.S3.ListObjects(&s3.ListObjectsInput{ + resp, err = s.ListObjects(&s3.ListObjectsInput{ Bucket: aws.String(d.Bucket), Prefix: aws.String(d.s3Path(path)), Delimiter: aws.String("/"), @@ -740,8 +842,10 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e return parseError(sourcePath, err) } + s := d.s3Client(ctx) + if fileInfo.Size() <= d.MultipartCopyThresholdSize { - _, err := d.S3.CopyObject(&s3.CopyObjectInput{ + _, err := s.CopyObject(&s3.CopyObjectInput{ Bucket: aws.String(d.Bucket), Key: aws.String(d.s3Path(destPath)), ContentType: d.getContentType(), @@ -757,7 +861,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e return nil } - createResp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + createResp, err := s.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ Bucket: aws.String(d.Bucket), Key: aws.String(d.s3Path(destPath)), ContentType: d.getContentType(), @@ -784,7 +888,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e if lastByte >= fileInfo.Size() { lastByte = fileInfo.Size() - 1 } - uploadResp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{ + uploadResp, err := s.UploadPartCopy(&s3.UploadPartCopyInput{ Bucket: aws.String(d.Bucket), CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)), Key: aws.String(d.s3Path(destPath)), @@ -810,7 +914,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e } } - _, err = d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + _, err = s.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ Bucket: aws.String(d.Bucket), Key: aws.String(d.s3Path(destPath)), UploadId: createResp.UploadId, @@ -835,10 +939,12 @@ func (d *driver) Delete(ctx context.Context, path string) error { Bucket: aws.String(d.Bucket), Prefix: aws.String(s3Path), } + + s := d.s3Client(ctx) ListLoop: for { // list all the objects - resp, err := d.S3.ListObjects(listObjectsInput) + resp, err := s.ListObjects(listObjectsInput) // resp.Contents can only be empty on the first call // if there were no more results to return after the first call, resp.IsTruncated would have been false @@ -870,7 +976,7 @@ ListLoop: // need to chunk objects into groups of 1000 per s3 restrictions total := len(s3Objects) for i := 0; i < total; i += 1000 { - _, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{ + _, err := s.DeleteObjects(&s3.DeleteObjectsInput{ Bucket: aws.String(d.Bucket), Delete: &s3.Delete{ Objects: s3Objects[i:min(i+1000, total)], @@ -887,6 +993,8 @@ ListLoop: // URLFor returns a URL which may be used to retrieve the content stored at the given path. // May return an UnsupportedMethodErr in certain StorageDriver implementations. func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { + s := d.s3Client(ctx) + methodString := "GET" method, ok := options["method"] if ok { @@ -909,12 +1017,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int switch methodString { case "GET": - req, _ = d.S3.GetObjectRequest(&s3.GetObjectInput{ + req, _ = s.GetObjectRequest(&s3.GetObjectInput{ Bucket: aws.String(d.Bucket), Key: aws.String(d.s3Path(path)), }) case "HEAD": - req, _ = d.S3.HeadObjectRequest(&s3.HeadObjectInput{ + req, _ = s.HeadObjectRequest(&s3.HeadObjectInput{ Bucket: aws.String(d.Bucket), Key: aws.String(d.s3Path(path)), }) @@ -989,9 +1097,11 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre MaxKeys: aws.Int64(listMax), } + s := d.s3Client(parentCtx) + ctx, done := dcontext.WithTrace(parentCtx) defer done("s3aws.ListObjectsV2Pages(%s)", path) - listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool { + listObjectErr := s.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool { var count int64 // KeyCount was introduced with version 2 of the GET Bucket operation in S3. diff --git a/registry/storage/driver/s3-aws/s3_test.go b/registry/storage/driver/s3-aws/s3_test.go index d8235f657..482ab3eb4 100644 --- a/registry/storage/driver/s3-aws/s3_test.go +++ b/registry/storage/driver/s3-aws/s3_test.go @@ -97,6 +97,8 @@ func init() { driverName + "-test", objectACL, sessionToken, + false, + map[string]string{}, } return New(parameters)