diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 509846859c8..1a95a0f140e 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -150,10 +150,8 @@ func NewAttachDetachController( return nil, fmt.Errorf("could not initialize volume plugins for Attach/Detach Controller: %w", err) } - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"}) + adc.broadcaster = record.NewBroadcaster() + recorder := adc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"}) blkutil := volumepathhandler.NewBlockVolumePathHandler() adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr) @@ -302,8 +300,8 @@ type attachDetachController struct { // populate the current pods using podInformer. desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator - // recorder is used to record events in the API server - recorder record.EventRecorder + // broadcaster is broadcasting events + broadcaster record.EventBroadcaster // pvcQueue is used to queue pvc objects pvcQueue workqueue.RateLimitingInterface @@ -322,6 +320,11 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) { defer runtime.HandleCrash() defer adc.pvcQueue.ShutDown() + // Start events processing pipeline. + adc.broadcaster.StartStructuredLogging(0) + adc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: adc.kubeClient.CoreV1().Events("")}) + defer adc.broadcaster.Shutdown() + klog.Infof("Starting attach detach controller") defer klog.Infof("Shutting down attach detach controller") @@ -910,7 +913,7 @@ func (adc *attachDetachController) GetNodeName() types.NodeName { } func (adc *attachDetachController) GetEventRecorder() record.EventRecorder { - return adc.recorder + return nil } func (adc *attachDetachController) GetSubpather() subpath.Interface { diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index b5ca21856f2..2ff5b5138ec 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -166,6 +166,7 @@ type PersistentVolumeController struct { NodeListerSynced cache.InformerSynced kubeClient clientset.Interface + eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder cloud cloudprovider.Interface volumePluginMgr vol.VolumePluginMgr diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go index efce1e36448..336203fd004 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_base.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go @@ -81,17 +81,17 @@ type ControllerParameters struct { // NewController creates a new PersistentVolume controller func NewController(p ControllerParameters) (*PersistentVolumeController, error) { eventRecorder := p.EventRecorder + var eventBroadcaster record.EventBroadcaster if eventRecorder == nil { - broadcaster := record.NewBroadcaster() - broadcaster.StartStructuredLogging(0) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")}) - eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"}) + eventBroadcaster = record.NewBroadcaster() + eventRecorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"}) } controller := &PersistentVolumeController{ volumes: newPersistentVolumeOrderedIndex(), claims: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), kubeClient: p.KubeClient, + eventBroadcaster: eventBroadcaster, eventRecorder: eventRecorder, runningOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */), cloud: p.Cloud, @@ -308,6 +308,13 @@ func (ctrl *PersistentVolumeController) Run(ctx context.Context) { defer ctrl.claimQueue.ShutDown() defer ctrl.volumeQueue.ShutDown() + // Start events processing pipeline. + if ctrl.eventBroadcaster != nil { + ctrl.eventBroadcaster.StartStructuredLogging(0) + ctrl.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ctrl.kubeClient.CoreV1().Events("")}) + defer ctrl.eventBroadcaster.Shutdown() + } + klog.Infof("Starting persistent volume controller") defer klog.Infof("Shutting down persistent volume controller") diff --git a/test/integration/auth/rbac_test.go b/test/integration/auth/rbac_test.go index 74fa05cfad5..81aaa7d55cf 100644 --- a/test/integration/auth/rbac_test.go +++ b/test/integration/auth/rbac_test.go @@ -89,7 +89,7 @@ func (getter *testRESTOptionsGetter) GetRESTOptions(resource schema.GroupResourc return generic.RESTOptions{StorageConfig: storageConfig, Decorator: generic.UndecoratedStorage, ResourcePrefix: resource.Resource}, nil } -func newRBACAuthorizer(t *testing.T, config *controlplane.Config) authorizer.Authorizer { +func newRBACAuthorizer(t *testing.T, config *controlplane.Config) (authorizer.Authorizer, func()) { optsGetter := &testRESTOptionsGetter{config} roleRest, err := rolestore.NewREST(optsGetter) if err != nil { @@ -111,7 +111,14 @@ func newRBACAuthorizer(t *testing.T, config *controlplane.Config) authorizer.Aut t.Fatalf("unexpected error from REST storage: %v", err) } clusterRoleBindingRegistry := clusterrolebinding.AuthorizerAdapter{Registry: clusterrolebinding.NewRegistry(clusterrolebindingRest)} - return rbac.New(roleRegistry, roleBindingRegistry, clusterRoleRegistry, clusterRoleBindingRegistry) + + tearDownFn := func() { + roleRest.Destroy() + rolebindingRest.Destroy() + clusterroleRest.Destroy() + clusterrolebindingRest.Destroy() + } + return rbac.New(roleRegistry, roleBindingRegistry, clusterRoleRegistry, clusterRoleBindingRegistry), tearDownFn } // bootstrapRoles are a set of RBAC roles which will be populated before the test. @@ -533,6 +540,12 @@ func TestRBAC(t *testing.T) { "user-with-no-permissions": {Name: "user-with-no-permissions"}, }))) + var tearDownAuthorizerFn func() + defer func() { + if tearDownAuthorizerFn != nil { + tearDownAuthorizerFn() + } + }() _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. @@ -542,7 +555,7 @@ func TestRBAC(t *testing.T) { }, ModifyServerConfig: func(config *controlplane.Config) { config.GenericConfig.Authentication.Authenticator = authenticator - config.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, config) + config.GenericConfig.Authorization.Authorizer, tearDownAuthorizerFn = newRBACAuthorizer(t, config) }, }) defer tearDownFn() @@ -655,12 +668,18 @@ func TestRBAC(t *testing.T) { func TestBootstrapping(t *testing.T) { superUser := "admin/system:masters" + var tearDownAuthorizerFn func() + defer func() { + if tearDownAuthorizerFn != nil { + tearDownAuthorizerFn() + } + }() clientset, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerConfig: func(config *controlplane.Config) { config.GenericConfig.Authentication.Authenticator = bearertoken.New(tokenfile.New(map[string]*user.DefaultInfo{ superUser: {Name: "admin", Groups: []string{"system:masters"}}, })) - config.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, config) + config.GenericConfig.Authorization.Authorizer, tearDownAuthorizerFn = newRBACAuthorizer(t, config) }, }) defer tearDownFn() @@ -713,10 +732,17 @@ func TestDiscoveryUpgradeBootstrapping(t *testing.T) { tearDownFn() } }() + var tearDownAuthorizerFn func() + defer func() { + if tearDownAuthorizerFn != nil { + tearDownAuthorizerFn() + } + }() superUser := "admin/system:masters" etcdConfig := framework.SharedEtcd() + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { // Ensure we're using the same etcd across apiserver restarts. @@ -726,7 +752,7 @@ func TestDiscoveryUpgradeBootstrapping(t *testing.T) { config.GenericConfig.Authentication.Authenticator = bearertoken.New(tokenfile.New(map[string]*user.DefaultInfo{ superUser: {Name: "admin", Groups: []string{"system:masters"}}, })) - config.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, config) + config.GenericConfig.Authorization.Authorizer, tearDownAuthorizerFn = newRBACAuthorizer(t, config) }, }) @@ -767,6 +793,8 @@ func TestDiscoveryUpgradeBootstrapping(t *testing.T) { // Stop the first API server. tearDownFn() tearDownFn = nil + tearDownAuthorizerFn() + tearDownAuthorizerFn = nil // Check that upgraded API servers inherit `system:public-info-viewer` settings from // `system:discovery`, and respect auto-reconciliation annotations. @@ -779,7 +807,7 @@ func TestDiscoveryUpgradeBootstrapping(t *testing.T) { config.GenericConfig.Authentication.Authenticator = bearertoken.New(tokenfile.New(map[string]*user.DefaultInfo{ superUser: {Name: "admin", Groups: []string{"system:masters"}}, })) - config.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, config) + config.GenericConfig.Authorization.Authorizer, tearDownAuthorizerFn = newRBACAuthorizer(t, config) }, }) diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go index 1a8267bb069..bb6360762f4 100644 --- a/test/integration/volume/attach_detach_test.go +++ b/test/integration/volume/attach_detach_test.go @@ -155,7 +155,7 @@ func TestPodDeletionWithDswp(t *testing.T) { }, } - testClient, ctrl, _, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) + testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t) defer framework.DeleteNamespaceOrDie(testClient, ns, t) @@ -167,23 +167,25 @@ func TestPodDeletionWithDswp(t *testing.T) { t.Fatalf("Failed to created node : %v", err) } - stopCh := make(chan struct{}) + // start controller loop + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - go informers.Core().V1().Nodes().Informer().Run(stopCh) + go informers.Core().V1().Nodes().Informer().Run(ctx.Done()) if _, err := testClient.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil { t.Errorf("Failed to create pod : %v", err) } podInformer := informers.Core().V1().Pods().Informer() - go podInformer.Run(podStopCh) + go podInformer.Run(ctx.Done()) - // start controller loop - go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) - go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) - go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) - initCSIObjects(stopCh, informers) - go ctrl.Run(stopCh) - defer close(stopCh) + go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done()) + go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done()) + go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done()) + initCSIObjects(ctx.Done(), informers) + go ctrl.Run(ctx.Done()) + // Run pvCtrl to avoid leaking goroutines started during its creation. + go pvCtrl.Run(ctx) waitToObservePods(t, podInformer, 1) podKey, err := cache.MetaNamespaceKeyFunc(pod) @@ -230,13 +232,14 @@ func TestPodUpdateWithWithADC(t *testing.T) { }, } - testClient, ctrl, _, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) + testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t) defer framework.DeleteNamespaceOrDie(testClient, ns, t) pod := fakePodWithVol(namespaceName) podStopCh := make(chan struct{}) + defer close(podStopCh) if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil { t.Fatalf("Failed to created node : %v", err) @@ -252,12 +255,16 @@ func TestPodUpdateWithWithADC(t *testing.T) { go podInformer.Run(podStopCh) // start controller loop - stopCh := make(chan struct{}) - go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) - go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) - go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) - initCSIObjects(stopCh, informers) - go ctrl.Run(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done()) + go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done()) + go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done()) + initCSIObjects(ctx.Done(), informers) + go ctrl.Run(ctx.Done()) + // Run pvCtrl to avoid leaking goroutines started during its creation. + go pvCtrl.Run(ctx) waitToObservePods(t, podInformer, 1) podKey, err := cache.MetaNamespaceKeyFunc(pod) @@ -280,9 +287,6 @@ func TestPodUpdateWithWithADC(t *testing.T) { } waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 20*time.Second, "expected 0 pods in dsw after pod completion", 0) - - close(podStopCh) - close(stopCh) } func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) { @@ -301,13 +305,14 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) { }, } - testClient, ctrl, _, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) + testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t) defer framework.DeleteNamespaceOrDie(testClient, ns, t) pod := fakePodWithVol(namespaceName) podStopCh := make(chan struct{}) + defer close(podStopCh) if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil { t.Fatalf("Failed to created node : %v", err) @@ -323,12 +328,16 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) { go podInformer.Run(podStopCh) // start controller loop - stopCh := make(chan struct{}) - go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) - go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) - go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) - initCSIObjects(stopCh, informers) - go ctrl.Run(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done()) + go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done()) + go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done()) + initCSIObjects(ctx.Done(), informers) + go ctrl.Run(ctx.Done()) + // Run pvCtrl to avoid leaking goroutines started during its creation. + go pvCtrl.Run(ctx) waitToObservePods(t, podInformer, 1) podKey, err := cache.MetaNamespaceKeyFunc(pod) @@ -351,9 +360,6 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) { } waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 20*time.Second, "expected non-zero pods in dsw if KeepTerminatedPodVolumesAnnotation is set", 1) - - close(podStopCh) - close(stopCh) } // wait for the podInformer to observe the pods. Call this function before @@ -481,7 +487,7 @@ func TestPodAddedByDswp(t *testing.T) { }, }, } - testClient, ctrl, _, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) + testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t) defer framework.DeleteNamespaceOrDie(testClient, ns, t) @@ -503,12 +509,16 @@ func TestPodAddedByDswp(t *testing.T) { go podInformer.Run(podStopCh) // start controller loop - stopCh := make(chan struct{}) - go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) - go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) - go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) - initCSIObjects(stopCh, informers) - go ctrl.Run(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done()) + go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done()) + go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done()) + initCSIObjects(ctx.Done(), informers) + go ctrl.Run(ctx.Done()) + // Run pvCtrl to avoid leaking goroutines started during its creation. + go pvCtrl.Run(ctx) waitToObservePods(t, podInformer, 1) podKey, err := cache.MetaNamespaceKeyFunc(pod) @@ -538,8 +548,6 @@ func TestPodAddedByDswp(t *testing.T) { // the findAndAddActivePods loop turns every 3 minute waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 200*time.Second, "expected 2 pods in dsw after pod addition", 2) - - close(stopCh) } func TestPVCBoundWithADC(t *testing.T) { @@ -592,6 +600,8 @@ func TestPVCBoundWithADC(t *testing.T) { // start controller loop ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + informers.Start(ctx.Done()) informers.WaitForCacheSync(ctx.Done()) initCSIObjects(ctx.Done(), informers) @@ -606,7 +616,6 @@ func TestPVCBoundWithADC(t *testing.T) { createPVForPVC(t, testClient, pvc) } waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 4 pods in dsw after PVCs are bound", 4) - cancel() } // Create PV for PVC, pv controller will bind them together. diff --git a/test/integration/volume/persistent_volumes_test.go b/test/integration/volume/persistent_volumes_test.go index 5b7f3dcc73a..60f8c408d44 100644 --- a/test/integration/volume/persistent_volumes_test.go +++ b/test/integration/volume/persistent_volumes_test.go @@ -699,7 +699,7 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) { } klog.V(2).Infof("TestPersistentVolumeMultiPVsPVCs: claims are bound") - stopCh <- struct{}{} + close(stopCh) // check that everything is bound to something for i := 0; i < objCount; i++ {