Merge pull request #110362 from wojtek-t/fix_leaking_goroutines_5

Fix leaking goroutines in multiple integration tests
Authored by Kubernetes Prow Robot on 2022-06-07 08:44:55 -07:00; committed by GitHub.
9 changed files with 90 additions and 70 deletions
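
The pattern repeated across the controller changes below (certificates, disruption, node lifecycle, taint manager) is the same: event broadcasters and work queues are no longer started in the constructor but inside Run, with the matching shutdowns registered as defers before any early return, so that merely constructing a controller in a test no longer leaks event-processing goroutines. A minimal sketch of that shape, assuming a hypothetical exampleController rather than any specific controller in this PR:

// Sketch only: the common shape of the controller changes in this commit.
package example

import (
	"context"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientset "k8s.io/client-go/kubernetes"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
)

type exampleController struct {
	kubeClient  clientset.Interface
	broadcaster record.EventBroadcaster
	queue       workqueue.RateLimitingInterface
}

// The constructor only creates the broadcaster; StartStructuredLogging and
// StartRecordingToSink are no longer called here, so building the controller
// does not spawn event-processing goroutines.
func newExampleController(kubeClient clientset.Interface) *exampleController {
	return &exampleController{
		kubeClient:  kubeClient,
		broadcaster: record.NewBroadcaster(),
		queue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example"),
	}
}

func (c *exampleController) Run(ctx context.Context) {
	defer utilruntime.HandleCrash()

	// Start the events pipeline only when the controller actually runs,
	// and pair it with a deferred Shutdown.
	c.broadcaster.StartStructuredLogging(0)
	c.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
	defer c.broadcaster.Shutdown()

	// Register the queue shutdown before any early return (for example a
	// failed cache sync) so worker goroutines can always exit.
	defer c.queue.ShutDown()

	<-ctx.Done()
}

In the node lifecycle hunk the queue ShutDown defers likewise move toward the top of Run, and the taint manager's Run gains matching defers for its queues.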

View File

@@ -31,10 +31,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
certificatesinformers "k8s.io/client-go/informers/certificates/v1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
certificateslisters "k8s.io/client-go/listers/certificates/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
@@ -60,11 +58,6 @@ func NewCertificateController(
csrInformer certificatesinformers.CertificateSigningRequestInformer,
handler func(context.Context, *certificates.CertificateSigningRequest) error,
) *CertificateController {
// Send events to the apiserver
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
cc := &CertificateController{
name: name,
kubeClient: kubeClient,

View File

@@ -358,7 +358,18 @@ func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string,
func (dc *DisruptionController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
// Start events processing pipeline.
if dc.kubeClient != nil {
klog.Infof("Sending events to api server.")
dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
} else {
klog.Infof("No api server defined - no events will be sent to API server.")
}
defer dc.broadcaster.Shutdown()
defer dc.queue.ShutDown()
defer dc.recheckQueue.ShutDown()
klog.Infof("Starting disruption controller")
defer klog.Infof("Shutting down disruption controller")
@@ -367,12 +378,6 @@ func (dc *DisruptionController) Run(ctx context.Context) {
return
}
if dc.kubeClient != nil {
klog.Infof("Sending events to api server.")
dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
} else {
klog.Infof("No api server defined - no events will be sent to API server.")
}
go wait.UntilWithContext(ctx, dc.worker, time.Second)
go wait.Until(dc.recheckWorker, time.Second, ctx.Done())

View File

@@ -300,7 +300,8 @@ type Controller struct {
getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)
recorder record.EventRecorder
broadcaster record.EventBroadcaster
recorder record.EventRecorder
// Value controlling Controller monitoring period, i.e. how often does Controller
// check node health signal posted from kubelet. This value should be lower than
@@ -372,13 +373,6 @@ func NewNodeLifecycleController(
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
eventBroadcaster.StartStructuredLogging(0)
klog.Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(
&v1core.EventSinkImpl{
Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""),
})
if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
@@ -390,6 +384,7 @@ func NewNodeLifecycleController(
knownNodeSet: make(map[string]*v1.Node),
nodeHealthMap: newNodeHealthMap(),
nodeEvictionMap: newNodeEvictionMap(),
broadcaster: eventBroadcaster,
recorder: recorder,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
@@ -536,6 +531,19 @@ func NewNodeLifecycleController(
func (nc *Controller) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
// Start events processing pipeline.
nc.broadcaster.StartStructuredLogging(0)
klog.Infof("Sending events to api server.")
nc.broadcaster.StartRecordingToSink(
&v1core.EventSinkImpl{
Interface: v1core.New(nc.kubeClient.CoreV1().RESTClient()).Events(""),
})
defer nc.broadcaster.Shutdown()
// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
defer nc.podUpdateQueue.ShutDown()
klog.Infof("Starting node controller")
defer klog.Infof("Shutting down node controller")
@@ -547,10 +555,6 @@ func (nc *Controller) Run(ctx context.Context) {
go nc.taintManager.Run(ctx)
}
// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
defer nc.podUpdateQueue.ShutDown()
// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because

View File

@@ -82,6 +82,7 @@ type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error)
// from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct {
client clientset.Interface
broadcaster record.EventBroadcaster
recorder record.EventRecorder
getPod GetPodFunc
getNode GetNodeFunc
@@ -158,16 +159,10 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
eventBroadcaster.StartStructuredLogging(0)
if c != nil {
klog.InfoS("Sending events to api server")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")})
} else {
klog.Fatalf("kubeClient is nil when starting NodeController")
}
tm := &NoExecuteTaintManager{
client: c,
broadcaster: eventBroadcaster,
recorder: recorder,
getPod: getPod,
getNode: getNode,
@@ -184,8 +179,23 @@ func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
klog.InfoS("Starting NoExecuteTaintManager")
// Start events processing pipeline.
tc.broadcaster.StartStructuredLogging(0)
if tc.client != nil {
klog.InfoS("Sending events to api server")
tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")})
} else {
klog.Fatalf("kubeClient is nil when starting NodeController")
}
defer tc.broadcaster.Shutdown()
defer tc.nodeUpdateQueue.ShutDown()
defer tc.podUpdateQueue.ShutDown()
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))

View File

@@ -58,7 +58,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery"
apiserverfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/generic"
@@ -477,7 +476,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
time.Duration(c.ExtraConfig.IdentityLeaseRenewIntervalSeconds)*time.Second,
metav1.NamespaceSystem,
labelAPIServerHeartbeat)
go controller.Run(wait.NeverStop)
go controller.Run(hookContext.StopCh)
return nil
})
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {
@@ -490,7 +489,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
time.Duration(c.ExtraConfig.IdentityLeaseDurationSeconds)*time.Second,
metav1.NamespaceSystem,
KubeAPIServerIdentityLeaseLabelSelector,
).Run(wait.NeverStop)
).Run(hookContext.StopCh)
return nil
})
}
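
Swapping wait.NeverStop for hookContext.StopCh ties the identity-lease controllers' goroutines to the apiserver's shutdown signal instead of letting them run forever. A sketch of the same wiring, assuming a placeholder runLoop in place of the lease controllers above:

package example

import (
	genericapiserver "k8s.io/apiserver/pkg/server"
)

// addExampleHook registers a post-start hook whose goroutine watches the
// server's stop channel, so it exits on apiserver shutdown rather than
// being parked on wait.NeverStop for the life of the process.
func addExampleHook(s *genericapiserver.GenericAPIServer, runLoop func(stopCh <-chan struct{})) {
	s.AddPostStartHookOrDie("start-example-loop", func(hookContext genericapiserver.PostStartHookContext) error {
		go runLoop(hookContext.StopCh)
		return nil
	})
}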

View File

@@ -51,12 +51,12 @@ import (
func TestCSRDuration(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)
s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
t.Cleanup(s.TearDownFn)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)
// assert that the metrics we collect during the test run match expectations
// we have 7 valid test cases below that request a duration of which 6 should have their duration honored
wantMetricStrings := []string{
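
The only change in this test is the order in which the context and the test server are created. That matters because t.Cleanup callbacks run in last-added, first-called order: after the reorder, the server's TearDownFn runs before the context is cancelled. A toy test, with made-up log messages, just to pin down that ordering:

package example

import "testing"

// TestCleanupOrder shows that t.Cleanup is LIFO: "second" is logged
// before "first" when the test finishes.
func TestCleanupOrder(t *testing.T) {
	t.Cleanup(func() { t.Log("first registered, runs last") })
	t.Cleanup(func() { t.Log("second registered, runs first") })
}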

View File

@@ -101,7 +101,9 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
func TestPDBWithScaleSubresource(t *testing.T) {
s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t)
defer s.TearDownFn()
ctx := context.TODO()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nsName := "pdb-scale-subresource"
createNs(ctx, t, nsName, clientSet)
@@ -187,16 +189,14 @@ func TestPDBWithScaleSubresource(t *testing.T) {
}
func TestEmptySelector(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testcases := []struct {
name string
createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
createPDBFunc func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
expectedCurrentHealthy int32
}{
{
name: "v1beta1 should not target any pods",
createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
pdb := &v1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -213,7 +213,7 @@ func TestEmptySelector(t *testing.T) {
},
{
name: "v1 should target all pods",
createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -235,6 +235,9 @@ func TestEmptySelector(t *testing.T) {
s, pdbc, informers, clientSet, _, _ := setup(t)
defer s.TearDownFn()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nsName := fmt.Sprintf("pdb-empty-selector-%d", i)
createNs(ctx, t, nsName, clientSet)
@@ -252,7 +255,7 @@ func TestEmptySelector(t *testing.T) {
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4, v1.PodRunning)
pdbName := "test-pdb"
if err := tc.createPDBFunc(clientSet, pdbName, nsName, minAvailable); err != nil {
if err := tc.createPDBFunc(ctx, clientSet, pdbName, nsName, minAvailable); err != nil {
t.Errorf("Error creating PodDisruptionBudget: %v", err)
}
@@ -271,16 +274,14 @@ func TestEmptySelector(t *testing.T) {
}
func TestSelectorsForPodsWithoutLabels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testcases := []struct {
name string
createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
createPDBFunc func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
expectedCurrentHealthy int32
}{
{
name: "pods with no labels can be targeted by v1 PDBs with empty selector",
createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -297,7 +298,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
},
{
name: "pods with no labels can be targeted by v1 PDBs with DoesNotExist selector",
createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -321,7 +322,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
},
{
name: "pods with no labels can be targeted by v1beta1 PDBs with DoesNotExist selector",
createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
pdb := &v1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -350,6 +351,9 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
s, pdbc, informers, clientSet, _, _ := setup(t)
defer s.TearDownFn()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nsName := fmt.Sprintf("pdb-selectors-%d", i)
createNs(ctx, t, nsName, clientSet)
@@ -360,7 +364,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
// Create the PDB first and wait for it to settle.
pdbName := "test-pdb"
if err := tc.createPDBFunc(clientSet, pdbName, nsName, minAvailable); err != nil {
if err := tc.createPDBFunc(ctx, clientSet, pdbName, nsName, minAvailable); err != nil {
t.Errorf("Error creating PodDisruptionBudget: %v", err)
}
waitPDBStable(ctx, t, clientSet, 0, nsName, pdbName)
@@ -498,9 +502,15 @@ func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podN
}
func TestPatchCompatibility(t *testing.T) {
s, _, _, clientSet, _, _ := setup(t)
s, pdbc, _, clientSet, _, _ := setup(t)
defer s.TearDownFn()
// Even though pdbc isn't used in this test, its creation is already
// spawning some goroutines. So we need to run it to ensure they won't leak.
ctx, cancel := context.WithCancel(context.Background())
cancel()
pdbc.Run(ctx)
testcases := []struct {
name string
version string
@@ -634,5 +644,4 @@ func TestPatchCompatibility(t *testing.T) {
}
})
}
}
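
The comment added in TestPatchCompatibility above spells out the trick: constructing the disruption controller already spawns goroutines, and Run is where they get torn down, so the test runs it once with an already-cancelled context. A generic sketch of that idiom, with a hypothetical runnable interface standing in for the controller type:

package example

import "context"

// runnable is any controller whose Run blocks until its context is done
// and, on return, shuts down the goroutines spawned at construction time.
type runnable interface {
	Run(ctx context.Context)
}

// drain calls Run with a context that is already cancelled, so Run returns
// almost immediately after executing its cleanup defers (queue and
// broadcaster shutdown). This mirrors the pdbc.Run(ctx) call in the hunk above.
func drain(c runnable) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	c.Run(ctx)
}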

View File

@@ -58,13 +58,17 @@ func TestEventCompatibility(t *testing.T) {
if err != nil {
t.Fatal(err)
}
stopCh := make(chan struct{})
defer close(stopCh)
oldBroadcaster := record.NewBroadcaster()
defer oldBroadcaster.Shutdown()
oldRecorder := oldBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "integration"})
oldBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{Interface: client.CoreV1().Events("")})
oldRecorder.Eventf(regarding, v1.EventTypeNormal, "started", "note")
newBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
defer newBroadcaster.Shutdown()
newRecorder := newBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-scheduler")
newBroadcaster.StartRecordingToSink(stopCh)
newRecorder.Eventf(regarding, related, v1.EventTypeNormal, "memoryPressure", "killed", "memory pressure")
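
Both event broadcaster APIs keep goroutines running once recording starts, so the test now pairs every start with a stop: a deferred Shutdown for the legacy record broadcaster and for the newer events broadcaster, plus a stop channel for the latter's sink recording. A condensed sketch of that pairing, assuming client is a working clientset and leaving out the recorders themselves:

package example

import (
	clientset "k8s.io/client-go/kubernetes"
	typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/events"
	"k8s.io/client-go/tools/record"
)

// startRecorders starts both the old (core/v1 Event) and new (events.k8s.io)
// broadcasters and returns a cleanup func that stops every goroutine they
// spawned, mirroring the defers added in the test above.
func startRecorders(client clientset.Interface) func() {
	stopCh := make(chan struct{})

	oldBroadcaster := record.NewBroadcaster()
	oldBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{Interface: client.CoreV1().Events("")})

	newBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
	newBroadcaster.StartRecordingToSink(stopCh)

	return func() {
		close(stopCh)
		oldBroadcaster.Shutdown()
		newBroadcaster.Shutdown()
	}
}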

View File

@@ -26,9 +26,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
netutils "k8s.io/utils/net"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/test/integration/framework"
)
@@ -38,26 +37,23 @@ import (
// mistakenly, repair the ClusterIP assigned to the Service that is being deleted.
// https://issues.k8s.io/87603
func TestServicesFinalizersRepairLoop(t *testing.T) {
serviceCIDR := "10.0.0.0/16"
clusterIP := "10.0.0.20"
interval := 5 * time.Second
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
cfg.ExtraConfig.RepairServicesInterval = interval
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = serviceCIDR
},
ModifyServerConfig: func(cfg *controlplane.Config) {
cfg.ExtraConfig.RepairServicesInterval = interval
},
})
defer tearDownFn()
// verify client is working
if err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) {
_, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
_, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil {
t.Logf("error fetching endpoints: %v", err)
return false, nil