storagedriver: Added a new WalkFiles method (later removed)

This commit is contained in:
Collin Shoop
2021-06-24 14:42:02 -04:00
parent e465862185
commit f31195b2e8
8 changed files with 123 additions and 12 deletions

View File

@@ -360,11 +360,17 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}
// directDescendants will find direct descendants (blobs or virtual containers)
// of from list of blob paths and will return their full paths. Elements in blobs
// list must be prefixed with a "/" and

View File

@@ -241,3 +241,15 @@ func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn)
return base.setDriverName(base.StorageDriver.Walk(ctx, path, f))
}
// WalkFiles wraps WalkFiles of underlying storage driver.
func (base *Base) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.WalkFiles(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
}
return base.setDriverName(base.StorageDriver.WalkFiles(ctx, path, f))
}

View File

@@ -290,11 +290,17 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}
// fullPath returns the absolute path of a key within the Driver's storage.
func (d *driver) fullPath(subPath string) string {
return path.Join(d.rootDirectory, subPath)

View File

@@ -245,11 +245,17 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}
type writer struct {
d *driver
f *file

View File

@@ -1057,7 +1057,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) error {
path := from
if !strings.HasSuffix(path, "/") {
@@ -1070,7 +1070,33 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn)
}
var objectCount int64
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, f); err != nil {
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, true, f); err != nil {
return err
}
// S3 doesn't have the concept of empty directories, so it'll return path not found if there are no objects
if objectCount == 0 {
return storagedriver.PathNotFoundError{Path: from}
}
return nil
}
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, from string, f storagedriver.WalkFn) error {
path := from
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
prefix := ""
if d.s3Path("") == "" {
prefix = "/"
}
var objectCount int64
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, false, f); err != nil {
return err
}
@@ -1110,13 +1136,17 @@ func (wi walkInfoContainer) IsDir() bool {
return wi.FileInfoFields.IsDir
}
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error {
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, walkDirectories bool, f storagedriver.WalkFn) error {
var retError error
delimiter := ""
if walkDirectories {
delimiter = "/"
}
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(path),
Delimiter: aws.String("/"),
Delimiter: aws.String(delimiter),
MaxKeys: aws.Int64(listMax),
}
@@ -1180,13 +1210,15 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
return false
}
if walkDirectories {
if walkInfo.IsDir() {
if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil {
if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, walkDirectories, f); err != nil {
retError = err
return false
}
}
}
}
return true
})

View File

@@ -90,6 +90,11 @@ type StorageDriver interface {
// to a directory, the directory will not be entered and Walk
// will continue the traversal. If fileInfo refers to a normal file, processing stops
Walk(ctx context.Context, path string, f WalkFn) error
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file but does not call f with directories.
// If an error is returned from the WalkFn, processing stops
WalkFiles(ctx context.Context, path string, f WalkFn) error
}
// FileWriter provides an abstraction for an opened writable file-like object in

View File

@@ -658,11 +658,17 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}
func (d *driver) swiftPath(path string) string {
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
}

View File

@@ -59,3 +59,41 @@ func WalkFallback(ctx context.Context, driver StorageDriver, from string, f Walk
}
return nil
}
// WalkFilesFallback traverses a filesystem defined within driver, starting
// from the given path, calling f on each file. It uses the List method and Stat to drive itself.
// If an error is returned from WalkFn, processing stops
func WalkFilesFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error {
children, err := driver.List(ctx, from)
if err != nil {
return err
}
sort.Stable(sort.StringSlice(children))
for _, child := range children {
// TODO(stevvooe): Calling driver.Stat for every entry is quite
// expensive when running against backends with a slow Stat
// implementation, such as s3. This is very likely a serious
// performance bottleneck.
fileInfo, err := driver.Stat(ctx, child)
if err != nil {
switch err.(type) {
case PathNotFoundError:
// repository was removed in between listing and enumeration. Ignore it.
logrus.WithField("path", child).Infof("ignoring deleted path")
continue
default:
return err
}
}
if !fileInfo.IsDir() {
err = f(fileInfo)
if err != nil {
return err
}
}
if err := WalkFallback(ctx, driver, child, f); err != nil {
return err
}
}
return nil
}