Simplify DeleteCollection

This commit is contained in:
Wojciech Tyczyński 2023-07-03 16:56:53 +02:00
parent 732775ee7d
commit e08bd3bc26

View File

@ -1160,7 +1160,6 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
listOptions = listOptions.DeepCopy() listOptions = listOptions.DeepCopy()
} }
itemsLock := sync.RWMutex{}
var items []runtime.Object var items []runtime.Object
// TODO(wojtek-t): Decide if we don't want to start workers more opportunistically. // TODO(wojtek-t): Decide if we don't want to start workers more opportunistically.
@ -1169,7 +1168,14 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
workersNumber = 1 workersNumber = 1
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
toProcess := make(chan int, 2*workersNumber) // Ensure that chanSize is not too high (to avoid wasted work) but
// at the same time high enough to start listing before we process
// the whole page.
chanSize := 2 * workersNumber
if chanSize < 256 {
chanSize = 256
}
toProcess := make(chan runtime.Object, chanSize)
errs := make(chan error, workersNumber+1) errs := make(chan error, workersNumber+1)
workersExited := make(chan struct{}) workersExited := make(chan struct{})
@ -1182,10 +1188,8 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
}) })
defer wg.Done() defer wg.Done()
for index := range toProcess { for item := range toProcess {
itemsLock.RLock() accessor, err := meta.Accessor(item)
accessor, err := meta.Accessor(items[index])
itemsLock.RUnlock()
if err != nil { if err != nil {
errs <- err errs <- err
return return
@ -1238,13 +1242,11 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
if err != nil { if err != nil {
return nil, err return nil, err
} }
itemsLock.Lock()
items = append(items, newItems...) items = append(items, newItems...)
itemsLock.Unlock()
for i := 0; i < len(newItems); i++ { for i := 0; i < len(newItems); i++ {
select { select {
case toProcess <- processedItems + i: case toProcess <- newItems[i]:
case <-workersExited: case <-workersExited:
klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "queued/finished", i, "total", processedItems+len(newItems)) klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "queued/finished", i, "total", processedItems+len(newItems))
// Try to propagate an error from the workers if possible. // Try to propagate an error from the workers if possible.
@ -1274,9 +1276,7 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
return nil, err return nil, err
} }
if len(m.GetContinue()) == 0 { if len(m.GetContinue()) == 0 {
itemsLock.Lock()
meta.SetList(originalList, items) meta.SetList(originalList, items)
itemsLock.Unlock()
return originalList, nil return originalList, nil
} }