Merge pull request #105606 from sxllwx/fix/goroutine-leak

fix goroutine leak in the DeleteCollection
This commit is contained in:
Kubernetes Prow Robot 2021-10-21 06:48:09 -07:00 committed by GitHub
commit 2dede1d4d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 2 deletions

View File

@ -1126,14 +1126,22 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
toProcess := make(chan int, 2*workersNumber) toProcess := make(chan int, 2*workersNumber)
errs := make(chan error, workersNumber+1) errs := make(chan error, workersNumber)
workersExited := make(chan struct{})
distributorExited := make(chan struct{})
go func() { go func() {
defer utilruntime.HandleCrash(func(panicReason interface{}) { defer utilruntime.HandleCrash(func(panicReason interface{}) {
errs <- fmt.Errorf("DeleteCollection distributor panicked: %v", panicReason) errs <- fmt.Errorf("DeleteCollection distributor panicked: %v", panicReason)
}) })
defer close(distributorExited)
for i := 0; i < len(items); i++ { for i := 0; i < len(items); i++ {
toProcess <- i select {
case toProcess <- i:
case <-workersExited:
klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "finished", i, "total", len(items))
return
}
} }
close(toProcess) close(toProcess)
}() }()
@ -1166,6 +1174,9 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali
}() }()
} }
wg.Wait() wg.Wait()
// notify distributor to exit
close(workersExited)
<-distributorExited
select { select {
case err := <-errs: case err := <-errs:
return nil, err return nil, err

View File

@ -2124,6 +2124,37 @@ func TestStoreDeleteCollectionNotFound(t *testing.T) {
} }
} }
func TestStoreDeleteCollectionWorkDistributorExited(t *testing.T) {
destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer destroyFunc()
testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test")
for i := 0; i < 100; i++ {
if _, err := registry.Create(
testContext,
&example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("foo-%d", i),
},
},
rest.ValidateAllObjectFunc,
&metav1.CreateOptions{},
); err != nil {
t.Errorf("Unexpected error: %v", err)
}
}
expectErr := fmt.Errorf("validate object failed")
_, err := registry.DeleteCollection(testContext, func(ctx context.Context, obj runtime.Object) error {
return expectErr
}, nil, &metainternalversion.ListOptions{})
if err != expectErr {
t.Errorf("Unexpected error: %v", err)
}
}
// Test whether objects deleted with DeleteCollection are correctly delivered // Test whether objects deleted with DeleteCollection are correctly delivered
// to watchers. // to watchers.
func TestStoreDeleteCollectionWithWatch(t *testing.T) { func TestStoreDeleteCollectionWithWatch(t *testing.T) {