Mirror of https://github.com/distribution/distribution.git
storagedriver/s3: Optimized Walk implementation + bugfix

Optimized the S3 Walk implementation by no longer listing each directory recursively with its own S3 calls. Overall this gives a huge performance increase, both in runtime and in the number of S3 calls (up to ~500x). Also fixed a bug in WalkFallback where ErrSkipDir was not handled as documented when returned for a non-directory.

Signed-off-by: Collin Shoop <cshoop@digitalocean.com>
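The WalkFallback change itself is not shown in the hunks below, but the documented ErrSkipDir contract it restores is simple to state: returning ErrSkipDir from the walk function for a directory prunes that directory's contents, while returning it for a regular file stops the remaining walk gracefully. The following standalone Go sketch illustrates that contract; the names (walkSorted, errSkipDir, fileInfo) and the sample paths are invented for the example and are not the registry's own types.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// errSkipDir stands in for storagedriver.ErrSkipDir in this self-contained sketch.
var errSkipDir = errors.New("skip this directory")

// fileInfo is a minimal stand-in for storagedriver.FileInfo.
type fileInfo struct {
	path  string
	isDir bool
}

// walkSorted visits entries in lexical order and applies the documented
// ErrSkipDir semantics: returned for a directory it prunes that subtree,
// returned for a file it stops the walk without reporting an error.
func walkSorted(entries []fileInfo, f func(fileInfo) error) error {
	var skipPrefix string
	for _, e := range entries {
		if skipPrefix != "" && strings.HasPrefix(e.path, skipPrefix) {
			continue // pruned by a previous ErrSkipDir on a directory
		}
		err := f(e)
		if err == errSkipDir {
			if e.isDir {
				skipPrefix = e.path + "/"
				continue
			}
			return nil // ErrSkipDir on a plain file: stop gracefully
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	entries := []fileInfo{
		{"/docker/registry/v2/blobs", true},
		{"/docker/registry/v2/blobs/sha256/aa/aabb/data", false},
		{"/docker/registry/v2/repositories", true},
		{"/docker/registry/v2/repositories/hello-world/_layers/link", false},
	}
	_ = walkSorted(entries, func(fi fileInfo) error {
		fmt.Println("visit:", fi.path)
		if fi.isDir && strings.HasSuffix(fi.path, "/blobs") {
			return errSkipDir // prune everything under /blobs
		}
		return nil
	})
	// Output:
	// visit: /docker/registry/v2/blobs
	// visit: /docker/registry/v2/repositories
	// visit: /docker/registry/v2/repositories/hello-world/_layers/link
}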
@@ -20,6 +20,7 @@ import (
 	"io/ioutil"
 	"math"
 	"net/http"
+	"path/filepath"
 	"reflect"
 	"sort"
 	"strconv"
@@ -941,110 +942,84 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn)
 	return nil
 }
 
-type walkInfoContainer struct {
-	storagedriver.FileInfoFields
-	prefix *string
-}
-
-// Path provides the full path of the target of this file info.
-func (wi walkInfoContainer) Path() string {
-	return wi.FileInfoFields.Path
-}
-
-// Size returns current length in bytes of the file. The return value can
-// be used to write to the end of the file at path. The value is
-// meaningless if IsDir returns true.
-func (wi walkInfoContainer) Size() int64 {
-	return wi.FileInfoFields.Size
-}
-
-// ModTime returns the modification time for the file. For backends that
-// don't have a modification time, the creation time should be returned.
-func (wi walkInfoContainer) ModTime() time.Time {
-	return wi.FileInfoFields.ModTime
-}
-
-// IsDir returns true if the path is a directory.
-func (wi walkInfoContainer) IsDir() bool {
-	return wi.FileInfoFields.IsDir
-}
-
 func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error {
-	var retError error
+	var (
+		retError error
+		// the most recent directory walked for de-duping
+		prevDir string
+		// the most recent skip directory to avoid walking over undesirable files
+		prevSkipDir string
+	)
+	prevDir = prefix + path
 
 	listObjectsInput := &s3.ListObjectsV2Input{
-		Bucket:    aws.String(d.Bucket),
-		Prefix:    aws.String(path),
-		Delimiter: aws.String("/"),
-		MaxKeys:   aws.Int64(listMax),
+		Bucket:  aws.String(d.Bucket),
+		Prefix:  aws.String(path),
+		MaxKeys: aws.Int64(listMax),
 	}
 
 	ctx, done := dcontext.WithTrace(parentCtx)
 	defer done("s3aws.ListObjectsV2Pages(%s)", path)
 
+	// When the "delimiter" argument is omitted, the S3 list API will list all objects in the bucket
+	// recursively, omitting directory paths. Objects are listed in sorted, depth-first order so we
+	// can infer all the directories by comparing each object path to the last one we saw.
+	// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html
+
+	// With files returned in sorted depth-first order, directories are inferred in the same order.
+	// ErrSkipDir is handled by explicitly skipping over any files under the skipped directory. This may be sub-optimal
+	// for extreme edge cases but for the general use case in a registry, this is orders of magnitude
+	// faster than a more explicit recursive implementation.
 	listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {
-		var count int64
-		// KeyCount was introduced with version 2 of the GET Bucket operation in S3.
-		// Some S3 implementations don't support V2 now, so we fall back to manual
-		// calculation of the key count if required
-		if objects.KeyCount != nil {
-			count = *objects.KeyCount
-			*objectCount += *objects.KeyCount
-		} else {
-			count = int64(len(objects.Contents) + len(objects.CommonPrefixes))
-			*objectCount += count
-		}
-
-		walkInfos := make([]walkInfoContainer, 0, count)
-
-		for _, dir := range objects.CommonPrefixes {
-			commonPrefix := *dir.Prefix
-			walkInfos = append(walkInfos, walkInfoContainer{
-				prefix: dir.Prefix,
-				FileInfoFields: storagedriver.FileInfoFields{
-					IsDir: true,
-					Path:  strings.Replace(commonPrefix[:len(commonPrefix)-1], d.s3Path(""), prefix, 1),
-				},
-			})
-		}
+		walkInfos := make([]storagedriver.FileInfoInternal, 0, len(objects.Contents))
 
 		for _, file := range objects.Contents {
-			// empty prefixes are listed as objects inside its own prefix.
-			// https://docs.aws.amazon.com/AmazonS3/latest/user-guide/using-folders.html
-			if strings.HasSuffix(*file.Key, "/") {
-				continue
-			}
-			walkInfos = append(walkInfos, walkInfoContainer{
+			filePath := strings.Replace(*file.Key, d.s3Path(""), prefix, 1)
+
+			// get a list of all inferred directories between the previous directory and this file
+			dirs := directoryDiff(prevDir, filePath)
+			if len(dirs) > 0 {
+				for _, dir := range dirs {
+					walkInfos = append(walkInfos, storagedriver.FileInfoInternal{
+						FileInfoFields: storagedriver.FileInfoFields{
+							IsDir: true,
+							Path:  dir,
+						},
+					})
+					prevDir = dir
+				}
+			}
+
+			walkInfos = append(walkInfos, storagedriver.FileInfoInternal{
 				FileInfoFields: storagedriver.FileInfoFields{
 					IsDir:   false,
 					Size:    *file.Size,
 					ModTime: *file.LastModified,
-					Path:    strings.Replace(*file.Key, d.s3Path(""), prefix, 1),
+					Path:    filePath,
 				},
 			})
 		}
 
-		sort.SliceStable(walkInfos, func(i, j int) bool { return walkInfos[i].FileInfoFields.Path < walkInfos[j].FileInfoFields.Path })
-
 		for _, walkInfo := range walkInfos {
-			err := f(walkInfo)
-
-			if err == storagedriver.ErrSkipDir {
-				if walkInfo.IsDir() {
-					continue
-				} else {
-					break
-				}
-			} else if err != nil {
-				retError = err
-				return false
-			}
-
-			if walkInfo.IsDir() {
-				if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil {
-					retError = err
-					return false
-				}
-			}
+			// skip any results under the last skip directory
+			if prevSkipDir != "" && strings.HasPrefix(walkInfo.Path(), prevSkipDir) {
+				continue
+			}
+
+			err := f(walkInfo)
+			*objectCount++
+
+			if err != nil {
+				if err == storagedriver.ErrSkipDir {
+					if walkInfo.IsDir() {
+						prevSkipDir = walkInfo.Path()
+						continue
+					}
+					// is file, stop gracefully
+					return false
+				}
+				retError = err
+				return false
+			}
 		}
 		return true
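(Aside, not part of the patch: a rough sense of where the "~500x" figure can come from, under the assumption that the old implementation issued at least one ListObjectsV2 call per directory because it recursed with Delimiter: "/", while the new one pages through a single recursive listing at listMax (1,000) keys per page. For an illustrative tree of 50,000 keys spread across 10,000 directories that is on the order of 10,000 calls before versus ceil(50,000 / 1,000) = 50 calls after, i.e. a few-hundred-fold reduction. The exact ratio depends entirely on the shape of the repository tree; these numbers are illustrative, not measurements.)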
@@ -1061,6 +1036,44 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error {
 	return nil
 }
 
+// directoryDiff finds all directories that are not in common between
+// the previous and current paths in sorted order.
+//
+// Eg 1 directoryDiff("/path/to/folder", "/path/to/folder/folder/file")
+//      => [ "/path/to/folder/folder" ],
+// Eg 2 directoryDiff("/path/to/folder/folder1", "/path/to/folder/folder2/file")
+//      => [ "/path/to/folder/folder2" ]
+// Eg 3 directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/file")
+//      => [ "/path/to/folder/folder2" ]
+// Eg 4 directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/folder1/file")
+//      => [ "/path/to/folder/folder2", "/path/to/folder/folder2/folder1" ]
+// Eg 5 directoryDiff("/", "/path/to/folder/folder/file")
+//      => [ "/path", "/path/to", "/path/to/folder", "/path/to/folder/folder" ],
+func directoryDiff(prev, current string) []string {
+	var paths []string
+
+	if prev == "" || current == "" {
+		return paths
+	}
+
+	parent := current
+	for {
+		parent = filepath.Dir(parent)
+		if parent == "/" || parent == prev || strings.HasPrefix(prev, parent) {
+			break
+		}
+		paths = append(paths, parent)
+	}
+	reverse(paths)
+	return paths
+}
+
+func reverse(s []string) {
+	for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
+		s[i], s[j] = s[j], s[i]
+	}
+}
+
 func (d *driver) s3Path(path string) string {
 	return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
 }
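To make the directory-inference step concrete, here is a small self-contained sketch that re-implements the same directoryDiff/reverse logic as the patch above and feeds it a short, sorted key listing of the kind a delimiter-less ListObjectsV2 call returns. The sample keys are invented for illustration; the output interleaves each inferred directory before the files beneath it, which is the directory-before-contents order the walk callback relies on.

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// directoryDiff returns the directories between prev and current that have not
// been emitted yet, in ascending (parent-first) order — same algorithm as above.
func directoryDiff(prev, current string) []string {
	var paths []string
	if prev == "" || current == "" {
		return paths
	}
	parent := current
	for {
		parent = filepath.Dir(parent)
		if parent == "/" || parent == prev || strings.HasPrefix(prev, parent) {
			break
		}
		paths = append(paths, parent)
	}
	// reverse into parent-first order
	for i, j := 0, len(paths)-1; i < j; i, j = i+1, j-1 {
		paths[i], paths[j] = paths[j], paths[i]
	}
	return paths
}

func main() {
	// A sorted, recursive object listing (no delimiter), as S3 would return it.
	keys := []string{
		"/v2/blobs/sha256/aa/aabb/data",
		"/v2/blobs/sha256/cc/ccdd/data",
		"/v2/repositories/hello/_layers/link",
	}
	prevDir := "/" // root of the walk
	for _, key := range keys {
		for _, dir := range directoryDiff(prevDir, key) {
			fmt.Println("dir: ", dir) // inferred directory, emitted before its files
			prevDir = dir
		}
		fmt.Println("file:", key)
	}
}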