diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index 1656e452c12..534d9fe85cc 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -26,7 +26,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/apimachinery/pkg/util/wait" apiserverfeatures "k8s.io/apiserver/pkg/features" peerreconcilers "k8s.io/apiserver/pkg/reconcilers" genericregistry "k8s.io/apiserver/pkg/registry/generic" @@ -136,7 +135,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele } if len(c.SystemNamespaces) > 0 { s.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error { - go systemnamespaces.NewController(c.SystemNamespaces, client, s.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh) + go systemnamespaces.NewController(c.SystemNamespaces, client, s.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.Done()) return nil }) } @@ -156,7 +155,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele client) s.GenericAPIServer.AddPostStartHookOrDie("peer-endpoint-reconciler-controller", func(hookContext genericapiserver.PostStartHookContext) error { - peerEndpointCtrl.Start(hookContext.StopCh) + peerEndpointCtrl.Start(hookContext.Done()) return nil }) s.GenericAPIServer.AddPreShutdownHookOrDie("peer-endpoint-reconciler-controller", @@ -166,7 +165,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele }) if c.Extra.PeerProxy != nil { s.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error { - err := c.Extra.PeerProxy.WaitForCacheSync(context.StopCh) + err := c.Extra.PeerProxy.WaitForCacheSync(context.Done()) return err }) } @@ -174,43 +173,34 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele s.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error { controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(s.ClusterAuthenticationInfo, client) - - // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver - // TODO: See if we can pass ctx to the current method - ctx := wait.ContextForChannel(hookContext.StopCh) - // prime values and start listeners if s.ClusterAuthenticationInfo.ClientCA != nil { s.ClusterAuthenticationInfo.ClientCA.AddListener(controller) if controller, ok := s.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok { // runonce to be sure that we have a value. - if err := controller.RunOnce(ctx); err != nil { + if err := controller.RunOnce(hookContext); err != nil { runtime.HandleError(err) } - go controller.Run(ctx, 1) + go controller.Run(hookContext, 1) } } if s.ClusterAuthenticationInfo.RequestHeaderCA != nil { s.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller) if controller, ok := s.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok { // runonce to be sure that we have a value. - if err := controller.RunOnce(ctx); err != nil { + if err := controller.RunOnce(hookContext); err != nil { runtime.HandleError(err) } - go controller.Run(ctx, 1) + go controller.Run(hookContext, 1) } } - go controller.Run(ctx, 1) + go controller.Run(hookContext, 1) return nil }) if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) { s.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error { - // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver - // TODO: See if we can pass ctx to the current method - ctx := wait.ContextForChannel(hookContext.StopCh) - leaseName := s.GenericAPIServer.APIServerID holderIdentity := s.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID()) @@ -227,7 +217,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele metav1.NamespaceSystem, // TODO: receive identity label value as a parameter when post start hook is moved to generic apiserver. labelAPIServerHeartbeatFunc(name, peeraddress)) - go controller.Run(ctx) + go controller.Run(hookContext) return nil }) // TODO: move this into generic apiserver and make the lease identity value configurable @@ -237,7 +227,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele IdentityLeaseGCPeriod, metav1.NamespaceSystem, IdentityLeaseComponentLabelKey+"="+name, - ).Run(hookContext.StopCh) + ).Run(hookContext.Done()) return nil }) } @@ -247,7 +237,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele } s.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error { - go legacytokentracking.NewController(client).Run(hookContext.StopCh) + go legacytokentracking.NewController(client).Run(hookContext.Done()) return nil }) diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index e52b901a2c1..8f4d9ff3a2e 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -349,7 +349,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) KubernetesServiceNodePort: c.Extra.KubernetesServiceNodePort, }, client, c.ControlPlane.Extra.VersionedInformers.Core().V1().Services()) s.ControlPlane.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error { - kubernetesServiceCtrl.Start(hookContext.StopCh) + kubernetesServiceCtrl.Start(hookContext.Done()) return nil }) s.ControlPlane.GenericAPIServer.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error { diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index d7ff0bdeed4..e5ced412aad 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -514,7 +514,7 @@ func (p *legacyProvider) PostStartHook() (string, genericapiserver.PostStartHook runner.Start() go func() { defer runner.Stop() - <-context.StopCh + <-context.Done() }() // For backward compatibility, we ensure that if we never are able diff --git a/pkg/registry/flowcontrol/rest/storage_flowcontrol.go b/pkg/registry/flowcontrol/rest/storage_flowcontrol.go index 689746ef673..64413b2305c 100644 --- a/pkg/registry/flowcontrol/rest/storage_flowcontrol.go +++ b/pkg/registry/flowcontrol/rest/storage_flowcontrol.go @@ -145,7 +145,7 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo err = func() error { // get a derived context that gets cancelled after 5m or // when the StopCh gets closed, whichever happens first. - ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute) + ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.Done(), 5*time.Minute) defer cancel() if !cache.WaitForCacheSync(ctx.Done(), bce.informersSynced...) { @@ -174,16 +174,15 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo // we have successfully initialized the bootstrap configuration, now we // spin up a goroutine which reconciles the bootstrap configuration periodically. go func() { - ctx := wait.ContextForChannel(hookContext.StopCh) wait.PollImmediateUntil( time.Minute, func() (bool, error) { - if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil { + if err := ensure(hookContext, clientset, bce.fsLister, bce.plcLister); err != nil { klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") } // always auto update both suggested and mandatory configuration return false, nil - }, hookContext.StopCh) + }, hookContext.Done()) klog.Info("APF bootstrap ensurer is exiting") }() diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go index a6d6213a08b..acf69b66aab 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go @@ -220,7 +220,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) ) s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error { - s.Informers.Start(context.StopCh) + s.Informers.Start(context.Done()) return nil }) s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error { @@ -231,20 +231,20 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) if s.GenericAPIServer.StaticOpenAPISpec != nil { if s.GenericAPIServer.OpenAPIVersionedService != nil { openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions()) - go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh) + go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.Done()) } if s.GenericAPIServer.OpenAPIV3VersionedService != nil { openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions()) - go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh) + go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.Done()) } } - go namingController.Run(context.StopCh) - go establishingController.Run(context.StopCh) - go nonStructuralSchemaController.Run(5, context.StopCh) - go apiApprovalController.Run(5, context.StopCh) - go finalizingController.Run(5, context.StopCh) + go namingController.Run(context.Done()) + go establishingController.Run(context.Done()) + go nonStructuralSchemaController.Run(5, context.Done()) + go apiApprovalController.Run(5, context.Done()) + go finalizingController.Run(5, context.Done()) discoverySyncedCh := make(chan struct{}) go discoveryController.Run(context.StopCh, discoverySyncedCh) @@ -265,7 +265,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) return true, nil } return false, nil - }, context.StopCh) + }, context.Done()) }) return s, nil diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index ac91fcf6ea0..d425ae7add0 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -297,25 +297,14 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg } // We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the // context is not used at all. So passing a empty context shouldn't be a problem - ctx := context.TODO() - if err := aggregatorProxyCerts.RunOnce(ctx); err != nil { + if err := aggregatorProxyCerts.RunOnce(context.Background()); err != nil { return nil, err } aggregatorProxyCerts.AddListener(apiserviceRegistrationController) s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error { - // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver - // TODO: See if we can pass ctx to the current method - ctx, cancel := context.WithCancel(context.Background()) - go func() { - select { - case <-postStartHookContext.StopCh: - cancel() // stopCh closed, so cancel our context - case <-ctx.Done(): - } - }() - go aggregatorProxyCerts.Run(ctx, 1) + go aggregatorProxyCerts.Run(postStartHookContext, 1) return nil }) } @@ -373,9 +362,9 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg } s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error { - go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated) + go apiserviceRegistrationController.Run(context.Done(), apiServiceRegistrationControllerInitiated) select { - case <-context.StopCh: + case <-context.Done(): case <-apiServiceRegistrationControllerInitiated: } @@ -394,7 +383,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg // Discovery aggregation depends on the apiservice registration controller // having the full list of APIServices already synced select { - case <-context.StopCh: + case <-context.Done(): return nil // Context cancelled, should abort/clean goroutines case <-apiServiceRegistrationControllerInitiated: @@ -405,10 +394,10 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg // When discovery is ready, all APIServices will be present, with APIServices // that have not successfully synced discovery to be present but marked as Stale. discoverySyncedCh := make(chan struct{}) - go s.discoveryAggregationController.Run(context.StopCh, discoverySyncedCh) + go s.discoveryAggregationController.Run(context.Done(), discoverySyncedCh) select { - case <-context.StopCh: + case <-context.Done(): return nil // Context cancelled, should abort/clean goroutines case <-discoverySyncedCh: @@ -440,7 +429,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return false, err } return true, nil - }, hookContext.StopCh); err != nil { + }, hookContext.Done()); err != nil { return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v", s.GenericAPIServer.APIServerID, err) } @@ -456,14 +445,14 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg // to register all built-in resources when the generic apiservers install APIs. s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID) return false, nil - }, hookContext.StopCh) + }, hookContext.Done()) // Once the storage version updater finishes the first round of update, // the PostStartHook will return to unblock /healthz. The handler chain // won't block write requests anymore. Check every second since it's not // expensive. wait.PollImmediateUntil(1*time.Second, func() (bool, error) { return s.GenericAPIServer.StorageVersionManager.Completed(), nil - }, hookContext.StopCh) + }, hookContext.Done()) return nil }) } @@ -477,14 +466,14 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { // add post start hook before generic PrepareRun in order to be before /healthz installation if s.openAPIConfig != nil { s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error { - go s.openAPIAggregationController.Run(context.StopCh) + go s.openAPIAggregationController.Run(context.Done()) return nil }) } if s.openAPIV3Config != nil { s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapiv3-controller", func(context genericapiserver.PostStartHookContext) error { - go s.openAPIV3AggregationController.Run(context.StopCh) + go s.openAPIV3AggregationController.Run(context.Done()) return nil }) } diff --git a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go index 5e30e2bb319..be233af68d6 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go @@ -204,8 +204,8 @@ func (o WardleServerOptions) RunWardleServer(ctx context.Context) error { } server.GenericAPIServer.AddPostStartHookOrDie("start-sample-server-informers", func(context genericapiserver.PostStartHookContext) error { - config.GenericConfig.SharedInformerFactory.Start(context.StopCh) - o.SharedInformerFactory.Start(context.StopCh) + config.GenericConfig.SharedInformerFactory.Start(context.Done()) + o.SharedInformerFactory.Start(context.Done()) return nil })