Clean shutdown of kcm, ccm and scheduler

This commit is contained in:
Wojciech Tyczyński 2022-05-25 10:00:06 +02:00
parent 55130ae2ab
commit fe3616cafb
10 changed files with 72 additions and 38 deletions

View File

@ -43,8 +43,8 @@ type Config struct {
// the rest config for the master
Kubeconfig *restclient.Config
// the event sink
EventRecorder record.EventRecorder
EventBroadcaster record.EventBroadcaster
EventRecorder record.EventRecorder
}
type completedConfig struct {

View File

@ -41,6 +41,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/informers"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
restclient "k8s.io/client-go/rest"
@ -174,13 +175,18 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
}
}
// Run runs the KubeControllerManagerOptions. This should never exit.
// Run runs the KubeControllerManagerOptions.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
// Start events processing pipeline.
c.EventBroadcaster.StartStructuredLogging(0)
c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})
defer c.EventBroadcaster.Shutdown()
if cfgz, err := configz.New(ConfigzName); err == nil {
cfgz.Set(c.ComponentConfig)
} else {
@ -213,7 +219,6 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
@ -227,13 +232,14 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
close(controllerContext.InformersStarted)
select {}
<-ctx.Done()
}
// No leader election, run directly
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO(), saTokenControllerInitFunc, NewControllerInitializers)
panic("unreachable")
ctx, _ := wait.ContextForChannel(stopCh)
run(ctx, saTokenControllerInitFunc, NewControllerInitializers)
return nil
}
id, err := os.Hostname()
@ -311,7 +317,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
})
}
select {}
<-stopCh
return nil
}
// ControllerContext defines the context object for controller

View File

@ -28,7 +28,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
@ -430,12 +429,14 @@ func (s KubeControllerManagerOptions) Config(allControllers []string, disabledBy
return nil, err
}
eventRecorder := createRecorder(client, KubeControllerManagerUserAgent)
eventBroadcaster := record.NewBroadcaster()
eventRecorder := eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: KubeControllerManagerUserAgent})
c := &kubecontrollerconfig.Config{
Client: client,
Kubeconfig: kubeconfig,
EventRecorder: eventRecorder,
Client: client,
Kubeconfig: kubeconfig,
EventBroadcaster: eventBroadcaster,
EventRecorder: eventRecorder,
}
if err := s.ApplyTo(c); err != nil {
return nil, err
@ -444,10 +445,3 @@ func (s KubeControllerManagerOptions) Config(allControllers []string, disabledBy
return c, nil
}
// createRecorder builds a standalone EventRecorder for userAgent: it creates
// a private Broadcaster, wires it to structured logging and to the API
// server's core/v1 Events sink, and returns a recorder stamped with the
// given component name.
// NOTE(review): the Broadcaster is local to this function and never
// retained, so it can never be Shutdown() — which is exactly why the
// surrounding commit deletes this helper in favor of a Config-owned
// EventBroadcaster that Run() can shut down cleanly.
func createRecorder(kubeClient clientset.Interface, userAgent string) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
// Mirror emitted events into the structured log at verbosity 0.
eventBroadcaster.StartStructuredLogging(0)
// Deliver events to the API server; empty namespace — presumably events are
// written in each event's own namespace (verify against client-go docs).
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
return eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: userAgent})
}

View File

@ -155,8 +155,9 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
return fmt.Errorf("unable to register configz: %s", err)
}
// Prepare the event broadcaster.
// Start events processing pipeline.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
defer cc.EventBroadcaster.Shutdown()
// Setup healthz checks.
var checks []healthz.HealthChecker

View File

@ -32,6 +32,8 @@ import (
"k8s.io/kubernetes/cmd/kube-scheduler/app"
kubeschedulerconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
"k8s.io/klog/v2"
)
// TearDownFunc is to be called to tear down a test server.
@ -61,8 +63,19 @@ type Logger interface {
// enough time to remove temporary files.
func StartTestServer(t Logger, customFlags []string) (result TestServer, err error) {
ctx, cancel := context.WithCancel(context.Background())
var errCh chan error
tearDown := func() {
cancel()
// If the scheduler was started, wait for it to
// shut down cleanly.
if errCh != nil {
err, ok := <-errCh
if ok && err != nil {
klog.ErrorS(err, "Failed to shutdown test server clearly")
}
}
if len(result.TmpDir) != 0 {
os.RemoveAll(result.TmpDir)
}
@ -103,8 +116,9 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
return result, fmt.Errorf("failed to create config from options: %v", err)
}
errCh := make(chan error)
errCh = make(chan error)
go func(ctx context.Context) {
defer close(errCh)
if err := app.Run(ctx, cc, sched); err != nil {
errCh <- err
}

View File

@ -335,7 +335,16 @@ func New(client clientset.Interface,
// Run begins watching and scheduling. It starts scheduling and blocks until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Run()
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
// We need to start scheduleOne loop in a dedicated goroutine,
// because scheduleOne function hangs on getting the next item
// from the SchedulingQueue.
// If there are no new pods to schedule, it will be hanging there
// and if done in this goroutine it will be blocking closing
// SchedulingQueue, in effect causing a deadlock on shutdown.
go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
<-ctx.Done()
sched.SchedulingQueue.Close()
}

View File

@ -45,7 +45,10 @@ type Config struct {
// the rest config for the master
Kubeconfig *restclient.Config
// the event sink
// EventBroadcaster broadcasts events to all sinks.
EventBroadcaster record.EventBroadcaster
// EventRecorder is a sink for events.
EventRecorder record.EventRecorder
// ClientBuilder will provide a client for this controller to use

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/informers"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/restmapper"
@ -142,6 +143,11 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
// Start events processing pipeline.
c.EventBroadcaster.StartStructuredLogging(0)
c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})
defer c.EventBroadcaster.Shutdown()
// setup /configz endpoint
if cz, err := configz.New(ConfigzName); err == nil {
cz.Set(c.ComponentConfig)
@ -182,8 +188,10 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
}
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO(), controllerInitializers)
panic("unreachable")
ctx, _ := wait.ContextForChannel(stopCh)
run(ctx, controllerInitializers)
<-stopCh
return nil
}
// Identity used to distinguish between multiple cloud controller manager instances
@ -251,7 +259,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
})
}
select {}
<-stopCh
return nil
}
// startControllers starts the cloud specific controller loops.
@ -304,7 +313,8 @@ func startControllers(ctx context.Context, cloud cloudprovider.Interface, contro
c.SharedInformers.Start(stopCh)
controllerContext.InformerFactory.Start(controllerContext.Stop)
select {}
<-stopCh
return nil
}
// InitCloudFunc is used to initialize cloud

View File

@ -30,7 +30,6 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
@ -175,7 +174,8 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, userAgent stri
return err
}
c.EventRecorder = createRecorder(c.Client, userAgent)
c.EventBroadcaster = record.NewBroadcaster()
c.EventRecorder = c.EventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: userAgent})
rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
@ -241,10 +241,3 @@ func (o *CloudControllerManagerOptions) Config(allControllers, disabledByDefault
return c, nil
}
// createRecorder builds a standalone EventRecorder for userAgent: it creates
// a private Broadcaster, starts structured logging and API-server sink
// recording on it, and returns a recorder stamped with the component name.
// NOTE(review): the Broadcaster escapes management here (no handle returned),
// so it cannot be shut down — the surrounding commit removes this helper and
// stores the Broadcaster on the Config instead so Run() can drain it.
func createRecorder(kubeClient clientset.Interface, userAgent string) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
// Mirror emitted events into the structured log at verbosity 0.
eventBroadcaster.StartStructuredLogging(0)
// Deliver events to the API server via the core/v1 Events client.
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: userAgent})
}

View File

@ -1444,7 +1444,10 @@ func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...schedule
f := testCtx.Scheduler.NextPod
testCtx.Scheduler.NextPod = func() (podInfo *framework.QueuedPodInfo) {
podInfo = f()
podInfo.Pod.Status.NominatedNodeName = "node-1"
// Scheduler.NextPod() may return nil when the scheduler is shutting down.
if podInfo != nil {
podInfo.Pod.Status.NominatedNodeName = "node-1"
}
return podInfo
}
go testCtx.Scheduler.Run(testCtx.Ctx)