diff --git a/cmd/kube-controller-manager/app/config/config.go b/cmd/kube-controller-manager/app/config/config.go index c5ae3348370..2d2db31801f 100644 --- a/cmd/kube-controller-manager/app/config/config.go +++ b/cmd/kube-controller-manager/app/config/config.go @@ -43,8 +43,8 @@ type Config struct { // the rest config for the master Kubeconfig *restclient.Config - // the event sink - EventRecorder record.EventRecorder + EventBroadcaster record.EventBroadcaster + EventRecorder record.EventRecorder } type completedConfig struct { diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 6d402568fa5..443daab6f76 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -41,6 +41,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/informers" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/metadata" "k8s.io/client-go/metadata/metadatainformer" restclient "k8s.io/client-go/rest" @@ -174,13 +175,18 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration { } } -// Run runs the KubeControllerManagerOptions. This should never exit. +// Run runs the KubeControllerManagerOptions. func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { // To help debugging, immediately log version klog.Infof("Version: %+v", version.Get()) klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK")) + // Start events processing pipeline. 
+ c.EventBroadcaster.StartStructuredLogging(0) + c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")}) + defer c.EventBroadcaster.Shutdown() + if cfgz, err := configz.New(ConfigzName); err == nil { cfgz.Set(c.ComponentConfig) } else { @@ -213,7 +219,6 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) { - controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done()) if err != nil { klog.Fatalf("error building controller context: %v", err) @@ -227,13 +232,14 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh) close(controllerContext.InformersStarted) - select {} + <-ctx.Done() } // No leader election, run directly if !c.ComponentConfig.Generic.LeaderElection.LeaderElect { - run(context.TODO(), saTokenControllerInitFunc, NewControllerInitializers) - panic("unreachable") + ctx, _ := wait.ContextForChannel(stopCh) + run(ctx, saTokenControllerInitFunc, NewControllerInitializers) + return nil } id, err := os.Hostname() @@ -311,7 +317,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { }) } - select {} + <-stopCh + return nil } // ControllerContext defines the context object for controller diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index 5c7a15b4aac..59bffd53fc1 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -28,7 +28,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" clientgokubescheme 
"k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" @@ -430,12 +429,14 @@ func (s KubeControllerManagerOptions) Config(allControllers []string, disabledBy return nil, err } - eventRecorder := createRecorder(client, KubeControllerManagerUserAgent) + eventBroadcaster := record.NewBroadcaster() + eventRecorder := eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: KubeControllerManagerUserAgent}) c := &kubecontrollerconfig.Config{ - Client: client, - Kubeconfig: kubeconfig, - EventRecorder: eventRecorder, + Client: client, + Kubeconfig: kubeconfig, + EventBroadcaster: eventBroadcaster, + EventRecorder: eventRecorder, } if err := s.ApplyTo(c); err != nil { return nil, err @@ -444,10 +445,3 @@ func (s KubeControllerManagerOptions) Config(allControllers []string, disabledBy return c, nil } - -func createRecorder(kubeClient clientset.Interface, userAgent string) record.EventRecorder { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) - return eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: userAgent}) -} diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index e6bda61b6e6..9826e3f67e5 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -155,8 +155,9 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * return fmt.Errorf("unable to register configz: %s", err) } - // Prepare the event broadcaster. + // Start events processing pipeline. cc.EventBroadcaster.StartRecordingToSink(ctx.Done()) + defer cc.EventBroadcaster.Shutdown() // Setup healthz checks. 
var checks []healthz.HealthChecker diff --git a/cmd/kube-scheduler/app/testing/testserver.go b/cmd/kube-scheduler/app/testing/testserver.go index a7610ebe253..e76b2204b23 100644 --- a/cmd/kube-scheduler/app/testing/testserver.go +++ b/cmd/kube-scheduler/app/testing/testserver.go @@ -32,6 +32,8 @@ import ( "k8s.io/kubernetes/cmd/kube-scheduler/app" kubeschedulerconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" "k8s.io/kubernetes/cmd/kube-scheduler/app/options" + + "k8s.io/klog/v2" ) // TearDownFunc is to be called to tear down a test server. @@ -61,8 +63,19 @@ type Logger interface { // enough time to remove temporary files. func StartTestServer(t Logger, customFlags []string) (result TestServer, err error) { ctx, cancel := context.WithCancel(context.Background()) + + var errCh chan error tearDown := func() { cancel() + + // If the scheduler was started, let's wait for it to + // shut down cleanly. + if errCh != nil { + err, ok := <-errCh + if ok && err != nil { + klog.ErrorS(err, "Failed to shutdown test server clearly") + } + } if len(result.TmpDir) != 0 { os.RemoveAll(result.TmpDir) } @@ -103,8 +116,9 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err return result, fmt.Errorf("failed to create config from options: %v", err) } - errCh := make(chan error) + errCh = make(chan error) go func(ctx context.Context) { + defer close(errCh) if err := app.Run(ctx, cc, sched); err != nil { errCh <- err } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8d9cc2c2763..c165b34ffc2 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -335,7 +335,16 @@ func New(client clientset.Interface, // Run begins watching and scheduling. It starts scheduling and blocked until the context is done. 
func (sched *Scheduler) Run(ctx context.Context) { sched.SchedulingQueue.Run() - wait.UntilWithContext(ctx, sched.scheduleOne, 0) + + // We need to start scheduleOne loop in a dedicated goroutine, + // because scheduleOne function hangs on getting the next item + // from the SchedulingQueue. + // If there are no new pods to schedule, it will be hanging there + // and if done in this goroutine it will be blocking closing + // SchedulingQueue, in effect causing a deadlock on shutdown. + go wait.UntilWithContext(ctx, sched.scheduleOne, 0) + + <-ctx.Done() sched.SchedulingQueue.Close() } diff --git a/staging/src/k8s.io/cloud-provider/app/config/config.go b/staging/src/k8s.io/cloud-provider/app/config/config.go index d6971adcac7..db662864c7b 100644 --- a/staging/src/k8s.io/cloud-provider/app/config/config.go +++ b/staging/src/k8s.io/cloud-provider/app/config/config.go @@ -45,7 +45,10 @@ type Config struct { // the rest config for the master Kubeconfig *restclient.Config - // the event sink + // EventBroadcaster broadcasts events to all sinks. + EventBroadcaster record.EventBroadcaster + + // EventRecorder is a sink for events. 
EventRecorder record.EventRecorder // ClientBuilder will provide a client for this controller to use diff --git a/staging/src/k8s.io/cloud-provider/app/controllermanager.go b/staging/src/k8s.io/cloud-provider/app/controllermanager.go index dff84a2d7e6..ca116532f5b 100644 --- a/staging/src/k8s.io/cloud-provider/app/controllermanager.go +++ b/staging/src/k8s.io/cloud-provider/app/controllermanager.go @@ -34,6 +34,7 @@ import ( "k8s.io/apiserver/pkg/server/healthz" cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/informers" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/metadata" "k8s.io/client-go/metadata/metadatainformer" "k8s.io/client-go/restmapper" @@ -142,6 +143,11 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface // To help debugging, immediately log version klog.Infof("Version: %+v", version.Get()) + // Start events processing pipeline. + c.EventBroadcaster.StartStructuredLogging(0) + c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")}) + defer c.EventBroadcaster.Shutdown() + // setup /configz endpoint if cz, err := configz.New(ConfigzName); err == nil { cz.Set(c.ComponentConfig) @@ -182,8 +188,10 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface } if !c.ComponentConfig.Generic.LeaderElection.LeaderElect { - run(context.TODO(), controllerInitializers) - panic("unreachable") + ctx, _ := wait.ContextForChannel(stopCh) + run(ctx, controllerInitializers) + <-stopCh + return nil } // Identity used to distinguish between multiple cloud controller manager instances @@ -251,7 +259,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface }) } - select {} + <-stopCh + return nil } // startControllers starts the cloud specific controller loops. 
@@ -304,7 +313,8 @@ func startControllers(ctx context.Context, cloud cloudprovider.Interface, contro c.SharedInformers.Start(stopCh) controllerContext.InformerFactory.Start(controllerContext.Stop) - select {} + <-stopCh + return nil } // InitCloudFunc is used to initialize cloud diff --git a/staging/src/k8s.io/cloud-provider/options/options.go b/staging/src/k8s.io/cloud-provider/options/options.go index b6b7abd0770..7c29fa39574 100644 --- a/staging/src/k8s.io/cloud-provider/options/options.go +++ b/staging/src/k8s.io/cloud-provider/options/options.go @@ -30,7 +30,6 @@ import ( "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" @@ -175,7 +174,8 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, userAgent stri return err } - c.EventRecorder = createRecorder(c.Client, userAgent) + c.EventBroadcaster = record.NewBroadcaster() + c.EventRecorder = c.EventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: userAgent}) rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{ ClientConfig: c.Kubeconfig, @@ -241,10 +241,3 @@ func (o *CloudControllerManagerOptions) Config(allControllers, disabledByDefault return c, nil } - -func createRecorder(kubeClient clientset.Interface, userAgent string) record.EventRecorder { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) - return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: userAgent}) -} diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index 5fc65ded844..4c56b7ec44a 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ 
b/test/integration/scheduler/preemption/preemption_test.go @@ -1444,7 +1444,10 @@ func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...schedule f := testCtx.Scheduler.NextPod testCtx.Scheduler.NextPod = func() (podInfo *framework.QueuedPodInfo) { podInfo = f() - podInfo.Pod.Status.NominatedNodeName = "node-1" + // Scheduler.NextPod() may return nil when the scheduler is shutting down. + if podInfo != nil { + podInfo.Pod.Status.NominatedNodeName = "node-1" + } return podInfo } go testCtx.Scheduler.Run(testCtx.Ctx)