diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/admission_test.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/admission_test.go index 684a4659b7e..2ba862136cb 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/admission_test.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/admission_test.go @@ -276,7 +276,7 @@ func setupTestCommon(t *testing.T, compiler ValidatorCompiler, shouldStartInform // Override compiler used by controller for tests controller = handler.evaluator.(*celAdmissionController) - controller.validatorCompiler = compiler + controller.policyController.ValidatorCompiler = compiler t.Cleanup(func() { testContextCancel() @@ -369,8 +369,8 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O return nil, err } - c.mutex.RLock() - defer c.mutex.RUnlock() + c.policyController.mutex.RLock() + defer c.policyController.mutex.RUnlock() switch obj.(type) { case *unstructured.Unstructured: @@ -380,7 +380,7 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O Kind: paramSourceGVK.Kind, } var paramInformer generic.Informer[*unstructured.Unstructured] - if paramInfo, ok := c.paramsCRDControllers[paramKind]; ok { + if paramInfo, ok := c.policyController.paramsCRDControllers[paramKind]; ok { paramInformer = paramInfo.controller.Informer() } else { // Treat unknown CRD the same as not found @@ -399,7 +399,7 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O return item, nil case *v1alpha1.ValidatingAdmissionPolicyBinding: nn := getNamespaceName(accessor.GetNamespace(), accessor.GetName()) - info, ok := c.bindingInfos[nn] + info, ok := c.policyController.bindingInfos[nn] if !ok { return nil, nil } @@ -407,7 +407,7 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O return info.lastReconciledValue, nil case *v1alpha1.ValidatingAdmissionPolicy: nn := getNamespaceName(accessor.GetNamespace(), accessor.GetName()) - info, ok := c.definitionInfo[nn] + info, ok := c.policyController.definitionInfo[nn] if !ok { return nil, nil } @@ -422,7 +422,15 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O // their gvk/name in the controller func waitForReconcile(ctx context.Context, controller *celAdmissionController, objects ...runtime.Object) error { return wait.PollWithContext(ctx, 100*time.Millisecond, 1*time.Second, func(ctx context.Context) (done bool, err error) { + defer func() { + if done { + // force admission controller to refresh the information it + // uses for validation now that it is done in the background + controller.refreshPolicies() + } + }() for _, obj := range objects { + objMeta, err := meta.Accessor(obj) if err != nil { return false, fmt.Errorf("error getting meta accessor for original %T object (%v): %w", obj, obj, err) @@ -462,6 +470,14 @@ func waitForReconcile(ctx context.Context, controller *celAdmissionController, o // with the given GVKs and namespace/names func waitForReconcileDeletion(ctx context.Context, controller *celAdmissionController, objects ...runtime.Object) error { return wait.PollWithContext(ctx, 200*time.Millisecond, 3*time.Hour, func(ctx context.Context) (done bool, err error) { + defer func() { + if done { + // force admission controller to refresh the information it + // uses for validation now that it is done in the background + controller.refreshPolicies() + } + }() + for _, obj := range objects { currentValue, err := controller.getCurrentObject(obj) if err != nil { @@ -694,7 +710,6 @@ func TestDefinitionDoesntMatch(t *testing.T) { attributeRecord( nil, nonMatchingParams, admission.Create), &admission.RuntimeObjectInterfaces{})) - require.Zero(t, numCompiles) require.Empty(t, passedParams) // Validate a matching input. @@ -791,9 +806,6 @@ func TestReconfigureBinding(t *testing.T) { // Expect `Compile` only called once require.Equal(t, 1, numCompiles, "expect `Compile` to be called only once") - // Show Evaluator was called - //require.Len(t, passedParams, 1, "expect evaluator is called due to proper configuration") - // Update the tracker to point at different params require.NoError(t, tracker.Update(bindingsGVR, denyBinding2, "")) @@ -808,8 +820,6 @@ func TestReconfigureBinding(t *testing.T) { ) require.ErrorContains(t, err, `failed to configure binding: replicas-test2.example.com not found`) - require.Equal(t, 1, numCompiles, "expect compile is not called when there is configuration error") - //require.Len(t, passedParams, 1, "expect evaluator was not called when there is configuration error") // Add the missing params require.NoError(t, paramTracker.Add(fakeParams2)) diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go index bdb2a0680ab..6767c7d11fb 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" celmetrics "k8s.io/apiserver/pkg/admission/cel" "k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic" @@ -47,44 +49,24 @@ var _ CELPolicyEvaluator = &celAdmissionController{} // celAdmissionController is the top-level controller for admission control using CEL // it is responsible for watching policy definitions, bindings, and config param CRDs type celAdmissionController struct { - // Context under which the controller runs - runningContext context.Context + // Controller which manages book-keeping for the cluster's dynamic policy + // information. + policyController *policyController - policyDefinitionsController generic.Controller[*v1alpha1.ValidatingAdmissionPolicy] - policyBindingController generic.Controller[*v1alpha1.ValidatingAdmissionPolicyBinding] + // atomic []policyData + // list of every known policy definition, and all informatoin required to + // validate its bindings against an object. + // A snapshot of the current policy configuration is synced with this field + // asynchronously + definitions atomic.Value +} - // dynamicclient used to create informers to watch the param crd types - dynamicClient dynamic.Interface - restMapper meta.RESTMapper - - // Provided to the policy's Compile function as an injected dependency to - // assist with compiling its expressions to CEL - validatorCompiler ValidatorCompiler - - // Lock which protects: - // - definitionInfo - // - bindingInfos - // - paramCRDControllers - // - definitionsToBindings - // All other fields should be assumed constant - mutex sync.RWMutex - - // controller and metadata - paramsCRDControllers map[v1alpha1.ParamKind]*paramInfo - - // Index for each definition namespace/name, contains all binding - // namespace/names known to exist for that definition - definitionInfo map[namespacedName]*definitionInfo - - // Index for each bindings namespace/name. Contains compiled templates - // for the binding depending on the policy/param combination. - bindingInfos map[namespacedName]*bindingInfo - - // Map from namespace/name of a definition to a set of namespace/name - // of bindings which depend on it. - // All keys must have at least one dependent binding - // All binding names MUST exist as a key bindingInfos - definitionsToBindings map[namespacedName]sets.Set[namespacedName] +// Everything someone might need to validate a single ValidatingPolicyDefinition +// against all of its registered bindings. +type policyData struct { + definitionInfo + paramController generic.Controller[*unstructured.Unstructured] + bindings []bindingInfo } // namespaceName is used as a key in definitionInfo and bindingInfos @@ -105,7 +87,7 @@ type definitionInfo struct { type bindingInfo struct { // Compiled CEL expression turned into an validator - validator atomic.Pointer[Validator] + validator Validator // Last value seen by this controller to be used in policy enforcement // May not be nil @@ -130,66 +112,44 @@ func NewAdmissionController( restMapper meta.RESTMapper, dynamicClient dynamic.Interface, ) CELPolicyEvaluator { - matcher := matching.NewMatcher(informerFactory.Core().V1().Namespaces().Lister(), client) - validatorCompiler := &CELValidatorCompiler{ - Matcher: matcher, + return &celAdmissionController{ + definitions: atomic.Value{}, + policyController: newPolicyController( + restMapper, + dynamicClient, + &CELValidatorCompiler{ + Matcher: matching.NewMatcher(informerFactory.Core().V1().Namespaces().Lister(), client), + }, + generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicy]( + informerFactory.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer()), + generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicyBinding]( + informerFactory.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer()), + ), } - c := &celAdmissionController{ - definitionInfo: make(map[namespacedName]*definitionInfo), - bindingInfos: make(map[namespacedName]*bindingInfo), - paramsCRDControllers: make(map[v1alpha1.ParamKind]*paramInfo), - definitionsToBindings: make(map[namespacedName]sets.Set[namespacedName]), - dynamicClient: dynamicClient, - validatorCompiler: validatorCompiler, - restMapper: restMapper, - } - - c.policyDefinitionsController = generic.NewController( - generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicy]( - informerFactory.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer()), - c.reconcilePolicyDefinition, - generic.ControllerOptions{ - Workers: 1, - Name: "cel-policy-definitions", - }, - ) - c.policyBindingController = generic.NewController( - generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicyBinding]( - informerFactory.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer()), - c.reconcilePolicyBinding, - generic.ControllerOptions{ - Workers: 1, - Name: "cel-policy-bindings", - }, - ) - return c } func (c *celAdmissionController) Run(stopCh <-chan struct{}) { - // TODO: Doesn't this comparison need a lock? - if c.runningContext != nil { - return - } - ctx, cancel := context.WithCancel(context.Background()) - - c.runningContext = ctx - defer func() { - c.runningContext = nil - }() - wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() - c.policyDefinitionsController.Run(ctx) + c.policyController.Run(ctx) }() wg.Add(1) go func() { defer wg.Done() - c.policyBindingController.Run(ctx) + + // Wait indefinitely until policies/bindings are listed & handled before + // allowing policies to be refreshed + if !cache.WaitForNamedCacheSync("cel-admission-controller", ctx.Done(), c.policyController.HasSynced) { + return + } + + // Loop every 1 second until context is cancelled, refreshing policies + wait.Until(c.refreshPolicies, 1*time.Second, ctx.Done()) }() <-stopCh @@ -202,8 +162,9 @@ func (c *celAdmissionController) Validate( a admission.Attributes, o admission.ObjectInterfaces, ) (err error) { - c.mutex.RLock() - defer c.mutex.RUnlock() + if !c.HasSynced() { + return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) + } var deniedDecisions []policyDecisionWithMetadata @@ -247,9 +208,11 @@ func (c *celAdmissionController) Validate( }) } } - for definitionNamespacedName, definitionInfo := range c.definitionInfo { + policyDatas := c.definitions.Load().([]policyData) + + for _, definitionInfo := range policyDatas { definition := definitionInfo.lastReconciledValue - matches, matchKind, err := c.validatorCompiler.DefinitionMatches(a, o, definition) + matches, matchKind, err := c.policyController.DefinitionMatches(a, o, definition) if err != nil { // Configuration error. addConfigError(err, definition, nil) @@ -264,17 +227,11 @@ func (c *celAdmissionController) Validate( continue } - dependentBindings := c.definitionsToBindings[definitionNamespacedName] - if len(dependentBindings) == 0 { - continue - } - - for namespacedBindingName := range dependentBindings { + for _, bindingInfo := range definitionInfo.bindings { // If the key is inside dependentBindings, there is guaranteed to // be a bindingInfo for it - bindingInfo := c.bindingInfos[namespacedBindingName] binding := bindingInfo.lastReconciledValue - matches, err := c.validatorCompiler.BindingMatches(a, o, binding) + matches, err := c.policyController.BindingMatches(a, o, binding) if err != nil { // Configuration error. addConfigError(err, definition, binding) @@ -291,11 +248,8 @@ func (c *celAdmissionController) Validate( paramKind := definition.Spec.ParamKind paramRef := binding.Spec.ParamRef if paramKind != nil && paramRef != nil { - - // Find the params referred by the binding by looking its name up - // in our informer for its CRD - paramInfo, ok := c.paramsCRDControllers[*paramKind] - if !ok { + paramController := definitionInfo.paramController + if paramController == nil { addConfigError(fmt.Errorf("paramKind kind `%v` not known", paramKind.String()), definition, binding) continue @@ -304,19 +258,19 @@ func (c *celAdmissionController) Validate( // If the param informer for this admission policy has not yet // had time to perform an initial listing, don't attempt to use // it. - //!TODO(alexzielenski): Add a shorter timeout - // than "forever" to this wait. + timeoutCtx, cancel := context.WithTimeout(c.policyController.context, 1*time.Second) + defer cancel() - if !cache.WaitForCacheSync(c.runningContext.Done(), paramInfo.controller.HasSynced) { + if !cache.WaitForCacheSync(timeoutCtx.Done(), paramController.HasSynced) { addConfigError(fmt.Errorf("paramKind kind `%v` not yet synced to use for admission", paramKind.String()), definition, binding) continue } if len(paramRef.Namespace) == 0 { - param, err = paramInfo.controller.Informer().Get(paramRef.Name) + param, err = paramController.Informer().Get(paramRef.Name) } else { - param, err = paramInfo.controller.Informer().Namespaced(paramRef.Namespace).Get(paramRef.Name) + param, err = paramController.Informer().Namespaced(paramRef.Namespace).Get(paramRef.Name) } if err != nil { @@ -338,17 +292,7 @@ func (c *celAdmissionController) Validate( continue } } - - validator := bindingInfo.validator.Load() - if validator == nil { - // Compile policy definition using binding - newValidator := c.validatorCompiler.Compile(definition) - validator = &newValidator - - bindingInfo.validator.Store(validator) - } - - decisions, err := (*validator).Validate(a, o, param, matchKind) + decisions, err := bindingInfo.validator.Validate(a, o, param, matchKind) if err != nil { // runtime error. Apply failure policy wrappedError := fmt.Errorf("failed to evaluate CEL expression: %w", err) @@ -400,10 +344,13 @@ func (c *celAdmissionController) Validate( } func (c *celAdmissionController) HasSynced() bool { - return c.policyBindingController.HasSynced() && - c.policyDefinitionsController.HasSynced() + return c.policyController.HasSynced() && c.definitions.Load() != nil } func (c *celAdmissionController) ValidateInitialization() error { - return c.validatorCompiler.ValidateInitialization() + return c.policyController.ValidateInitialization() +} + +func (c *celAdmissionController) refreshPolicies() { + c.definitions.Store(c.policyController.latestPolicyData()) } diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go index 17c4a9a6492..b3fdcaa9ee3 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go @@ -19,23 +19,133 @@ package validatingadmissionpolicy import ( "context" "fmt" + "sync" "time" "k8s.io/api/admissionregistration/v1alpha1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" celmetrics "k8s.io/apiserver/pkg/admission/cel" "k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic" + "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" ) -func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name string, definition *v1alpha1.ValidatingAdmissionPolicy) error { +type policyController struct { + once sync.Once + context context.Context + dynamicClient dynamic.Interface + restMapper meta.RESTMapper + policyDefinitionsController generic.Controller[*v1alpha1.ValidatingAdmissionPolicy] + policyBindingController generic.Controller[*v1alpha1.ValidatingAdmissionPolicyBinding] + + // Provided to the policy's Compile function as an injected dependency to + // assist with compiling its expressions to CEL + ValidatorCompiler + + // Lock which protects: + // - cachedPolicies + // - paramCRDControllers + // - definitionInfo + // - bindingInfos + // - definitionsToBindings + // All other fields should be assumed constant + mutex sync.RWMutex + + cachedPolicies []policyData + + // controller and metadata + paramsCRDControllers map[v1alpha1.ParamKind]*paramInfo + + // Index for each definition namespace/name, contains all binding + // namespace/names known to exist for that definition + definitionInfo map[namespacedName]*definitionInfo + + // Index for each bindings namespace/name. Contains compiled templates + // for the binding depending on the policy/param combination. + bindingInfos map[namespacedName]*bindingInfo + + // Map from namespace/name of a definition to a set of namespace/name + // of bindings which depend on it. + // All keys must have at least one dependent binding + // All binding names MUST exist as a key bindingInfos + definitionsToBindings map[namespacedName]sets.Set[namespacedName] +} + +func newPolicyController( + restMapper meta.RESTMapper, + dynamicClient dynamic.Interface, + validatorCompiler ValidatorCompiler, + policiesInformer generic.Informer[*v1alpha1.ValidatingAdmissionPolicy], + bindingsInformer generic.Informer[*v1alpha1.ValidatingAdmissionPolicyBinding], +) *policyController { + res := &policyController{} + *res = policyController{ + ValidatorCompiler: validatorCompiler, + definitionInfo: make(map[namespacedName]*definitionInfo), + bindingInfos: make(map[namespacedName]*bindingInfo), + paramsCRDControllers: make(map[v1alpha1.ParamKind]*paramInfo), + definitionsToBindings: make(map[namespacedName]sets.Set[namespacedName]), + policyDefinitionsController: generic.NewController( + policiesInformer, + res.reconcilePolicyDefinition, + generic.ControllerOptions{ + Workers: 1, + Name: "cel-policy-definitions", + }, + ), + policyBindingController: generic.NewController( + bindingsInformer, + res.reconcilePolicyBinding, + generic.ControllerOptions{ + Workers: 1, + Name: "cel-policy-bindings", + }, + ), + restMapper: restMapper, + dynamicClient: dynamicClient, + } + return res +} + +func (c *policyController) Run(ctx context.Context) { + // Only support being run once + c.once.Do(func() { + c.context = ctx + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + c.policyDefinitionsController.Run(ctx) + }() + + wg.Add(1) + go func() { + defer wg.Done() + c.policyBindingController.Run(ctx) + }() + + <-ctx.Done() + wg.Wait() + }) +} + +func (c *policyController) HasSynced() bool { + return c.policyDefinitionsController.HasSynced() && c.policyBindingController.HasSynced() +} + +func (c *policyController) reconcilePolicyDefinition(namespace, name string, definition *v1alpha1.ValidatingAdmissionPolicy) error { c.mutex.Lock() defer c.mutex.Unlock() + c.cachedPolicies = nil // invalidate cachedPolicies + // Namespace for policydefinition is empty. nn := getNamespaceName(namespace, name) info, ok := c.definitionInfo[nn] @@ -75,7 +185,7 @@ func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name strin // definition has changed. for key := range c.definitionsToBindings[nn] { bindingInfo := c.bindingInfos[key] - bindingInfo.validator.Store(nil) + bindingInfo.validator = nil c.bindingInfos[key] = bindingInfo } @@ -121,7 +231,7 @@ func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name strin // Start watching the param CRD if _, ok := c.paramsCRDControllers[*paramSource]; !ok { - instanceContext, instanceCancel := context.WithCancel(c.runningContext) + instanceContext, instanceCancel := context.WithCancel(c.context) // Watch for new instances of this policy informer := dynamicinformer.NewFilteredDynamicInformer( @@ -155,10 +265,12 @@ func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name strin return nil } -func (c *celAdmissionController) reconcilePolicyBinding(namespace, name string, binding *v1alpha1.ValidatingAdmissionPolicyBinding) error { +func (c *policyController) reconcilePolicyBinding(namespace, name string, binding *v1alpha1.ValidatingAdmissionPolicyBinding) error { c.mutex.Lock() defer c.mutex.Unlock() + c.cachedPolicies = nil // invalidate cachedPolicies + // Namespace for PolicyBinding is empty. In the future a namespaced binding // may be added // https://github.com/kubernetes/enhancements/blob/bf5c3c81ea2081d60c1dc7c832faa98479e06209/keps/sig-api-machinery/3488-cel-admission-control/README.md?plain=1#L1042 @@ -208,12 +320,12 @@ func (c *celAdmissionController) reconcilePolicyBinding(namespace, name string, } // Remove compiled template for old binding - info.validator.Store(nil) + info.validator = nil info.lastReconciledValue = binding return nil } -func (c *celAdmissionController) reconcileParams(namespace, name string, params *unstructured.Unstructured) error { +func (c *policyController) reconcileParams(namespace, name string, params *unstructured.Unstructured) error { // Do nothing. // When we add informational type checking we will need to compile in the // reconcile loops instead of lazily so we can add compiler errors / type @@ -221,6 +333,52 @@ func (c *celAdmissionController) reconcileParams(namespace, name string, params return nil } +// Fetches the latest set of policy data or recalculates it if it has changed +// since it was last fetched +func (c *policyController) latestPolicyData() []policyData { + existing := func() []policyData { + c.mutex.RLock() + defer c.mutex.RUnlock() + + return c.cachedPolicies + }() + + if existing != nil { + return existing + } + + c.mutex.Lock() + defer c.mutex.Unlock() + + var res []policyData + for definitionNN, definitionInfo := range c.definitionInfo { + var bindingInfos []bindingInfo + for bindingNN := range c.definitionsToBindings[definitionNN] { + bindingInfo := c.bindingInfos[bindingNN] + if bindingInfo.validator == nil && definitionInfo.configurationError == nil { + bindingInfo.validator = c.ValidatorCompiler.Compile(definitionInfo.lastReconciledValue) + } + bindingInfos = append(bindingInfos, *bindingInfo) + } + + var paramController generic.Controller[*unstructured.Unstructured] + if paramKind := definitionInfo.lastReconciledValue.Spec.ParamKind; paramKind != nil { + if info, ok := c.paramsCRDControllers[*paramKind]; ok { + paramController = info.controller + } + } + + res = append(res, policyData{ + definitionInfo: *definitionInfo, + paramController: paramController, + bindings: bindingInfos, + }) + } + + c.cachedPolicies = res + return res +} + func getNamespaceName(namespace, name string) namespacedName { return namespacedName{ namespace: namespace, diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go index e1e1b04ebc6..4334c0dd82c 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go @@ -150,13 +150,14 @@ func (c *controller[T]) Run(ctx context.Context) error { enqueue(obj, false) }, }) - c.notificationsDelivered.Store(registration.HasSynced) // Error might be raised if informer was started and stopped already if err != nil { return err } + c.notificationsDelivered.Store(registration.HasSynced) + // Make sure event handler is removed from informer in case return early from // an error defer func() { @@ -185,8 +186,8 @@ func (c *controller[T]) Run(ctx context.Context) error { for i := uint(0); i < c.options.Workers; i++ { waitGroup.Add(1) go func() { + defer waitGroup.Done() wait.Until(c.runWorker, time.Second, ctx.Done()) - waitGroup.Done() }() }