From 2286501e227ead064e95880a6f28904526f887a6 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Wed, 12 Oct 2022 10:21:08 -0700 Subject: [PATCH] add cel admission controller --- .../pkg/admission/plugin/cel/controller.go | 320 ++++++++++++++++++ .../plugin/cel/controller_reconcile.go | 203 +++++++++++ .../pkg/admission/plugin/cel/interface.go | 108 ++++++ .../plugin/cel/internal/generic/controller.go | 264 +++++++++++++++ .../plugin/cel/internal/generic/doc.go | 29 ++ .../plugin/cel/internal/generic/informer.go | 36 ++ .../plugin/cel/internal/generic/interface.go | 62 ++++ .../plugin/cel/internal/generic/lister.go | 100 ++++++ .../admission/plugin/cel/policy_decision.go | 53 +++ 9 files changed, 1175 insertions(+) create mode 100644 staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller.go create mode 100644 staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller_reconcile.go create mode 100644 staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/interface.go create mode 100644 staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller.go create mode 100644 staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/doc.go create mode 100644 staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/informer.go create mode 100644 staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/interface.go create mode 100644 staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/lister.go create mode 100644 staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/policy_decision.go diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller.go new file mode 100644 index 00000000000..150940747e1 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller.go @@ -0,0 +1,320 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "context" + "errors" + "fmt" + "sync" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +// celAdmissionController is the top-level controller for admission control using CEL +// it is responsible for watching policy definitions, bindings, and config param CRDs +type celAdmissionController struct { + // Context under which the controller runs + runningContext context.Context + + policyDefinitionsController generic.Controller[PolicyDefinition] + policyBindingController generic.Controller[PolicyBinding] + + // dynamicclient used to create informers to watch the param crd types + dynamicClient dynamic.Interface + restMapper meta.RESTMapper + + // Provided to the policy's Compile function as an injected dependency to + // assist with compiling its expressions to CEL + objectConverter ObjectConverter + + // Lock which protects: + // - definitionInfo + // - bindingInfos + // - paramCRDControllers + // - definitionsToBindings + // All other fields should be assumed constant + mutex sync.RWMutex + + // controller and metadata + paramsCRDControllers map[schema.GroupVersionKind]*paramInfo + + // Index for each definition namespace/name, contains all binding + // namespace/names known to exist for that definition + definitionInfo map[string]*definitionInfo + + // Index for each bindings namespace/name. Contains compiled templates + // for the binding depending on the policy/param combination. + bindingInfos map[string]*bindingInfo + + // Map from namespace/name of a definition to a set of namespace/name + // of bindings which depend on it. + // All keys must have at least one dependent binding + // All binding names MUST exist as a key bindingInfos + definitionsToBindings map[string]sets.String +} + +type definitionInfo struct { + // Error about the state of the definition's configuration and the cluster + // preventing its enforcement or compilation. + // Reset every reconciliation + configurationError error + + // Last value seen by this controller to be used in policy enforcement + // May not be nil + lastReconciledValue PolicyDefinition +} + +type bindingInfo struct { + // Compiled CEL expression turned into an evaluator + evaluator EvaluatorFunc + + // Last value seen by this controller to be used in policy enforcement + // May not be nil + lastReconciledValue PolicyBinding +} + +type paramInfo struct { + // Controller which is watching this param CRD + controller generic.Controller[*unstructured.Unstructured] + + // Function to call to stop the informer and clean up the controller + stop func() + + // Policy Definitions which refer to this param CRD + dependentDefinitions sets.String +} + +func NewAdmissionController( + // Informers + policyDefinitionsInformer cache.SharedIndexInformer, + policyBindingInformer cache.SharedIndexInformer, + + // Injected Dependencies + objectConverter ObjectConverter, + restMapper meta.RESTMapper, + dynamicClient dynamic.Interface, +) CELPolicyEvaluator { + c := &celAdmissionController{ + definitionInfo: make(map[string]*definitionInfo), + bindingInfos: make(map[string]*bindingInfo), + paramsCRDControllers: make(map[schema.GroupVersionKind]*paramInfo), + definitionsToBindings: make(map[string]sets.String), + dynamicClient: dynamicClient, + objectConverter: objectConverter, + restMapper: restMapper, + } + + c.policyDefinitionsController = generic.NewController( + generic.NewInformer[PolicyDefinition](policyDefinitionsInformer), + c.reconcilePolicyDefinition, + generic.ControllerOptions{ + Workers: 1, + Name: "cel-policy-definitions", + }, + ) + c.policyBindingController = generic.NewController( + generic.NewInformer[PolicyBinding](policyBindingInformer), + c.reconcilePolicyBinding, + generic.ControllerOptions{ + Workers: 1, + Name: "cel-policy-bindings", + }, + ) + return c +} + +func (c *celAdmissionController) Run(stopCh <-chan struct{}) { + if c.runningContext != nil { + return + } + + ctx, cancel := context.WithCancel(context.Background()) + + c.runningContext = ctx + defer func() { + c.runningContext = nil + }() + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + c.policyDefinitionsController.Run(ctx) + }() + + wg.Add(1) + go func() { + defer wg.Done() + c.policyBindingController.Run(ctx) + }() + + <-stopCh + cancel() + wg.Wait() +} + +func (c *celAdmissionController) Validate( + ctx context.Context, + a admission.Attributes, + o admission.ObjectInterfaces, +) (err error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + + var allDecisions []PolicyDecisionWithMetadata = nil + + addConfigError := func(err error, definition PolicyDefinition, binding PolicyBinding) { + wrappedError := fmt.Errorf("configuration error: %w", err) + switch p := definition.GetFailurePolicy(); p { + case Ignore: + klog.Info(wrappedError) + return + case Fail: + allDecisions = append(allDecisions, PolicyDecisionWithMetadata{ + PolicyDecision: PolicyDecision{ + Kind: Deny, + Message: wrappedError.Error(), + }, + Definition: definition, + Binding: binding, + }) + default: + utilruntime.HandleError(fmt.Errorf("unrecognized failure policy: '%v'", p)) + } + } + for definitionNamespacedName, definitionInfo := range c.definitionInfo { + definition := definitionInfo.lastReconciledValue + if !definition.Matches(a) { + // Policy definition does not match request + continue + } else if definitionInfo.configurationError != nil { + // Configuration error. + addConfigError(definitionInfo.configurationError, definition, nil) + continue + } + + dependentBindings := c.definitionsToBindings[definitionNamespacedName] + if len(dependentBindings) == 0 { + // Definition has no known bindings yet. + addConfigError(errors.New("no bindings found"), definition, nil) + continue + } + + for namespacedBindingName := range dependentBindings { + // If the key is inside dependentBindings, there is guaranteed to + // be a bindingInfo for it + bindingInfo := c.bindingInfos[namespacedBindingName] + binding := bindingInfo.lastReconciledValue + if !binding.Matches(a) { + continue + } + + var param *unstructured.Unstructured + + // If definition has no paramsource, always provide nil params to + // evaluator. If binding specifies a params to use they are ignored. + // Done this way so you can configure params before definition is ready. + if paramSource := definition.GetParamSource(); paramSource != nil { + paramsNamespace, paramsName := binding.GetTargetParams() + + // Find the params referred by the binding by looking its name up + // in our informer for its CRD + paramInfo, ok := c.paramsCRDControllers[*paramSource] + if !ok { + addConfigError(fmt.Errorf("paramSource kind `%v` not known", + paramSource.String()), definition, binding) + continue + } + + if len(paramsNamespace) == 0 { + param, err = paramInfo.controller.Informer().Get(paramsName) + } else { + param, err = paramInfo.controller.Informer().Namespaced(paramsNamespace).Get(paramsName) + } + + if err != nil { + // Apply failure policy + addConfigError(err, definition, binding) + + if k8serrors.IsNotFound(err) { + // Param doesnt exist yet? + // Maybe just have to wait a bit. + continue + } + + // There was a bad internal error + utilruntime.HandleError(err) + continue + } + } + + if bindingInfo.evaluator == nil { + // Compile policy definition using binding + bindingInfo.evaluator, err = definition.Compile(c.objectConverter, c.restMapper) + if err != nil { + // compilation error. Apply failure policy + wrappedError := fmt.Errorf("failed to compile CEL expression: %w", err) + addConfigError(wrappedError, definition, binding) + continue + } + c.bindingInfos[namespacedBindingName] = bindingInfo + } + + decisions := bindingInfo.evaluator(a, param) + for _, decision := range decisions { + switch decision.Kind { + case Admit: + // Do nothing + case Deny: + allDecisions = append(allDecisions, PolicyDecisionWithMetadata{ + Definition: definition, + Binding: binding, + PolicyDecision: decision, + }) + default: + // unrecognized decision. ignore + } + } + } + } + + if len(allDecisions) > 0 { + return k8serrors.NewConflict( + a.GetResource().GroupResource(), a.GetName(), + &PolicyError{ + Decisions: allDecisions, + }) + } + + return nil +} +func (c *celAdmissionController) HasSynced() bool { + return c.policyBindingController.HasSynced() && + c.policyDefinitionsController.HasSynced() +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller_reconcile.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller_reconcile.go new file mode 100644 index 00000000000..d0aa7d1ae78 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller_reconcile.go @@ -0,0 +1,203 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" +) + +func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name string, definition PolicyDefinition) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + // Namespace for policydefinition is empty. Leaving usage here for compatibility + // with future NamespacedPolicyDefinition + namespacedName := namespace + "/" + name + info, ok := c.definitionInfo[namespacedName] + if !ok { + info = &definitionInfo{} + c.definitionInfo[namespacedName] = info + } + + var paramSource *schema.GroupVersionKind + if definition != nil { + paramSource = definition.GetParamSource() + } + + // If param source has changed, remove definition as dependent of old params + // If there are no more dependents of old param, stop and clean up controller + if info.lastReconciledValue != nil && info.lastReconciledValue.GetParamSource() != nil { + oldParamSource := *info.lastReconciledValue.GetParamSource() + + // If we are: + // - switching from having a param to not having a param (includes deletion) + // - or from having a param to a different one + // we remove dependency on the controller. + if paramSource == nil || *paramSource != oldParamSource { + if oldParamInfo, ok := c.paramsCRDControllers[oldParamSource]; ok { + oldParamInfo.dependentDefinitions.Delete(namespacedName) + if len(oldParamInfo.dependentDefinitions) == 0 { + oldParamInfo.stop() + delete(c.paramsCRDControllers, oldParamSource) + } + } + } + } + + // Reset all previously compiled evaluators in case something relevant in + // definition has changed. + for key := range c.definitionsToBindings[namespacedName] { + bindingInfo := c.bindingInfos[key] + bindingInfo.evaluator = nil + c.bindingInfos[key] = bindingInfo + } + + if definition == nil { + delete(c.definitionInfo, namespacedName) + return nil + } + + // Update definition info + info.lastReconciledValue = definition + info.configurationError = nil + + if paramSource == nil { + // Skip setting up controller for empty param type + return nil + } + + // find GVR for params + paramsGVR, err := c.restMapper.RESTMapping(paramSource.GroupKind(), paramSource.Version) + if err != nil { + // Failed to resolve. Return error so we retry again (rate limited) + // Save a record of this definition with an evaluator that unconditionally + // + info.configurationError = fmt.Errorf("failed to find resource for param source: '%v'", paramSource.String()) + return info.configurationError + } + + // Start watching the param CRD + if _, ok := c.paramsCRDControllers[*paramSource]; !ok { + instanceContext, instanceCancel := context.WithCancel(c.runningContext) + + // Watch for new instances of this policy + informer := dynamicinformer.NewFilteredDynamicInformer( + c.dynamicClient, + paramsGVR.Resource, + corev1.NamespaceAll, + 30*time.Second, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + nil, + ) + + controller := generic.NewController( + generic.NewInformer[*unstructured.Unstructured](informer.Informer()), + c.reconcileParams, + generic.ControllerOptions{ + Workers: 1, + Name: paramSource.String() + "-controller", + }, + ) + + c.paramsCRDControllers[*paramSource] = ¶mInfo{ + controller: controller, + stop: instanceCancel, + dependentDefinitions: sets.NewString(namespacedName), + } + + go informer.Informer().Run(instanceContext.Done()) + go controller.Run(instanceContext) + } + + return nil +} + +func (c *celAdmissionController) reconcilePolicyBinding(namespace, name string, binding PolicyBinding) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + // Namespace for PolicyBinding is empty. In the future a namespaced binding + // may be added + // https://github.com/kubernetes/enhancements/blob/bf5c3c81ea2081d60c1dc7c832faa98479e06209/keps/sig-api-machinery/3488-cel-admission-control/README.md?plain=1#L1042 + namespacedName := namespace + "/" + name + info, ok := c.bindingInfos[namespacedName] + if !ok { + info = &bindingInfo{} + c.bindingInfos[namespacedName] = info + } + + oldNamespacedDefinitionName := "" + if info.lastReconciledValue != nil { + oldefinitionNamespace, oldefinitionName := info.lastReconciledValue.GetTargetDefinition() + oldNamespacedDefinitionName = oldefinitionNamespace + "/" + oldefinitionName + } + + namespacedDefinitionName := "" + if binding != nil { + newDefinitionNamespace, newDefinitionName := binding.GetTargetDefinition() + namespacedDefinitionName = newDefinitionNamespace + "/" + newDefinitionName + } + + // Remove record of binding from old definition if the referred policy + // has changed + if oldNamespacedDefinitionName != namespacedDefinitionName { + if dependentBindings, ok := c.definitionsToBindings[oldNamespacedDefinitionName]; ok { + dependentBindings.Delete(namespacedName) + + // if there are no more dependent bindings, remove knowledge of the + // definition altogether + if len(dependentBindings) == 0 { + delete(c.definitionsToBindings, oldNamespacedDefinitionName) + } + } + } + + if binding == nil { + delete(c.bindingInfos, namespacedName) + return nil + } + + // Add record of binding to new definition + if dependentBindings, ok := c.definitionsToBindings[namespacedDefinitionName]; ok { + dependentBindings.Insert(namespacedName) + } else { + c.definitionsToBindings[namespacedDefinitionName] = sets.NewString(namespacedName) + } + + // Remove compiled template for old binding + info.evaluator = nil + info.lastReconciledValue = binding + return nil +} + +func (c *celAdmissionController) reconcileParams(namespace, name string, params *unstructured.Unstructured) error { + // Do nothing. + // When we add informational type checking we will need to compile in the + // reconcile loops instead of lazily so we can add compiler errors / type + // checker errors to the status of the resources. + return nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/interface.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/interface.go new file mode 100644 index 00000000000..d23ef46b838 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/interface.go @@ -0,0 +1,108 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/cel" + + "github.com/google/cel-go/common/types/ref" +) + +type FailurePolicy string + +const ( + Fail FailurePolicy = "Fail" + Ignore FailurePolicy = "Ignore" +) + +// EvaluatorFunc represents the AND of one or more compiled CEL expression's +// evaluators `params` may be nil if definition does not specify a paramsource +type EvaluatorFunc func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision + +// ObjectConverter is Dependency Injected into the PolicyDefinition's `Compile` +// function to assist with converting types and values to/from CEL-typed values. +type ObjectConverter interface { + // DeclForResource looks up the openapi or JSONSchemaProps, structural schema, etc. + // and compiles it into something that can be used to turn objects into CEL + // values + DeclForResource(gvr schema.GroupVersionResource) (*cel.DeclType, error) + + // ValueForObject takes a Kubernetes Object and uses the CEL DeclType + // to transform it into a CEL value. + // Object may be a typed native object or an unstructured object + ValueForObject(value runtime.Object, decl *cel.DeclType) (ref.Val, error) +} + +// PolicyDefinition is an interface for internal policy binding type. +// Implemented by mock/testing types, and to be implemented by the public API +// types once they have completed API review. +// +// The interface closely mirrors the format and functionality of the +// PolicyDefinition proposed in the KEP. +type PolicyDefinition interface { + runtime.Object + + // Matches says whether this policy definition matches the provided admission + // resource request + Matches(a admission.Attributes) bool + + Compile( + // Definition is provided with a converter which may be used by the + // return evaluator function to convert objects into CEL-typed objects + objectConverter ObjectConverter, + // Injected RESTMapper to assist with compilation + mapper meta.RESTMapper, + ) (EvaluatorFunc, error) + + // GetParamSource returns the GVK for the CRD used as the source of + // parameters used in the evaluation of instances of this policy + // May return nil if there is no paramsource for this definition. + GetParamSource() *schema.GroupVersionKind + + // GetFailurePolicy returns how an object should be treated during an + // admission when there is a configuration error preventing CEL evaluation + GetFailurePolicy() FailurePolicy +} + +// PolicyBinding is an interface for internal policy binding type. Implemented +// by mock/testing types, and to be implemented by the public API types once +// they have completed API review. +// +// The interface closely mirrors the format and functionality of the +// PolicyBinding proposed in the KEP. +type PolicyBinding interface { + runtime.Object + + // Matches says whether this policy binding matches the provided admission + // resource request + Matches(a admission.Attributes) bool + + // GetTargetDefinition returns the Namespace/Name of Policy Definition used + // by this binding. + GetTargetDefinition() (namespace, name string) + + // GetTargetParams returns the Namespace/Name of instance of TargetDefinition's + // ParamSource to be provided to the CEL expressions of the definition during + // evaluation. + // If TargetDefinition has nil ParamSource, this is ignored. + GetTargetParams() (namespace, name string) +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller.go new file mode 100644 index 00000000000..10e42b5009c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller.go @@ -0,0 +1,264 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +var _ Controller[runtime.Object] = &controller[runtime.Object]{} + +type controller[T runtime.Object] struct { + informer Informer[T] + queue workqueue.RateLimitingInterface + + // Returns an error if there was a transient error during reconciliation + // and the object should be tried again later. + reconciler func(namespace, name string, newObj T) error + + options ControllerOptions +} + +type ControllerOptions struct { + Name string + Workers uint +} + +func (c controller[T]) Informer() Informer[T] { + return c.informer +} + +func NewController[T runtime.Object]( + informer Informer[T], + reconciler func(namepace, name string, newObj T) error, + options ControllerOptions, +) Controller[T] { + if options.Workers == 0 { + options.Workers = 2 + } + + if len(options.Name) == 0 { + options.Name = fmt.Sprintf("%T-controller", *new(T)) + } + + return &controller[T]{ + options: options, + informer: informer, + reconciler: reconciler, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), options.Name), + } +} + +// Runs the controller and returns an error explaining why running was stopped. +// Reconciliation ends as soon as the context completes. If there are events +// waiting to be processed at that itme, they will be dropped. +func (c *controller[T]) Run(ctx context.Context) error { + klog.Infof("starting %s", c.options.Name) + defer klog.Infof("stopping %s", c.options.Name) + + enqueue := func(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + c.queue.Add(key) + } + + registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + enqueue(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldMeta, err1 := meta.Accessor(oldObj) + newMeta, err2 := meta.Accessor(newObj) + + if err1 != nil || err2 != nil { + if err1 != nil { + utilruntime.HandleError(err1) + } + + if err2 != nil { + utilruntime.HandleError(err2) + } + return + } else if oldMeta.GetResourceVersion() == newMeta.GetResourceVersion() { + if len(oldMeta.GetResourceVersion()) == 0 { + klog.Warningf("%v throwing out update with empty RV. this is likely to happen if a test did not supply a resource version on an updated object", c.options.Name) + } + return + } + + enqueue(newObj) + }, + DeleteFunc: func(obj interface{}) { + // Enqueue + enqueue(obj) + }, + }) + + // Error might be raised if informer was started and stopped already + if err != nil { + return err + } + + // Make sure event handler is removed from informer in case return early from + // an error + defer func() { + // Remove event handler and Handle Error here. Error should only be raised + // for improper usage of event handler API. + if err := c.informer.RemoveEventHandler(registration); err != nil { + utilruntime.HandleError(err) + } + }() + + // Wait for initial cache list to complete before beginning to reconcile + // objects. + if !cache.WaitForNamedCacheSync(c.options.Name, ctx.Done(), c.informer.HasSynced) { + // ctx cancelled during cache sync. return early + err := ctx.Err() + if err == nil { + // if context wasnt cancelled then the sync failed for another reason + err = errors.New("cache sync failed") + } + return err + } + + waitGroup := sync.WaitGroup{} + + for i := uint(0); i < c.options.Workers; i++ { + waitGroup.Add(1) + go func() { + wait.Until(c.runWorker, time.Second, ctx.Done()) + waitGroup.Done() + }() + } + + klog.Infof("Started %v workers for %v", c.options.Workers, c.options.Name) + + // Wait for context cancel. + <-ctx.Done() + + // Forcefully shutdown workqueue. Drop any enqueued items. + c.queue.ShutDown() + + // Workqueue shutdown signals for workers to stop. Wait for all workers to + // clean up + waitGroup.Wait() + + // Only way for workers to ever stop is for caller to cancel the context + return ctx.Err() +} + +func (c *controller[T]) HasSynced() bool { + return c.informer.HasSynced() +} + +func (c *controller[T]) runWorker() { + for { + obj, shutdown := c.queue.Get() + if shutdown { + return + } + + // We wrap this block in a func so we can defer c.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.queue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // How did an incorrectly formatted key get in the workqueue? + // Done is sufficient. (Forget resets rate limiter for the key, + // but the key is invalid so there is no point in doing that) + return fmt.Errorf("expected string in workqueue but got %#v", obj) + } + + if err := c.reconcile(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.queue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it is allowed + // to be re-enqueued without a long rate limit + c.queue.Forget(obj) + klog.Infof("Successfully synced '%s'", key) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + } + } +} + +func (c *controller[T]) reconcile(key string) error { + var newObj T + var err error + var namespace string + var name string + var lister NamespacedLister[T] + + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err = cache.SplitMetaNamespaceKey(key) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + if len(namespace) > 0 { + lister = c.informer.Namespaced(namespace) + } else { + lister = c.informer + } + + newObj, err = lister.Get(name) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + + // Deleted object. Inform reconciler with empty + } + + return c.reconciler(namespace, name, newObj) +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/doc.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/doc.go new file mode 100644 index 00000000000..2acfad989f6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/doc.go @@ -0,0 +1,29 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package generic contains a typed wrapper over cache SharedIndexInformer +// and Lister (maybe eventually should have a home there?) +// +// This interface is being experimented with as an easier way to write controllers +// with a bit less boilerplate. +// +// Informer/Lister classes are thin wrappers providing a type-safe interface +// over regular interface{}-based Informers/Listers +// +// Controller[T] provides a reusable way to reconcile objects out of an informer +// using the tried and true controller design pattern found all over k8s +// codebase based upon syncFunc/reconcile +package generic diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/informer.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/informer.go new file mode 100644 index 00000000000..3025aa19533 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/informer.go @@ -0,0 +1,36 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +var _ Informer[runtime.Object] = informer[runtime.Object]{} + +type informer[T runtime.Object] struct { + cache.SharedIndexInformer + lister[T] +} + +func NewInformer[T runtime.Object](informe cache.SharedIndexInformer) Informer[T] { + return informer[T]{ + SharedIndexInformer: informe, + lister: NewLister[T](informe.GetIndexer()), + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/interface.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/interface.go new file mode 100644 index 00000000000..a17821e777e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/interface.go @@ -0,0 +1,62 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "context" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +type Controller[T runtime.Object] interface { + // Meant to be run inside a goroutine + // Waits for and reacts to changes in whatever type the controller + // is concerned with. + // + // Returns an error always non-nil explaining why the worker stopped + Run(ctx context.Context) error + + // Retrieves the informer used to back this controller + Informer() Informer[T] + + // Returns true if the informer cache has synced, and all the objects from + // the initial list have been reconciled at least once. + HasSynced() bool +} + +type NamespacedLister[T any] interface { + // List lists all ValidationRuleSets in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []T, err error) + // Get retrieves the ValidationRuleSet from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (T, error) +} + +type Informer[T any] interface { + cache.SharedIndexInformer + Lister[T] +} + +// Lister[T] helps list Ts. +// All objects returned here must be treated as read-only. +type Lister[T any] interface { + NamespacedLister[T] + Namespaced(namespace string) NamespacedLister[T] +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/lister.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/lister.go new file mode 100644 index 00000000000..aa6b090324c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/lister.go @@ -0,0 +1,100 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "fmt" + "net/http" + + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +var _ Lister[runtime.Object] = lister[runtime.Object]{} + +type namespacedLister[T runtime.Object] struct { + indexer cache.Indexer + namespace string +} + +func (w namespacedLister[T]) List(selector labels.Selector) (ret []T, err error) { + err = cache.ListAllByNamespace(w.indexer, w.namespace, selector, func(m interface{}) { + ret = append(ret, m.(T)) + }) + return ret, err +} + +func (w namespacedLister[T]) Get(name string) (T, error) { + var result T + + obj, exists, err := w.indexer.GetByKey(w.namespace + "/" + name) + if err != nil { + return result, err + } + if !exists { + return result, &kerrors.StatusError{ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusNotFound, + Reason: metav1.StatusReasonNotFound, + Message: fmt.Sprintf("%s not found", name), + }} + } + result = obj.(T) + return result, nil +} + +type lister[T runtime.Object] struct { + indexer cache.Indexer +} + +func (w lister[T]) List(selector labels.Selector) (ret []T, err error) { + err = cache.ListAll(w.indexer, selector, func(m interface{}) { + ret = append(ret, m.(T)) + }) + return ret, err +} + +func (w lister[T]) Get(name string) (T, error) { + var result T + + obj, exists, err := w.indexer.GetByKey(name) + if err != nil { + return result, err + } + if !exists { + // kerrors.StatusNotFound requires a GroupResource we cannot provide + return result, &kerrors.StatusError{ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusNotFound, + Reason: metav1.StatusReasonNotFound, + Message: fmt.Sprintf("%s not found", name), + }} + } + result = obj.(T) + return result, nil +} + +func (w lister[T]) Namespaced(namespace string) NamespacedLister[T] { + return namespacedLister[T]{namespace: namespace, indexer: w.indexer} +} + +func NewLister[T runtime.Object](indexer cache.Indexer) lister[T] { + return lister[T]{indexer: indexer} +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/policy_decision.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/policy_decision.go new file mode 100644 index 00000000000..800da35498b --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/policy_decision.go @@ -0,0 +1,53 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "encoding/json" + "fmt" +) + +type PolicyDecisionKind string + +const ( + Admit PolicyDecisionKind = "Admit" + Deny PolicyDecisionKind = "Deny" +) + +type PolicyDecision struct { + Kind PolicyDecisionKind `json:"kind"` + Message any `json:"message"` +} + +type PolicyDecisionWithMetadata struct { + PolicyDecision `json:"decision"` + Definition PolicyDefinition `json:"definition"` + Binding PolicyBinding `json:"binding"` +} + +type PolicyError struct { + Decisions []PolicyDecisionWithMetadata +} + +func (p *PolicyError) Error() string { + // Just format the error as JSON + jsonText, err := json.Marshal(p.Decisions) + if err != nil { + return fmt.Sprintf("error formatting PolicyError: %s", err.Error()) + } + return string(jsonText) +}