Merge pull request #115666 from wojtek-t/refactor_delete_collection

Minor refactor of DeleteCollection in preparation for pagination support there
This commit is contained in:
Kubernetes Prow Robot 2023-02-24 09:43:34 -08:00 committed by GitHub
commit 67f4f9ad5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1127,11 +1127,6 @@ func (e *Store) DeleteReturnsDeletedObject() bool {
// DeleteCollection is currently NOT atomic. It can happen that only subset of objects
// will be deleted from storage, and then an error will be returned.
// In case of success, the list of deleted objects will be returned.
//
// TODO: Currently, there is no easy way to remove 'directory' entry from storage (if we
// are removing all objects of a given type) with the current API (it's technically
// possibly with storage API, but watch is not delivered correctly then).
// It will be possible to fix it with v3 etcd API.
func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
if listOptions == nil {
listOptions = &metainternalversion.ListOptions{}
@ -1167,23 +1162,6 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
toProcess := make(chan int, 2*workersNumber)
errs := make(chan error, workersNumber+1)
workersExited := make(chan struct{})
distributorExited := make(chan struct{})
go func() {
defer utilruntime.HandleCrash(func(panicReason interface{}) {
errs <- fmt.Errorf("DeleteCollection distributor panicked: %v", panicReason)
})
defer close(distributorExited)
for i := 0; i < len(items); i++ {
select {
case toProcess <- i:
case <-workersExited:
klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "finished", i, "total", len(items))
return
}
}
close(toProcess)
}()
wg.Add(workersNumber)
for i := 0; i < workersNumber; i++ {
@ -1212,10 +1190,31 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
}
}()
}
wg.Wait()
// notify distributor to exit
close(workersExited)
<-distributorExited
// In case of all workers exit, notify distributor.
go func() {
defer utilruntime.HandleCrash(func(panicReason interface{}) {
errs <- fmt.Errorf("DeleteCollection workers closer panicked: %v", panicReason)
})
wg.Wait()
close(workersExited)
}()
func() {
defer close(toProcess)
for i := 0; i < len(items); i++ {
select {
case toProcess <- i:
case <-workersExited:
klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "finished", i, "total", len(items))
return
}
}
}()
// Wait for all workers to exist.
<-workersExited
select {
case err := <-errs:
return nil, err