storagedriver/s3: Reverting WalkFiles method. Instead, optimizing the Walk method.

This commit is contained in:
Collin Shoop 2021-06-28 12:17:40 -04:00
parent 1c3ee66061
commit e20be1ead5
11 changed files with 52 additions and 256 deletions

View File

@ -93,7 +93,7 @@ func (bs *blobStore) Enumerate(ctx context.Context, ingester func(dgst digest.Di
return err
}
return bs.driver.WalkFiles(ctx, specPath, func(fileInfo driver.FileInfo) error {
return bs.driver.Walk(ctx, specPath, func(fileInfo driver.FileInfo) error {
currentPath := fileInfo.Path()
// we only want to parse paths that end with /data
_, fileName := path.Split(currentPath)

View File

@ -365,12 +365,6 @@ func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn)
return storagedriver.WalkFallback(ctx, d, path, f)
}
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}
// directDescendants will find direct descendants (blobs or virtual containers)
// of from list of blob paths and will return their full paths. Elements in blobs
// list must be prefixed with a "/" and

View File

@ -241,15 +241,3 @@ func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn)
return base.setDriverName(base.StorageDriver.Walk(ctx, path, f))
}
// WalkFiles wraps WalkFiles of underlying storage driver.
func (base *Base) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.WalkFiles(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
}
return base.setDriverName(base.StorageDriver.WalkFiles(ctx, path, f))
}

View File

@ -295,12 +295,6 @@ func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn)
return storagedriver.WalkFallback(ctx, d, path, f)
}
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}
// fullPath returns the absolute path of a key within the Driver's storage.
func (d *driver) fullPath(subPath string) string {
return path.Join(d.rootDirectory, subPath)

View File

@ -250,12 +250,6 @@ func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn)
return storagedriver.WalkFallback(ctx, d, path, f)
}
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}
type writer struct {
d *driver
f *file

View File

@ -20,6 +20,7 @@ import (
"io/ioutil"
"math"
"net/http"
"path/filepath"
"reflect"
"sort"
"strconv"
@ -1057,7 +1058,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file and directory
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) error {
path := from
if !strings.HasSuffix(path, "/") {
@ -1070,7 +1071,7 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn)
}
var objectCount int64
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, true, f); err != nil {
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, f); err != nil {
return err
}
@ -1082,72 +1083,19 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn)
return nil
}
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, from string, f storagedriver.WalkFn) error {
path := from
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error {
var (
retError error
// the most recent directory walked for de-duping
prevDir string
// the most recent skip directory to avoid walking over undesirable files
prevSkipDir string
)
prefix := ""
if d.s3Path("") == "" {
prefix = "/"
}
var objectCount int64
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, false, f); err != nil {
return err
}
// S3 doesn't have the concept of empty directories, so it'll return path not found if there are no objects
if objectCount == 0 {
return storagedriver.PathNotFoundError{Path: from}
}
return nil
}
type walkInfoContainer struct {
storagedriver.FileInfoFields
prefix *string
}
// Path provides the full path of the target of this file info.
func (wi walkInfoContainer) Path() string {
return wi.FileInfoFields.Path
}
// Size returns current length in bytes of the file. The return value can
// be used to write to the end of the file at path. The value is
// meaningless if IsDir returns true.
func (wi walkInfoContainer) Size() int64 {
return wi.FileInfoFields.Size
}
// ModTime returns the modification time for the file. For backends that
// don't have a modification time, the creation time should be returned.
func (wi walkInfoContainer) ModTime() time.Time {
return wi.FileInfoFields.ModTime
}
// IsDir returns true if the path is a directory.
func (wi walkInfoContainer) IsDir() bool {
return wi.FileInfoFields.IsDir
}
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, walkDirectories bool, f storagedriver.WalkFn) error {
var retError error
delimiter := ""
if walkDirectories {
delimiter = "/"
}
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(path),
Delimiter: aws.String(delimiter),
MaxKeys: aws.Int64(listMax),
Bucket: aws.String(d.Bucket),
Prefix: aws.String(path),
MaxKeys: aws.Int64(listMax),
}
s := d.s3Client(parentCtx)
@ -1155,36 +1103,10 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
ctx, done := dcontext.WithTrace(parentCtx)
defer done("s3aws.ListObjectsV2Pages(%s)", path)
listObjectErr := s.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {
var count int64
// KeyCount was introduced with version 2 of the GET Bucket operation in S3.
// Some s3 implementations (looking at you ceph/rgw) have a buggy
// implementation so we intionally avoid ever using it, preferring instead
// to calculate the count from the Contents and CommonPrefixes fields of
// the s3.ListObjectsV2Output. we retain the commented out KeyCount code
// and this comment so as not to forget this problem moving forward.
//
// count = *objects.KeyCount
// *objectCount += *objects.KeyCount
count = int64(len(objects.Contents) + len(objects.CommonPrefixes))
*objectCount += count
walkInfos := make([]walkInfoContainer, 0, count)
for _, dir := range objects.CommonPrefixes {
commonPrefix := *dir.Prefix
walkInfos = append(walkInfos, walkInfoContainer{
prefix: dir.Prefix,
FileInfoFields: storagedriver.FileInfoFields{
IsDir: true,
Path: strings.Replace(commonPrefix[:len(commonPrefix)-1], d.s3Path(""), prefix, 1),
},
})
}
walkInfos := make([]storagedriver.FileInfoInternal, 0, len(objects.Contents))
for _, file := range objects.Contents {
walkInfos = append(walkInfos, walkInfoContainer{
walkInfos = append(walkInfos, storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: false,
Size: *file.Size,
@ -1194,28 +1116,44 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
})
}
sort.SliceStable(walkInfos, func(i, j int) bool { return walkInfos[i].FileInfoFields.Path < walkInfos[j].FileInfoFields.Path })
for _, walkInfo := range walkInfos {
err := f(walkInfo)
if err == storagedriver.ErrSkipDir {
if walkInfo.IsDir() {
continue
} else {
break
}
} else if err != nil {
retError = err
return false
// skip any results under the last skip directory
if prevSkipDir != "" && strings.HasPrefix(walkInfo.Path(), prevSkipDir) {
continue
}
if walkInfo.IsDir() && walkDirectories {
if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, walkDirectories, f); err != nil {
dir := filepath.Dir(walkInfo.Path())
if dir != prevDir {
prevDir = dir
walkDirInfo := storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: true,
Path: dir,
},
}
err := f(walkDirInfo)
*objectCount++
if err != nil {
if err == storagedriver.ErrSkipDir {
prevSkipDir = dir
continue
}
retError = err
return false
}
}
err := f(walkInfo)
*objectCount++
if err != nil {
if err == storagedriver.ErrSkipDir {
break
}
retError = err
return false
}
}
return true
})

View File

@ -90,11 +90,6 @@ type StorageDriver interface {
// to a directory, the directory will not be entered and Walk
// will continue the traversal. If fileInfo refers to a normal file, processing stops
Walk(ctx context.Context, path string, f WalkFn) error
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file but does not call f with directories.
// If an error is returned from the WalkFn, processing stops
WalkFiles(ctx context.Context, path string, f WalkFn) error
}
// FileWriter provides an abstraction for an opened writable file-like object in

View File

@ -663,12 +663,6 @@ func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn)
return storagedriver.WalkFallback(ctx, d, path, f)
}
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}
func (d *driver) swiftPath(path string) string {
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
}

View File

@ -22,20 +22,6 @@ type WalkFn func(fileInfo FileInfo) error
// to a directory, the directory will not be entered and Walk
// will continue the traversal. If fileInfo refers to a normal file, processing stops
func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error {
return doWalk(ctx, driver, from, true, f)
}
// WalkFilesFallback traverses a filesystem defined within driver, starting
// from the given path, calling f on each file. It uses the List method and Stat to drive itself.
// If an error is returned from WalkFn, processing stops
func WalkFilesFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error {
return doWalk(ctx, driver, from, false, f)
}
// WalkFilesFallback traverses a filesystem defined within driver, starting
// from the given path, calling f on each file. It uses the List method and Stat to drive itself.
// If an error is returned from WalkFn, processing stops
func doWalk(ctx context.Context, driver StorageDriver, from string, walkDir bool, f WalkFn) error {
children, err := driver.List(ctx, from)
if err != nil {
return err
@ -57,15 +43,12 @@ func doWalk(ctx context.Context, driver StorageDriver, from string, walkDir bool
return err
}
}
err = nil
if !fileInfo.IsDir() || walkDir {
err = f(fileInfo)
}
err = f(fileInfo)
if err == nil && fileInfo.IsDir() {
if err := doWalk(ctx, driver, child, walkDir, f); err != nil {
if err := WalkFallback(ctx, driver, child, f); err != nil {
return err
}
} else if err == ErrSkipDir && walkDir {
} else if err == ErrSkipDir {
// Stop iteration if it's a file, otherwise noop if it's a directory
if !fileInfo.IsDir() {
return nil

View File

@ -73,26 +73,6 @@ func TestWalkFileRemoved(t *testing.T) {
}
}
func TestWalkFilesFileRemoved(t *testing.T) {
d := &changingFileSystem{
fileset: []string{"zoidberg", "bender"},
keptFiles: map[string]bool{
"zoidberg": true,
},
}
infos := []FileInfo{}
err := WalkFilesFallback(context.Background(), d, "", func(fileInfo FileInfo) error {
infos = append(infos, fileInfo)
return nil
})
if len(infos) != 1 || infos[0].Path() != "zoidberg" {
t.Errorf(fmt.Sprintf("unexpected path set during walk: %s", infos))
}
if err != nil {
t.Fatalf(err.Error())
}
}
func TestWalkFallback(t *testing.T) {
d := &fileSystem{
fileset: map[string][]string{
@ -212,70 +192,6 @@ func TestWalkFallbackErr(t *testing.T) {
compareWalked(t, expected, walked)
}
// WalkFiles is expected to only walk files, not directories
func TestWalkFilesFallback(t *testing.T) {
d := &fileSystem{
fileset: map[string][]string{
"/": {"/folder1", "/file1", "/folder2"},
"/folder1": {"/folder1/folder1"},
"/folder2": {"/folder2/file1", "/folder2/file2"},
"/folder1/folder1": {"/folder1/folder1/file1", "/folder1/folder1/file2"},
},
}
expected := []string{
"/file1",
"/folder1/folder1/file1",
"/folder1/folder1/file2",
"/folder2/file1",
"/folder2/file2",
}
var walked []string
err := WalkFilesFallback(context.Background(), d, "/", func(fileInfo FileInfo) error {
if fileInfo.IsDir() {
t.Fatalf("can't walk over dir %s", fileInfo.Path())
}
if fileInfo.IsDir() != d.isDir(fileInfo.Path()) {
t.Fatalf("fileInfo isDir not matching file system: expected %t actual %t", d.isDir(fileInfo.Path()), fileInfo.IsDir())
}
walked = append(walked, fileInfo.Path())
return nil
})
if err != nil {
t.Fatalf(err.Error())
}
compareWalked(t, expected, walked)
}
// WalkFiles is expected to stop when any error is given
func TestWalkFilesFallbackErr(t *testing.T) {
d := &fileSystem{
fileset: map[string][]string{
"/": {"/file1", "/folder1", "/folder2"},
"/folder1": {"/folder1/file1"},
"/folder2": {"/folder2/file1"},
},
}
skipFile := "/folder1/file1"
expected := []string{
"/file1", "/folder1/file1",
}
var walked []string
err := WalkFilesFallback(context.Background(), d, "/", func(fileInfo FileInfo) error {
fmt.Println("Walk ", fileInfo.Path())
walked = append(walked, fileInfo.Path())
if fileInfo.Path() == skipFile {
return ErrSkipDir
}
return nil
})
if err == nil {
t.Fatalf("expected Walk to ErrSkipDir %v", err)
}
compareWalked(t, expected, walked)
}
func compareWalked(t *testing.T, expected, walked []string) {
if len(walked) != len(expected) {
t.Fatalf("Mismatch number of fileInfo walked %d expected %d", len(walked), len(expected))

View File

@ -247,7 +247,7 @@ func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest.
if err != nil {
return err
}
return lbs.driver.WalkFiles(ctx, rootPath, func(fileInfo driver.FileInfo) error {
return lbs.driver.Walk(ctx, rootPath, func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
// check if it's a link