From e08bd3bc263615f3a26bd17c66a3222e4b6ff7ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 3 Jul 2023 16:56:53 +0200 Subject: [PATCH] Simplify DeleteCollection --- .../pkg/registry/generic/registry/store.go | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 353e5532c1e..028053952a3 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -1160,7 +1160,6 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali listOptions = listOptions.DeepCopy() } - itemsLock := sync.RWMutex{} var items []runtime.Object // TODO(wojtek-t): Decide if we don't want to start workers more opportunistically. @@ -1169,7 +1168,14 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali workersNumber = 1 } wg := sync.WaitGroup{} - toProcess := make(chan int, 2*workersNumber) + // Ensure that chanSize is not too high (to avoid wasted work) but + // at the same time high enough to start listing before we process + // the whole page. + chanSize := 2 * workersNumber + if chanSize < 256 { + chanSize = 256 + } + toProcess := make(chan runtime.Object, chanSize) errs := make(chan error, workersNumber+1) workersExited := make(chan struct{}) @@ -1182,10 +1188,8 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali }) defer wg.Done() - for index := range toProcess { - itemsLock.RLock() - accessor, err := meta.Accessor(items[index]) - itemsLock.RUnlock() + for item := range toProcess { + accessor, err := meta.Accessor(item) if err != nil { errs <- err return @@ -1238,13 +1242,11 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali if err != nil { return nil, err } - itemsLock.Lock() items = append(items, newItems...) - itemsLock.Unlock() for i := 0; i < len(newItems); i++ { select { - case toProcess <- processedItems + i: + case toProcess <- newItems[i]: case <-workersExited: klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "queued/finished", i, "total", processedItems+len(newItems)) // Try to propagate an error from the workers if possible. @@ -1274,9 +1276,7 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali return nil, err } if len(m.GetContinue()) == 0 { - itemsLock.Lock() meta.SetList(originalList, items) - itemsLock.Unlock() return originalList, nil }