storagedriver/s3: Major change to the S3 Walk impl to infer directories to walk between files. This is needed for manifest enumeration among others

This commit is contained in:
Collin Shoop 2021-06-29 10:29:05 -04:00
parent 8d38cde0f3
commit b9b0cac122
2 changed files with 88 additions and 56 deletions

View File

@ -1091,6 +1091,7 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
// the most recent skip directory to avoid walking over undesirable files // the most recent skip directory to avoid walking over undesirable files
prevSkipDir string prevSkipDir string
) )
prevDir = prefix + path
listObjectsInput := &s3.ListObjectsV2Input{ listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
@ -1116,12 +1117,28 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
walkInfos := make([]storagedriver.FileInfoInternal, 0, len(objects.Contents)) walkInfos := make([]storagedriver.FileInfoInternal, 0, len(objects.Contents))
for _, file := range objects.Contents { for _, file := range objects.Contents {
filePath := strings.Replace(*file.Key, d.s3Path(""), prefix, 1)
// get a list of all inferred directories skipped between the previous directory and this file
dirs := directoryDiff(prevDir, filePath)
if len(dirs) > 0 {
for _, dir := range dirs {
walkInfos = append(walkInfos, storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: true,
Path: dir,
},
})
prevDir = dir
}
}
walkInfos = append(walkInfos, storagedriver.FileInfoInternal{ walkInfos = append(walkInfos, storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{ FileInfoFields: storagedriver.FileInfoFields{
IsDir: false, IsDir: false,
Size: *file.Size, Size: *file.Size,
ModTime: *file.LastModified, ModTime: *file.LastModified,
Path: strings.Replace(*file.Key, d.s3Path(""), prefix, 1), Path: filePath,
}, },
}) })
} }
@ -1132,35 +1149,16 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
continue continue
} }
// walk over file's parent directory if not a duplicate
dir := filepath.Dir(walkInfo.Path())
if dir != prevDir {
prevDir = dir
walkDirInfo := storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: true,
Path: dir,
},
}
err := f(walkDirInfo)
*objectCount++
if err != nil {
if err == storagedriver.ErrSkipDir {
prevSkipDir = dir
continue
}
retError = err
return false
}
}
err := f(walkInfo) err := f(walkInfo)
*objectCount++ *objectCount++
if err != nil { if err != nil {
if err == storagedriver.ErrSkipDir { if err == storagedriver.ErrSkipDir {
// stop early without return error if walkInfo.IsDir() {
prevSkipDir = walkInfo.Path()
continue
}
// is file, stop gracefully
return false return false
} }
retError = err retError = err
@ -1181,6 +1179,38 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
return nil return nil
} }
// directoryDiff finds all directories that are not in common between
// the previous and current paths in sorted order.
//
// Eg 1 directoryDiff("/path/to/folder", "/path/to/folder/folder/file")
// => [ "/path/to/folder/folder" ],
// Eg 2 directoryDiff("/path/to/folder/folder1", "/path/to/folder/folder2/file")
// => [ "/path/to/folder/folder2" ]
// Eg 3 directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/file")
// => [ "/path/to/folder/folder2" ]
// Eg 4 directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/folder1/file")
// => [ "/path/to/folder/folder2", "/path/to/folder/folder2/folder1" ]
// Eg 5 directoryDiff("/", "/path/to/folder/folder/file")
// => [ "/path", "/path/to", "/path/to/folder", "/path/to/folder/folder" ],
func directoryDiff(prev, current string) []string {
var parents []string
if prev == "" || current == "" {
return parents
}
parent := current
for {
parent = filepath.Dir(parent)
if parent == "/" || parent == prev || strings.HasPrefix(prev, parent) {
break
}
parents = append(parents, parent)
}
sort.Sort(sort.StringSlice(parents))
return parents
}
func (d *driver) s3Path(path string) string { func (d *driver) s3Path(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
} }

View File

@ -259,29 +259,24 @@ func TestWalk(t *testing.T) {
t.Fatalf("unexpected error creating driver with standard storage: %v", err) t.Fatalf("unexpected error creating driver with standard storage: %v", err)
} }
var fileset = map[string][]string{ var fileset = []string{
"/": {"/file1", "/folder1", "/folder2"}, "/file1",
"/folder1": {"/folder1/file1"}, "/folder1/file1",
"/folder2": {"/folder2/file1"}, "/folder2/file1",
} "/folder3/subfolder1/subfolder1/file1",
isDir := func(path string) bool { "/folder3/subfolder2/subfolder1/file1",
_, isDir := fileset[path] "/folder4/file1",
return isDir
} }
// create file structure matching fileset above // create file structure matching fileset above
var created []string var created []string
for _, paths := range fileset { for _, path := range fileset {
for _, path := range paths { err := driver.PutContent(context.Background(), path, []byte("content "+path))
if _, isDir := fileset[path]; isDir { if err != nil {
continue // skip directories fmt.Printf("unable to create file %s: %s\n", path, err)
} continue
err := driver.PutContent(context.Background(), path, []byte("content "+path))
if err != nil {
fmt.Printf("unable to create file %s: %s\n", path, err)
}
created = append(created, path)
} }
created = append(created, path)
} }
// cleanup // cleanup
@ -310,32 +305,43 @@ func TestWalk(t *testing.T) {
name: "walk all", name: "walk all",
fn: func(fileInfo storagedriver.FileInfo) error { return nil }, fn: func(fileInfo storagedriver.FileInfo) error { return nil },
expected: []string{ expected: []string{
"/",
"/file1", "/file1",
"/folder1", "/folder1",
"/folder1/file1", "/folder1/file1",
"/folder2", "/folder2",
"/folder2/file1", "/folder2/file1",
"/folder3",
"/folder3/subfolder1",
"/folder3/subfolder1/subfolder1",
"/folder3/subfolder1/subfolder1/file1",
"/folder3/subfolder2",
"/folder3/subfolder2/subfolder1",
"/folder3/subfolder2/subfolder1/file1",
"/folder4",
"/folder4/file1",
}, },
}, },
{ {
name: "skip directory", name: "skip directory",
fn: func(fileInfo storagedriver.FileInfo) error { fn: func(fileInfo storagedriver.FileInfo) error {
if fileInfo.Path() == "/folder1" { if fileInfo.Path() == "/folder3" {
return storagedriver.ErrSkipDir return storagedriver.ErrSkipDir
} }
if strings.Contains(fileInfo.Path(), "/folder1") { if strings.Contains(fileInfo.Path(), "/folder3") {
t.Fatalf("skipped dir %s and should not walk %s", "/folder1", fileInfo.Path()) t.Fatalf("skipped dir %s and should not walk %s", "/folder3", fileInfo.Path())
} }
return nil return nil
}, },
expected: []string{ expected: []string{
"/",
"/file1", "/file1",
"/folder1", // return ErrSkipDir, skip anything under /folder1 "/folder1",
// skip /folder1/file1 "/folder1/file1",
"/folder2", "/folder2",
"/folder2/file1", "/folder2/file1",
"/folder3",
// folder 3 contents skipped
"/folder4",
"/folder4/file1",
}, },
}, },
{ {
@ -347,7 +353,6 @@ func TestWalk(t *testing.T) {
return nil return nil
}, },
expected: []string{ expected: []string{
"/",
"/file1", "/file1",
"/folder1", "/folder1",
"/folder1/file1", "/folder1/file1",
@ -361,7 +366,7 @@ func TestWalk(t *testing.T) {
return errors.New("foo") return errors.New("foo")
}, },
expected: []string{ expected: []string{
"/", "/file1",
}, },
err: true, err: true,
}, },
@ -384,9 +389,6 @@ func TestWalk(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
err := driver.Walk(context.Background(), tc.from, func(fileInfo storagedriver.FileInfo) error { err := driver.Walk(context.Background(), tc.from, func(fileInfo storagedriver.FileInfo) error {
walked = append(walked, fileInfo.Path()) walked = append(walked, fileInfo.Path())
if fileInfo.IsDir() != isDir(fileInfo.Path()) {
t.Fatalf("fileInfo isDir not matching file system: expected %t actual %t", isDir(fileInfo.Path()), fileInfo.IsDir())
}
return tc.fn(fileInfo) return tc.fn(fileInfo)
}) })
if tc.err && err == nil { if tc.err && err == nil {
@ -493,7 +495,7 @@ func compareWalked(t *testing.T, expected, walked []string) {
} }
for i := range walked { for i := range walked {
if walked[i] != expected[i] { if walked[i] != expected[i] {
t.Fatalf("expected walked to come in order expected: walked %s", walked) t.Fatalf("walked in unexpected order: expected %s; walked %s", expected, walked)
} }
} }
} }