From c7a1fa432a87579895eac4b3873162d5f1dba7f5 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Sat, 27 Jul 2024 16:13:16 +0200 Subject: [PATCH] Call non-blocking informerFactory.Start synchronously to avoid races Signed-off-by: Dr. Stefan Schimanski --- .../garbagecollector/garbagecollector_test.go | 2 +- .../controller_test.go | 2 +- pkg/controller/volume/ephemeral/controller_test.go | 2 +- .../leaderelection_controller_test.go | 14 +++++++------- .../leasecandidategc_controller_test.go | 2 +- pkg/proxy/config/api_test.go | 4 ++-- pkg/proxy/config/config_test.go | 10 +++++----- .../tools/leaderelection/leasecandidate.go | 2 +- .../cmd/informer-gen/generators/factory.go | 1 + 9 files changed, 20 insertions(+), 19 deletions(-) diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 6d2fdf7f934..2439fb732d4 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -223,7 +223,7 @@ func setupGC(t *testing.T, config *restclient.Config) garbageCollector { t.Fatal(err) } stop := make(chan struct{}) - go sharedInformers.Start(stop) + sharedInformers.Start(stop) return garbageCollector{gc, stop} } diff --git a/pkg/controller/validatingadmissionpolicystatus/controller_test.go b/pkg/controller/validatingadmissionpolicystatus/controller_test.go index 17056cbb47b..e7ed6681c73 100644 --- a/pkg/controller/validatingadmissionpolicystatus/controller_test.go +++ b/pkg/controller/validatingadmissionpolicystatus/controller_test.go @@ -113,7 +113,7 @@ func TestTypeChecking(t *testing.T) { if err != nil { t.Fatalf("cannot create controller: %v", err) } - go informerFactory.Start(ctx.Done()) + informerFactory.Start(ctx.Done()) go controller.Run(ctx, 1) err = wait.PollUntilContextCancel(ctx, time.Second, false, func(ctx context.Context) (done bool, err error) { name := policy.Name diff --git a/pkg/controller/volume/ephemeral/controller_test.go b/pkg/controller/volume/ephemeral/controller_test.go index 034bc25ed59..8d9cffbadb4 100644 --- a/pkg/controller/volume/ephemeral/controller_test.go +++ b/pkg/controller/volume/ephemeral/controller_test.go @@ -152,7 +152,7 @@ func TestSyncHandler(t *testing.T) { ec, _ := c.(*ephemeralController) // Ensure informers are up-to-date. - go informerFactory.Start(ctx.Done()) + informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced) diff --git a/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go b/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go index 3458aa96df2..412452c9ce4 100644 --- a/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go +++ b/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go @@ -32,8 +32,6 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/utils/ptr" - - "k8s.io/client-go/tools/cache" ) func TestReconcileElectionStep(t *testing.T) { @@ -339,7 +337,7 @@ func TestReconcileElectionStep(t *testing.T) { ctx := context.Background() client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, 0) - _ = informerFactory.Coordination().V1alpha1().LeaseCandidates().Lister() + controller, err := NewController( informerFactory.Coordination().V1().Leases(), informerFactory.Coordination().V1alpha1().LeaseCandidates(), @@ -349,8 +347,7 @@ func TestReconcileElectionStep(t *testing.T) { if err != nil { t.Fatal(err) } - go informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) + // Set up the fake client with the existing lease if tc.existingLease != nil { _, err = client.CoordinationV1().Leases(tc.existingLease.Namespace).Create(ctx, tc.existingLease, metav1.CreateOptions{}) @@ -366,7 +363,10 @@ func TestReconcileElectionStep(t *testing.T) { t.Fatal(err) } } - cache.WaitForCacheSync(ctx.Done(), controller.leaseCandidateInformer.Informer().HasSynced) + + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + requeue, err := controller.reconcileElectionStep(ctx, tc.leaseNN) if (requeue != 0) != tc.expectedRequeue { @@ -639,7 +639,7 @@ func TestController(t *testing.T) { t.Fatal(err) } - go informerFactory.Start(ctx.Done()) + informerFactory.Start(ctx.Done()) go controller.Run(ctx, 1) go func() { diff --git a/pkg/controlplane/controller/leaderelection/leasecandidategc_controller_test.go b/pkg/controlplane/controller/leaderelection/leasecandidategc_controller_test.go index b081f715e94..b845d62b8dd 100644 --- a/pkg/controlplane/controller/leaderelection/leasecandidategc_controller_test.go +++ b/pkg/controlplane/controller/leaderelection/leasecandidategc_controller_test.go @@ -115,7 +115,7 @@ func TestLeaseCandidateGCController(t *testing.T) { leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates() controller := NewLeaseCandidateGC(client, 10*time.Millisecond, leaseCandidateInformer) - go informerFactory.Start(ctx.Done()) + informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) // Create lease candidates diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 1bd9619b808..3c9f09307af 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -60,7 +60,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { serviceConfig := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), time.Minute) serviceConfig.RegisterEventHandler(handler) - go sharedInformers.Start(stopCh) + sharedInformers.Start(stopCh) go serviceConfig.Run(stopCh) // Add the first service @@ -141,7 +141,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { endpointsliceConfig := NewEndpointSliceConfig(ctx, sharedInformers.Discovery().V1().EndpointSlices(), time.Minute) endpointsliceConfig.RegisterEventHandler(handler) - go sharedInformers.Start(stopCh) + sharedInformers.Start(stopCh) go endpointsliceConfig.Run(stopCh) // Add the first endpoints diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 58e9cfa12c3..88802c3667a 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -240,7 +240,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) { config := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), time.Minute) handler := NewServiceHandlerMock() config.RegisterEventHandler(handler) - go sharedInformers.Start(stopCh) + sharedInformers.Start(stopCh) go config.Run(stopCh) service := &v1.Service{ @@ -265,7 +265,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) { config := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), time.Minute) handler := NewServiceHandlerMock() config.RegisterEventHandler(handler) - go sharedInformers.Start(stopCh) + sharedInformers.Start(stopCh) go config.Run(stopCh) service1 := &v1.Service{ @@ -304,7 +304,7 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) { handler2 := NewServiceHandlerMock() config.RegisterEventHandler(handler) config.RegisterEventHandler(handler2) - go sharedInformers.Start(stopCh) + sharedInformers.Start(stopCh) go config.Run(stopCh) service1 := &v1.Service{ @@ -339,7 +339,7 @@ func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { handler2 := NewEndpointSliceHandlerMock() config.RegisterEventHandler(handler) config.RegisterEventHandler(handler2) - go sharedInformers.Start(stopCh) + sharedInformers.Start(stopCh) go config.Run(stopCh) endpoints1 := &discoveryv1.EndpointSlice{ @@ -386,7 +386,7 @@ func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { handler2 := NewEndpointSliceHandlerMock() config.RegisterEventHandler(handler) config.RegisterEventHandler(handler2) - go sharedInformers.Start(stopCh) + sharedInformers.Start(stopCh) go config.Run(stopCh) endpoints1 := &discoveryv1.EndpointSlice{ diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go index c75fc3bdf76..74cf5bb5c25 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go @@ -120,7 +120,7 @@ func NewCandidate(clientset kubernetes.Interface, func (c *LeaseCandidate) Run(ctx context.Context) { defer c.queue.ShutDown() - go c.informerFactory.Start(ctx.Done()) + c.informerFactory.Start(ctx.Done()) if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.hasSynced) { return } diff --git a/staging/src/k8s.io/code-generator/cmd/informer-gen/generators/factory.go b/staging/src/k8s.io/code-generator/cmd/informer-gen/generators/factory.go index ee69aac35d9..b59dc99e32b 100644 --- a/staging/src/k8s.io/code-generator/cmd/informer-gen/generators/factory.go +++ b/staging/src/k8s.io/code-generator/cmd/informer-gen/generators/factory.go @@ -299,6 +299,7 @@ type SharedInformerFactory interface { // Start initializes all requested informers. They are handled in goroutines // which run until the stop channel gets closed. + // Warning: Start does not block. When run in a go-routine, it will race with a later WaitForCacheSync. Start(stopCh <-chan struct{}) // Shutdown marks a factory as shutting down. At that point no new