Merge pull request #8 from digitalocean/wwarren/s3-storage-driver_log-s3-api-requests

s3: log s3 api requests
This commit is contained in:
wayne 2020-08-13 19:48:06 -05:00 committed by GitHub
commit 3796bf3c1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 136 additions and 18 deletions

View File

@ -128,6 +128,9 @@ storage:
multipartcopymaxconcurrency: 100
multipartcopythresholdsize: 33554432
rootdirectory: /s3/object/name/prefix
logs3apirequests: true
logs3apiresponseheaders:
s3_http_response_header_x-do-spaces-error: x-do-spaces-error
swift:
username: username
password: password
@ -429,6 +432,9 @@ storage:
multipartcopymaxconcurrency: 100
multipartcopythresholdsize: 33554432
rootdirectory: /s3/object/name/prefix
logs3apirequests: true
logs3apiresponseheaders:
s3_http_response_header_x-do-spaces-error: x-do-spaces-error
swift:
username: username
password: password

View File

@ -28,6 +28,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
@ -107,6 +108,8 @@ type DriverParameters struct {
UserAgent string
ObjectACL string
SessionToken string
LogS3APIRequests bool
LogS3APIResponseHeaders map[string]string
}
func init() {
@ -153,6 +156,8 @@ type driver struct {
RootDirectory string
StorageClass string
ObjectACL string
LogS3APIRequests bool
LogS3APIResponseHeaders map[string]string
}
type baseEmbed struct {
@ -329,6 +334,34 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
userAgent = ""
}
logS3APIRequestsBool := false
logS3APIRequests := parameters["logs3apirequests"]
switch logS3APIRequests := logS3APIRequests.(type) {
case string:
b, err := strconv.ParseBool(logS3APIRequests)
if err != nil {
return nil, fmt.Errorf("the logS3APIRequests parameter should be a boolean")
}
logS3APIRequestsBool = b
case bool:
logS3APIRequestsBool = logS3APIRequests
case nil:
// do nothing
default:
return nil, fmt.Errorf("the logS3APIRequests parameter should be a boolean")
}
logS3APIResponseHeadersMap := map[string]string{}
logS3APIResponseHeaders := parameters["logs3apiresponseheaders"]
switch logS3APIResponseHeaders := logS3APIResponseHeaders.(type) {
case map[string]string:
logS3APIResponseHeadersMap = logS3APIResponseHeaders
case nil:
// do nothing
default:
return nil, fmt.Errorf("the logS3APIResponseHeaders parameter should be a map[string]string")
}
objectACL := s3.ObjectCannedACLPrivate
objectACLParam := parameters["objectacl"]
if objectACLParam != nil {
@ -366,6 +399,8 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
fmt.Sprint(userAgent),
objectACL,
fmt.Sprint(sessionToken),
logS3APIRequestsBool,
logS3APIResponseHeadersMap,
}
return New(params)
@ -495,6 +530,8 @@ func New(params DriverParameters) (*Driver, error) {
RootDirectory: params.RootDirectory,
StorageClass: params.StorageClass,
ObjectACL: params.ObjectACL,
LogS3APIRequests: params.LogS3APIRequests,
LogS3APIResponseHeaders: params.LogS3APIResponseHeaders,
}
return &Driver{
@ -506,6 +543,60 @@ func New(params DriverParameters) (*Driver, error) {
}, nil
}
// logS3OperationHandlerName is used to identify the handler used to log S3 API
// requests
const logS3OperationHandlerName = "docker.storage-driver.s3.operation-logger"

// logS3Operation returns a request handler that logs each S3 operation,
// including request and response info, as it completes. The handler is
// intended to be appended to the SDK's Complete handler list.
func (d *driver) logS3Operation(ctx context.Context) request.NamedHandler {
	return request.NamedHandler{
		Name: logS3OperationHandlerName,
		Fn: func(r *request.Request) {
			req := r.HTTPRequest
			op := r.Operation
			fields := map[interface{}]interface{}{
				"s3_operation_name":              op.Name,
				"s3_http_request_method":         req.Method,
				"s3_http_request_host":           req.Host,
				"s3_http_request_path":           req.URL.Path,
				"s3_http_request_content-length": req.ContentLength,
				// NOTE(review): RemoteAddr is only populated by the HTTP
				// server on inbound requests; on these outbound requests it
				// will be empty — confirm whether this field is wanted.
				"s3_http_request_remote-addr": req.RemoteAddr,
			}

			// Complete handlers also fire when the request never produced a
			// response (e.g. connection or DNS failure), in which case
			// r.HTTPResponse is nil — guard before dereferencing so the
			// logging handler cannot panic.
			if resp := r.HTTPResponse; resp != nil {
				fields["s3_http_response_header_x-amz-request-id"] = resp.Header.Values("x-amz-request-id")
				fields["s3_http_response_status"] = resp.StatusCode
				fields["s3_http_response_content-length"] = resp.ContentLength

				// Attach any operator-configured response headers, keyed by
				// the configured log field name. Single-valued headers are
				// logged as a scalar for readability.
				for logKey, headerKey := range d.LogS3APIResponseHeaders {
					values := resp.Header.Values(headerKey)
					if len(values) == 1 {
						fields[logKey] = values[0]
						continue
					}
					if len(values) > 0 {
						fields[logKey] = values
					}
				}
			}

			ll := dcontext.GetLoggerWithFields(ctx, fields)
			ll.Info("S3 operation completed")
		},
	}
}
// s3Client returns the S3 client to use for a single driver operation.
// When S3 API request logging is disabled it hands back the shared client
// unchanged; when enabled, it builds a shallow copy of the client with its
// own handler list so the per-call logging handler never leaks onto the
// shared d.S3 instance.
func (d *driver) s3Client(ctx context.Context) *s3.S3 {
	if !d.LogS3APIRequests {
		return d.S3
	}
	// Handlers.Copy() duplicates the handler lists so pushing the logging
	// handler below mutates only this clone.
	cloned := client.New(d.S3.Client.Config, d.S3.Client.ClientInfo, d.S3.Client.Handlers.Copy())
	cloned.Handlers.Complete.PushBackNamed(d.logS3Operation(ctx))
	return &s3.S3{Client: cloned}
}
// Implement the storagedriver.StorageDriver interface
func (d *driver) Name() string {
@ -523,7 +614,9 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
_, err := d.S3.PutObject(&s3.PutObjectInput{
s := d.s3Client(ctx)
_, err := s.PutObject(&s3.PutObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
ContentType: d.getContentType(),
@ -539,7 +632,9 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
resp, err := d.S3.GetObject(&s3.GetObjectInput{
s := d.s3Client(ctx)
resp, err := s.GetObject(&s3.GetObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
Range: aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"),
@ -558,10 +653,12 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
s := d.s3Client(ctx)
key := d.s3Path(path)
if !append {
// TODO (brianbland): cancel other uploads at this path
resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
resp, err := s.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(key),
ContentType: d.getContentType(),
@ -575,7 +672,7 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
}
return d.newWriter(key, *resp.UploadId, nil), nil
}
resp, err := d.S3.ListMultipartUploads(&s3.ListMultipartUploadsInput{
resp, err := s.ListMultipartUploads(&s3.ListMultipartUploadsInput{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(key),
})
@ -587,7 +684,7 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
if key != *multi.Key {
continue
}
resp, err := d.S3.ListParts(&s3.ListPartsInput{
resp, err := s.ListParts(&s3.ListPartsInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(key),
UploadId: multi.UploadId,
@ -607,11 +704,13 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
s := d.s3Client(ctx)
fi := storagedriver.FileInfoFields{
Path: path,
}
headResp, err := d.S3.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
headResp, err := s.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
})
@ -626,7 +725,8 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
}
resp, err := d.S3.ListObjectsWithContext(ctx, &s3.ListObjectsInput{
resp, err := s.ListObjectsWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
MaxKeys: aws.Int64(1),
@ -654,6 +754,8 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
// List returns a list of the objects that are direct descendants of the given path.
func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
s := d.s3Client(ctx)
path := opath
if path != "/" && path[len(path)-1] != '/' {
path = path + "/"
@ -667,7 +769,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
prefix = "/"
}
resp, err := d.S3.ListObjects(&s3.ListObjectsInput{
resp, err := s.ListObjects(&s3.ListObjectsInput{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
Delimiter: aws.String("/"),
@ -691,7 +793,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
}
if *resp.IsTruncated {
resp, err = d.S3.ListObjects(&s3.ListObjectsInput{
resp, err = s.ListObjects(&s3.ListObjectsInput{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
Delimiter: aws.String("/"),
@ -740,8 +842,10 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
return parseError(sourcePath, err)
}
s := d.s3Client(ctx)
if fileInfo.Size() <= d.MultipartCopyThresholdSize {
_, err := d.S3.CopyObject(&s3.CopyObjectInput{
_, err := s.CopyObject(&s3.CopyObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
ContentType: d.getContentType(),
@ -757,7 +861,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
return nil
}
createResp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
createResp, err := s.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
ContentType: d.getContentType(),
@ -784,7 +888,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
if lastByte >= fileInfo.Size() {
lastByte = fileInfo.Size() - 1
}
uploadResp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{
uploadResp, err := s.UploadPartCopy(&s3.UploadPartCopyInput{
Bucket: aws.String(d.Bucket),
CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
Key: aws.String(d.s3Path(destPath)),
@ -810,7 +914,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
}
}
_, err = d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
_, err = s.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
UploadId: createResp.UploadId,
@ -835,10 +939,12 @@ func (d *driver) Delete(ctx context.Context, path string) error {
Bucket: aws.String(d.Bucket),
Prefix: aws.String(s3Path),
}
s := d.s3Client(ctx)
ListLoop:
for {
// list all the objects
resp, err := d.S3.ListObjects(listObjectsInput)
resp, err := s.ListObjects(listObjectsInput)
// resp.Contents can only be empty on the first call
// if there were no more results to return after the first call, resp.IsTruncated would have been false
@ -870,7 +976,7 @@ ListLoop:
// need to chunk objects into groups of 1000 per s3 restrictions
total := len(s3Objects)
for i := 0; i < total; i += 1000 {
_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
_, err := s.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{
Objects: s3Objects[i:min(i+1000, total)],
@ -887,6 +993,8 @@ ListLoop:
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
s := d.s3Client(ctx)
methodString := "GET"
method, ok := options["method"]
if ok {
@ -909,12 +1017,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
switch methodString {
case "GET":
req, _ = d.S3.GetObjectRequest(&s3.GetObjectInput{
req, _ = s.GetObjectRequest(&s3.GetObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
})
case "HEAD":
req, _ = d.S3.HeadObjectRequest(&s3.HeadObjectInput{
req, _ = s.HeadObjectRequest(&s3.HeadObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
})
@ -989,9 +1097,11 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
MaxKeys: aws.Int64(listMax),
}
s := d.s3Client(parentCtx)
ctx, done := dcontext.WithTrace(parentCtx)
defer done("s3aws.ListObjectsV2Pages(%s)", path)
listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {
listObjectErr := s.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {
var count int64
// KeyCount was introduced with version 2 of the GET Bucket operation in S3.

View File

@ -97,6 +97,8 @@ func init() {
driverName + "-test",
objectACL,
sessionToken,
false,
map[string]string{},
}
return New(parameters)