Call non-blocking informerFactory.Start synchronously to avoid races
Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
parent 250f7b5d15
commit c7a1fa432a
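The change is mechanical, but the reasoning behind the title is worth spelling out: SharedInformerFactory.Start already returns immediately (it launches one goroutine per requested informer), so wrapping it in an extra `go` statement gains nothing and allows a later WaitForCacheSync to run before Start has actually started anything, in which case it waits for nothing. The sketch below is not part of the diff; it is a minimal standalone illustration of the racy and the fixed pattern, using client-go's fake clientset and a pod informer purely as placeholders.

package main

import (
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)

	// Request at least one informer so the factory has something to start.
	_ = factory.Core().V1().Pods().Informer()

	stop := make(chan struct{})
	defer close(stop)

	// Racy pattern this commit removes:
	//
	//	go factory.Start(stop)
	//	factory.WaitForCacheSync(stop)
	//
	// Start is already non-blocking, so the extra goroutine only allows
	// WaitForCacheSync to run before Start has marked any informer as
	// started, and it may then return without waiting for anything.

	// Fixed pattern: call Start synchronously, then wait for the caches.
	factory.Start(stop)
	synced := factory.WaitForCacheSync(stop)
	fmt.Println("caches synced:", synced)
}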
@@ -223,7 +223,7 @@ func setupGC(t *testing.T, config *restclient.Config) garbageCollector {
 		t.Fatal(err)
 	}
 	stop := make(chan struct{})
-	go sharedInformers.Start(stop)
+	sharedInformers.Start(stop)
 	return garbageCollector{gc, stop}
 }
 
@@ -113,7 +113,7 @@ func TestTypeChecking(t *testing.T) {
 	if err != nil {
 		t.Fatalf("cannot create controller: %v", err)
 	}
-	go informerFactory.Start(ctx.Done())
+	informerFactory.Start(ctx.Done())
 	go controller.Run(ctx, 1)
 	err = wait.PollUntilContextCancel(ctx, time.Second, false, func(ctx context.Context) (done bool, err error) {
 		name := policy.Name
@@ -152,7 +152,7 @@ func TestSyncHandler(t *testing.T) {
 	ec, _ := c.(*ephemeralController)
 
 	// Ensure informers are up-to-date.
-	go informerFactory.Start(ctx.Done())
+	informerFactory.Start(ctx.Done())
 	informerFactory.WaitForCacheSync(ctx.Done())
 	cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced)
 
@@ -32,8 +32,6 @@ import (
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/utils/ptr"
-
-	"k8s.io/client-go/tools/cache"
 )
 
 func TestReconcileElectionStep(t *testing.T) {
@@ -339,7 +337,7 @@ func TestReconcileElectionStep(t *testing.T) {
 	ctx := context.Background()
 	client := fake.NewSimpleClientset()
 	informerFactory := informers.NewSharedInformerFactory(client, 0)
-	_ = informerFactory.Coordination().V1alpha1().LeaseCandidates().Lister()
 	controller, err := NewController(
 		informerFactory.Coordination().V1().Leases(),
 		informerFactory.Coordination().V1alpha1().LeaseCandidates(),
@@ -349,8 +347,7 @@ func TestReconcileElectionStep(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	go informerFactory.Start(ctx.Done())
-	informerFactory.WaitForCacheSync(ctx.Done())
+
 	// Set up the fake client with the existing lease
 	if tc.existingLease != nil {
 		_, err = client.CoordinationV1().Leases(tc.existingLease.Namespace).Create(ctx, tc.existingLease, metav1.CreateOptions{})
@@ -366,7 +363,10 @@ func TestReconcileElectionStep(t *testing.T) {
 			t.Fatal(err)
 		}
 	}
-	cache.WaitForCacheSync(ctx.Done(), controller.leaseCandidateInformer.Informer().HasSynced)
+
+	informerFactory.Start(ctx.Done())
+	informerFactory.WaitForCacheSync(ctx.Done())
+
 	requeue, err := controller.reconcileElectionStep(ctx, tc.leaseNN)
 
 	if (requeue != 0) != tc.expectedRequeue {
@@ -639,7 +639,7 @@ func TestController(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	go informerFactory.Start(ctx.Done())
+	informerFactory.Start(ctx.Done())
 	go controller.Run(ctx, 1)
 
 	go func() {
@@ -115,7 +115,7 @@ func TestLeaseCandidateGCController(t *testing.T) {
 	leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates()
 	controller := NewLeaseCandidateGC(client, 10*time.Millisecond, leaseCandidateInformer)
 
-	go informerFactory.Start(ctx.Done())
+	informerFactory.Start(ctx.Done())
 	informerFactory.WaitForCacheSync(ctx.Done())
 
 	// Create lease candidates
@@ -60,7 +60,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
 
 	serviceConfig := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), time.Minute)
 	serviceConfig.RegisterEventHandler(handler)
-	go sharedInformers.Start(stopCh)
+	sharedInformers.Start(stopCh)
 	go serviceConfig.Run(stopCh)
 
 	// Add the first service
@@ -141,7 +141,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
 
 	endpointsliceConfig := NewEndpointSliceConfig(ctx, sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
 	endpointsliceConfig.RegisterEventHandler(handler)
-	go sharedInformers.Start(stopCh)
+	sharedInformers.Start(stopCh)
 	go endpointsliceConfig.Run(stopCh)
 
 	// Add the first endpoints
@@ -240,7 +240,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
 	config := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), time.Minute)
 	handler := NewServiceHandlerMock()
 	config.RegisterEventHandler(handler)
-	go sharedInformers.Start(stopCh)
+	sharedInformers.Start(stopCh)
 	go config.Run(stopCh)
 
 	service := &v1.Service{
@@ -265,7 +265,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
 	config := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), time.Minute)
 	handler := NewServiceHandlerMock()
 	config.RegisterEventHandler(handler)
-	go sharedInformers.Start(stopCh)
+	sharedInformers.Start(stopCh)
 	go config.Run(stopCh)
 
 	service1 := &v1.Service{
@@ -304,7 +304,7 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
 	handler2 := NewServiceHandlerMock()
 	config.RegisterEventHandler(handler)
 	config.RegisterEventHandler(handler2)
-	go sharedInformers.Start(stopCh)
+	sharedInformers.Start(stopCh)
 	go config.Run(stopCh)
 
 	service1 := &v1.Service{
@@ -339,7 +339,7 @@ func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
 	handler2 := NewEndpointSliceHandlerMock()
 	config.RegisterEventHandler(handler)
 	config.RegisterEventHandler(handler2)
-	go sharedInformers.Start(stopCh)
+	sharedInformers.Start(stopCh)
 	go config.Run(stopCh)
 
 	endpoints1 := &discoveryv1.EndpointSlice{
@@ -386,7 +386,7 @@ func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
 	handler2 := NewEndpointSliceHandlerMock()
 	config.RegisterEventHandler(handler)
 	config.RegisterEventHandler(handler2)
-	go sharedInformers.Start(stopCh)
+	sharedInformers.Start(stopCh)
 	go config.Run(stopCh)
 
 	endpoints1 := &discoveryv1.EndpointSlice{
@@ -120,7 +120,7 @@ func NewCandidate(clientset kubernetes.Interface,
 func (c *LeaseCandidate) Run(ctx context.Context) {
 	defer c.queue.ShutDown()
 
-	go c.informerFactory.Start(ctx.Done())
+	c.informerFactory.Start(ctx.Done())
 	if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.hasSynced) {
 		return
 	}
@@ -299,6 +299,7 @@ type SharedInformerFactory interface {
 
 	// Start initializes all requested informers. They are handled in goroutines
 	// which run until the stop channel gets closed.
+	// Warning: Start does not block. When run in a go-routine, it will race with a later WaitForCacheSync.
 	Start(stopCh <-chan struct{})
 
 	// Shutdown marks a factory as shutting down. At that point no new
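The new doc comment above is the user-visible half of the change: callers are expected to invoke Start directly and only afterwards block on a cache-sync helper. A hypothetical controller Run method following that guidance might look like the sketch below; the exampleController type, its field names, and the "examplecontroller" sync name are invented for illustration and mirror the LeaseCandidate.Run change, they do not appear in this diff.

package main

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

// exampleController is a made-up controller that owns a shared informer
// factory and remembers which informer caches it depends on.
type exampleController struct {
	informerFactory informers.SharedInformerFactory
	hasSynced       cache.InformerSynced
}

func newExampleController() *exampleController {
	factory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
	podInformer := factory.Core().V1().Pods().Informer()
	return &exampleController{
		informerFactory: factory,
		hasSynced:       podInformer.HasSynced,
	}
}

func (c *exampleController) Run(ctx context.Context) {
	// Start is non-blocking, so it is called synchronously; putting it in a
	// goroutine would let the wait below race with informer startup.
	c.informerFactory.Start(ctx.Done())

	if !cache.WaitForNamedCacheSync("examplecontroller", ctx.Done(), c.hasSynced) {
		return
	}

	// ... the controller's work loop would go here ...
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	newExampleController().Run(ctx)
}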