Paginate within DeleteCollection call.

This commit is contained in:
Wojciech Tyczyński 2023-02-09 20:59:18 +01:00
parent 9d50c0a025
commit 732775ee7d
2 changed files with 118 additions and 42 deletions

View File

@ -1143,6 +1143,11 @@ func (e *Store) DeleteReturnsDeletedObject() bool {
return e.ReturnDeletedObject
}
// deleteCollectionPageSize is the size of the page used when
// listing objects from storage during DeleteCollection calls.
// It is applied as the list limit only when the caller's list
// options do not set a limit themselves.
// It's a variable (rather than a constant) to allow overwriting in tests.
var deleteCollectionPageSize = int64(10000)
// DeleteCollection removes all items returned by List with a given ListOptions from storage.
//
// DeleteCollection is currently NOT atomic. It can happen that only subset of objects
@ -1155,27 +1160,11 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
listOptions = listOptions.DeepCopy()
}
listObj, err := e.List(ctx, listOptions)
if err != nil {
return nil, err
}
items, err := meta.ExtractList(listObj)
if err != nil {
return nil, err
}
if len(items) == 0 {
// Nothing to delete, return now
return listObj, nil
}
// Spawn a number of goroutines, so that we can issue requests to storage
// in parallel to speed up deletion.
// It is proportional to the number of items to delete, up to
// DeleteCollectionWorkers (it doesn't make much sense to spawn 16
// workers to delete 10 items).
itemsLock := sync.RWMutex{}
var items []runtime.Object
// TODO(wojtek-t): Decide if we don't want to start workers more opportunistically.
workersNumber := e.DeleteCollectionWorkers
if workersNumber > len(items) {
workersNumber = len(items)
}
if workersNumber < 1 {
workersNumber = 1
}
@ -1194,7 +1183,9 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
defer wg.Done()
for index := range toProcess {
itemsLock.RLock()
accessor, err := meta.Accessor(items[index])
itemsLock.RUnlock()
if err != nil {
errs <- err
return
@ -1220,20 +1211,86 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
close(workersExited)
}()
func() {
hasLimit := listOptions.Limit > 0
if listOptions.Limit == 0 {
listOptions.Limit = deleteCollectionPageSize
}
// Paginate the list request and throw all items into workers.
listObj, err := func() (runtime.Object, error) {
defer close(toProcess)
for i := 0; i < len(items); i++ {
processedItems := 0
var originalList runtime.Object
for {
select {
case toProcess <- i:
case <-workersExited:
klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "finished", i, "total", len(items))
return
case <-ctx.Done():
return nil, ctx.Err()
default:
}
listObj, err := e.List(ctx, listOptions)
if err != nil {
return nil, err
}
newItems, err := meta.ExtractList(listObj)
if err != nil {
return nil, err
}
itemsLock.Lock()
items = append(items, newItems...)
itemsLock.Unlock()
for i := 0; i < len(newItems); i++ {
select {
case toProcess <- processedItems + i:
case <-workersExited:
klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "queued/finished", i, "total", processedItems+len(newItems))
// Try to propagate an error from the workers if possible.
select {
case err := <-errs:
return nil, err
default:
return nil, fmt.Errorf("all DeleteCollection workers exited")
}
}
}
processedItems += len(newItems)
// If the original request was setting the limit, finish after running it.
if hasLimit {
return listObj, nil
}
if originalList == nil {
originalList = listObj
meta.SetList(originalList, nil)
}
// If there are no more items, return the list.
m, err := meta.ListAccessor(listObj)
if err != nil {
return nil, err
}
if len(m.GetContinue()) == 0 {
itemsLock.Lock()
meta.SetList(originalList, items)
itemsLock.Unlock()
return originalList, nil
}
// Set up the next loop.
listOptions.Continue = m.GetContinue()
listOptions.ResourceVersion = ""
listOptions.ResourceVersionMatch = ""
}
}()
if err != nil {
return nil, err
}
// Wait for all workers to exist.
// Wait for all workers to exit.
<-workersExited
select {

View File

@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@ -2020,19 +2021,34 @@ func TestStoreDeletionPropagation(t *testing.T) {
}
}
func TestStoreDeleteCollection(t *testing.T) {
podA := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
podB := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
// storageWithCounter wraps a storage.Interface and keeps track of how many
// times GetList has been invoked on it. All other methods are served by the
// embedded interface.
type storageWithCounter struct {
// Interface is the wrapped storage that calls are delegated to.
storage.Interface
// listCounter is the number of GetList calls seen so far; it is updated
// and read with sync/atomic functions.
listCounter int64
}
// GetList bumps the list-call counter and then forwards the request,
// unchanged, to the embedded storage.Interface, returning whatever error
// the wrapped implementation reports.
func (sc *storageWithCounter) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
	// Atomic increment: the test reads this counter via atomic.LoadInt64.
	atomic.AddInt64(&sc.listCounter, 1)
	// Call through the embedded field explicitly; sc.GetList would recurse.
	err := sc.Interface.GetList(ctx, key, opts, listObj)
	return err
}
func TestStoreDeleteCollection(t *testing.T) {
testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer destroyFunc()
if _, err := registry.Create(testContext, podA, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Errorf("Unexpected error: %v", err)
}
if _, err := registry.Create(testContext, podB, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Errorf("Unexpected error: %v", err)
// Overwrite the underlying storage interface so that it counts GetList calls
// and reduce the default page size to 2.
storeWithCounter := &storageWithCounter{Interface: registry.Storage.Storage}
registry.Storage.Storage = storeWithCounter
deleteCollectionPageSize = 2
numPods := 10
for i := 0; i < numPods; i++ {
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
if _, err := registry.Create(testContext, pod, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Errorf("Unexpected error: %v", err)
}
}
// Delete all pods.
@ -2041,15 +2057,18 @@ func TestStoreDeleteCollection(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
deletedPods := deleted.(*example.PodList)
if len(deletedPods.Items) != 2 {
t.Errorf("Unexpected number of pods deleted: %d, expected: 3", len(deletedPods.Items))
if len(deletedPods.Items) != numPods {
t.Errorf("Unexpected number of pods deleted: %d, expected: %d", len(deletedPods.Items), numPods)
}
expectedCalls := (int64(numPods) + deleteCollectionPageSize - 1) / deleteCollectionPageSize
if listCalls := atomic.LoadInt64(&storeWithCounter.listCounter); listCalls != expectedCalls {
t.Errorf("Unexpected number of list calls: %d, expected: %d", listCalls, expectedCalls)
}
if _, err := registry.Get(testContext, podA.Name, &metav1.GetOptions{}); !errors.IsNotFound(err) {
t.Errorf("Unexpected error: %v", err)
}
if _, err := registry.Get(testContext, podB.Name, &metav1.GetOptions{}); !errors.IsNotFound(err) {
t.Errorf("Unexpected error: %v", err)
for i := 0; i < numPods; i++ {
if _, err := registry.Get(testContext, fmt.Sprintf("foo-%d", i), &metav1.GetOptions{}); !errors.IsNotFound(err) {
t.Errorf("Unexpected error: %v", err)
}
}
}