diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index faee83855a0..78cf9231a70 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -1126,14 +1126,22 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali } wg := sync.WaitGroup{} toProcess := make(chan int, 2*workersNumber) - errs := make(chan error, workersNumber+1) + errs := make(chan error, workersNumber) + workersExited := make(chan struct{}) + distributorExited := make(chan struct{}) go func() { defer utilruntime.HandleCrash(func(panicReason interface{}) { errs <- fmt.Errorf("DeleteCollection distributor panicked: %v", panicReason) }) + defer close(distributorExited) for i := 0; i < len(items); i++ { - toProcess <- i + select { + case toProcess <- i: + case <-workersExited: + klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "finished", i, "total", len(items)) + return + } } close(toProcess) }() @@ -1166,6 +1174,9 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali }() } wg.Wait() + // notify distributor to exit + close(workersExited) + <-distributorExited select { case err := <-errs: return nil, err diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index f3ec927f4a1..4645dcfdc28 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -2124,6 +2124,37 @@ func TestStoreDeleteCollectionNotFound(t *testing.T) { } } +func TestStoreDeleteCollectionWorkDistributorExited(t *testing.T) { + destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer destroyFunc() + + testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + + for i := 0; i < 100; i++ { + if _, err := registry.Create( + testContext, + &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("foo-%d", i), + }, + }, + rest.ValidateAllObjectFunc, + &metav1.CreateOptions{}, + ); err != nil { + t.Errorf("Unexpected error: %v", err) + } + } + + expectErr := fmt.Errorf("validate object failed") + + _, err := registry.DeleteCollection(testContext, func(ctx context.Context, obj runtime.Object) error { + return expectErr + }, nil, &metainternalversion.ListOptions{}) + if err != expectErr { + t.Errorf("Unexpected error: %v", err) + } +} + // Test whether objects deleted with DeleteCollection are correctly delivered // to watchers. func TestStoreDeleteCollectionWithWatch(t *testing.T) {