clean deprecated context.StopCh

This commit is contained in:
Mangirdas Judeikis 2024-06-24 12:08:03 +03:00
parent 29defc15aa
commit 24ecb20e41
7 changed files with 38 additions and 49 deletions

View File

@ -26,7 +26,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
apiserverfeatures "k8s.io/apiserver/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
@ -139,7 +138,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
}
if len(c.SystemNamespaces) > 0 {
s.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go systemnamespaces.NewController(c.SystemNamespaces, client, s.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
go systemnamespaces.NewController(c.SystemNamespaces, client, s.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.Done())
return nil
})
}
@ -162,7 +161,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
}
s.GenericAPIServer.AddPostStartHookOrDie("peer-endpoint-reconciler-controller",
func(hookContext genericapiserver.PostStartHookContext) error {
peerEndpointCtrl.Start(hookContext.StopCh)
peerEndpointCtrl.Start(hookContext.Done())
return nil
})
s.GenericAPIServer.AddPreShutdownHookOrDie("peer-endpoint-reconciler-controller",
@ -172,7 +171,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
})
if c.Extra.PeerProxy != nil {
s.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
err := c.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
err := c.Extra.PeerProxy.WaitForCacheSync(context.Done())
return err
})
}
@ -180,43 +179,34 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
s.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(s.ClusterAuthenticationInfo, client)
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx := wait.ContextForChannel(hookContext.StopCh)
// prime values and start listeners
if s.ClusterAuthenticationInfo.ClientCA != nil {
s.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
if controller, ok := s.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
if err := controller.RunOnce(hookContext); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
go controller.Run(hookContext, 1)
}
}
if s.ClusterAuthenticationInfo.RequestHeaderCA != nil {
s.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
if controller, ok := s.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
if err := controller.RunOnce(hookContext); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
go controller.Run(hookContext, 1)
}
}
go controller.Run(ctx, 1)
go controller.Run(hookContext, 1)
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx := wait.ContextForChannel(hookContext.StopCh)
leaseName := s.GenericAPIServer.APIServerID
holderIdentity := s.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
@ -233,7 +223,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
metav1.NamespaceSystem,
// TODO: receive identity label value as a parameter when post start hook is moved to generic apiserver.
labelAPIServerHeartbeatFunc(name, peeraddress))
go controller.Run(ctx)
go controller.Run(hookContext)
return nil
})
// TODO: move this into generic apiserver and make the lease identity value configurable
@ -243,13 +233,13 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
IdentityLeaseGCPeriod,
metav1.NamespaceSystem,
IdentityLeaseComponentLabelKey+"="+name,
).Run(hookContext.StopCh)
).Run(hookContext.Done())
return nil
})
}
s.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go legacytokentracking.NewController(client).Run(hookContext.StopCh)
go legacytokentracking.NewController(client).Run(hookContext.Done())
return nil
})

View File

@ -348,7 +348,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
KubernetesServiceNodePort: c.Extra.KubernetesServiceNodePort,
}, client, c.ControlPlane.Extra.VersionedInformers.Core().V1().Services())
s.ControlPlane.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubernetesServiceCtrl.Start(hookContext.StopCh)
kubernetesServiceCtrl.Start(hookContext.Done())
return nil
})
s.ControlPlane.GenericAPIServer.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {

View File

@ -449,7 +449,7 @@ func (p *legacyProvider) PostStartHook() (string, genericapiserver.PostStartHook
runner.Start()
go func() {
defer runner.Stop()
<-context.StopCh
<-context.Done()
}()
// For backward compatibility, we ensure that if we never are able

View File

@ -145,7 +145,7 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo
err = func() error {
// get a derived context that gets cancelled after 5m or
// when the StopCh gets closed, whichever happens first.
ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute)
ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.Done(), 5*time.Minute)
defer cancel()
if !cache.WaitForCacheSync(ctx.Done(), bce.informersSynced...) {
@ -174,16 +174,15 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo
// we have successfully initialized the bootstrap configuration, now we
// spin up a goroutine which reconciles the bootstrap configuration periodically.
go func() {
ctx := wait.ContextForChannel(hookContext.StopCh)
wait.PollImmediateUntil(
time.Minute,
func() (bool, error) {
if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil {
if err := ensure(hookContext, clientset, bce.fsLister, bce.plcLister); err != nil {
klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
}
// always auto update both suggested and mandatory configuration
return false, nil
}, hookContext.StopCh)
}, hookContext.Done())
klog.Info("APF bootstrap ensurer is exiting")
}()

View File

@ -220,7 +220,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
)
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
s.Informers.Start(context.Done())
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
@ -231,20 +231,20 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
if s.GenericAPIServer.StaticOpenAPISpec != nil {
if s.GenericAPIServer.OpenAPIVersionedService != nil {
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.Done())
}
if s.GenericAPIServer.OpenAPIV3VersionedService != nil {
openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.Done())
}
}
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)
go namingController.Run(context.Done())
go establishingController.Run(context.Done())
go nonStructuralSchemaController.Run(5, context.Done())
go apiApprovalController.Run(5, context.Done())
go finalizingController.Run(5, context.Done())
discoverySyncedCh := make(chan struct{})
go discoveryController.Run(context.StopCh, discoverySyncedCh)
@ -265,7 +265,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return true, nil
}
return false, nil
}, context.StopCh)
}, context.Done())
})
return s, nil

View File

@ -304,7 +304,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-postStartHookContext.StopCh:
case <-postStartHookContext.Done():
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
@ -344,9 +344,9 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
}
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
go apiserviceRegistrationController.Run(context.Done(), apiServiceRegistrationControllerInitiated)
select {
case <-context.StopCh:
case <-context.Done():
case <-apiServiceRegistrationControllerInitiated:
}
@ -365,7 +365,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
// Discovery aggregation depends on the apiservice registration controller
// having the full list of APIServices already synced
select {
case <-context.StopCh:
case <-context.Done():
return nil
// Context cancelled, should abort/clean goroutines
case <-apiServiceRegistrationControllerInitiated:
@ -376,10 +376,10 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
// When discovery is ready, all APIServices will be present, with APIServices
// that have not successfully synced discovery to be present but marked as Stale.
discoverySyncedCh := make(chan struct{})
go s.discoveryAggregationController.Run(context.StopCh, discoverySyncedCh)
go s.discoveryAggregationController.Run(context.Done(), discoverySyncedCh)
select {
case <-context.StopCh:
case <-context.Done():
return nil
// Context cancelled, should abort/clean goroutines
case <-discoverySyncedCh:
@ -411,7 +411,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return false, err
}
return true, nil
}, hookContext.StopCh); err != nil {
}, hookContext.Done()); err != nil {
return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
s.GenericAPIServer.APIServerID, err)
}
@ -427,14 +427,14 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
// to register all built-in resources when the generic apiservers install APIs.
s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
return false, nil
}, hookContext.StopCh)
}, hookContext.Done())
// Once the storage version updater finishes the first round of update,
// the PostStartHook will return to unblock /healthz. The handler chain
// won't block write requests anymore. Check every second since it's not
// expensive.
wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
return s.GenericAPIServer.StorageVersionManager.Completed(), nil
}, hookContext.StopCh)
}, hookContext.Done())
return nil
})
}
@ -448,14 +448,14 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
// add post start hook before generic PrepareRun in order to be before /healthz installation
if s.openAPIConfig != nil {
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIAggregationController.Run(context.StopCh)
go s.openAPIAggregationController.Run(context.Done())
return nil
})
}
if s.openAPIV3Config != nil {
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapiv3-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIV3AggregationController.Run(context.StopCh)
go s.openAPIV3AggregationController.Run(context.Done())
return nil
})
}

View File

@ -204,8 +204,8 @@ func (o WardleServerOptions) RunWardleServer(ctx context.Context) error {
}
server.GenericAPIServer.AddPostStartHookOrDie("start-sample-server-informers", func(context genericapiserver.PostStartHookContext) error {
config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
o.SharedInformerFactory.Start(context.StopCh)
config.GenericConfig.SharedInformerFactory.Start(context.Done())
o.SharedInformerFactory.Start(context.Done())
return nil
})