Merge pull request #112858 from alexzielenski/cel-admission-hook

CEL Admission Plugin
This commit is contained in:
Kubernetes Prow Robot 2022-10-19 17:45:15 -07:00 committed by GitHub
commit 0a689af469
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 3152 additions and 0 deletions

View File

@ -0,0 +1,10 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- jpbetz
- cici37
- alexzielenski
reviewers:
- jpbetz
- cici37
- alexzielenski

View File

@ -0,0 +1,103 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
import (
"context"
"errors"
"fmt"
"io"
"time"
"k8s.io/apiserver/pkg/admission"
"k8s.io/client-go/tools/cache"
)
////////////////////////////////////////////////////////////////////////////////
// Plugin Definition
////////////////////////////////////////////////////////////////////////////////
// Definition for CEL admission plugin. This is the entry point into the
// CEL admission control system.
//
// Each plugin is asked to validate every object update.
const (
// PluginName indicates the name of admission plug-in
PluginName = "CEL"
)
// Register registers a plugin
func Register(plugins *admission.Plugins) {
plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
return NewPlugin()
})
}
////////////////////////////////////////////////////////////////////////////////
// Plugin Initialization & Dependency Injection
////////////////////////////////////////////////////////////////////////////////
type celAdmissionPlugin struct {
evaluator CELPolicyEvaluator
}
var _ WantsCELPolicyEvaluator = &celAdmissionPlugin{}
var _ admission.ValidationInterface = &celAdmissionPlugin{}
func NewPlugin() (*celAdmissionPlugin, error) {
result := &celAdmissionPlugin{}
return result, nil
}
func (c *celAdmissionPlugin) SetCELPolicyEvaluator(evaluator CELPolicyEvaluator) {
c.evaluator = evaluator
}
// Once clientset and informer factory are provided, creates and starts the
// admission controller
func (c *celAdmissionPlugin) ValidateInitialization() error {
if c.evaluator != nil {
return nil
}
return errors.New("CELPolicyEvaluator not injected")
}
////////////////////////////////////////////////////////////////////////////////
// admission.ValidationInterface
////////////////////////////////////////////////////////////////////////////////
func (c *celAdmissionPlugin) Handles(operation admission.Operation) bool {
return true
}
func (c *celAdmissionPlugin) Validate(
ctx context.Context,
a admission.Attributes,
o admission.ObjectInterfaces,
) (err error) {
deadlined, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
if !cache.WaitForNamedCacheSync("cel-admission-plugin", deadlined.Done(), c.evaluator.HasSynced) {
return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
}
return c.evaluator.Validate(ctx, a, o)
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,320 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
import (
"context"
"errors"
"fmt"
"sync"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
// celAdmissionController is the top-level controller for admission control using CEL
// it is responsible for watching policy definitions, bindings, and config param CRDs
type celAdmissionController struct {
// Context under which the controller runs
runningContext context.Context
policyDefinitionsController generic.Controller[PolicyDefinition]
policyBindingController generic.Controller[PolicyBinding]
// dynamicclient used to create informers to watch the param crd types
dynamicClient dynamic.Interface
restMapper meta.RESTMapper
// Provided to the policy's Compile function as an injected dependency to
// assist with compiling its expressions to CEL
objectConverter ObjectConverter
// Lock which protects:
// - definitionInfo
// - bindingInfos
// - paramCRDControllers
// - definitionsToBindings
// All other fields should be assumed constant
mutex sync.RWMutex
// controller and metadata
paramsCRDControllers map[schema.GroupVersionKind]*paramInfo
// Index for each definition namespace/name, contains all binding
// namespace/names known to exist for that definition
definitionInfo map[string]*definitionInfo
// Index for each bindings namespace/name. Contains compiled templates
// for the binding depending on the policy/param combination.
bindingInfos map[string]*bindingInfo
// Map from namespace/name of a definition to a set of namespace/name
// of bindings which depend on it.
// All keys must have at least one dependent binding
// All binding names MUST exist as a key bindingInfos
definitionsToBindings map[string]sets.String
}
type definitionInfo struct {
// Error about the state of the definition's configuration and the cluster
// preventing its enforcement or compilation.
// Reset every reconciliation
configurationError error
// Last value seen by this controller to be used in policy enforcement
// May not be nil
lastReconciledValue PolicyDefinition
}
type bindingInfo struct {
// Compiled CEL expression turned into an evaluator
evaluator EvaluatorFunc
// Last value seen by this controller to be used in policy enforcement
// May not be nil
lastReconciledValue PolicyBinding
}
type paramInfo struct {
// Controller which is watching this param CRD
controller generic.Controller[*unstructured.Unstructured]
// Function to call to stop the informer and clean up the controller
stop func()
// Policy Definitions which refer to this param CRD
dependentDefinitions sets.String
}
func NewAdmissionController(
// Informers
policyDefinitionsInformer cache.SharedIndexInformer,
policyBindingInformer cache.SharedIndexInformer,
// Injected Dependencies
objectConverter ObjectConverter,
restMapper meta.RESTMapper,
dynamicClient dynamic.Interface,
) CELPolicyEvaluator {
c := &celAdmissionController{
definitionInfo: make(map[string]*definitionInfo),
bindingInfos: make(map[string]*bindingInfo),
paramsCRDControllers: make(map[schema.GroupVersionKind]*paramInfo),
definitionsToBindings: make(map[string]sets.String),
dynamicClient: dynamicClient,
objectConverter: objectConverter,
restMapper: restMapper,
}
c.policyDefinitionsController = generic.NewController(
generic.NewInformer[PolicyDefinition](policyDefinitionsInformer),
c.reconcilePolicyDefinition,
generic.ControllerOptions{
Workers: 1,
Name: "cel-policy-definitions",
},
)
c.policyBindingController = generic.NewController(
generic.NewInformer[PolicyBinding](policyBindingInformer),
c.reconcilePolicyBinding,
generic.ControllerOptions{
Workers: 1,
Name: "cel-policy-bindings",
},
)
return c
}
func (c *celAdmissionController) Run(stopCh <-chan struct{}) {
if c.runningContext != nil {
return
}
ctx, cancel := context.WithCancel(context.Background())
c.runningContext = ctx
defer func() {
c.runningContext = nil
}()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
c.policyDefinitionsController.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
c.policyBindingController.Run(ctx)
}()
<-stopCh
cancel()
wg.Wait()
}
func (c *celAdmissionController) Validate(
ctx context.Context,
a admission.Attributes,
o admission.ObjectInterfaces,
) (err error) {
c.mutex.RLock()
defer c.mutex.RUnlock()
var allDecisions []PolicyDecisionWithMetadata = nil
addConfigError := func(err error, definition PolicyDefinition, binding PolicyBinding) {
wrappedError := fmt.Errorf("configuration error: %w", err)
switch p := definition.GetFailurePolicy(); p {
case Ignore:
klog.Info(wrappedError)
return
case Fail:
allDecisions = append(allDecisions, PolicyDecisionWithMetadata{
PolicyDecision: PolicyDecision{
Kind: Deny,
Message: wrappedError.Error(),
},
Definition: definition,
Binding: binding,
})
default:
utilruntime.HandleError(fmt.Errorf("unrecognized failure policy: '%v'", p))
}
}
for definitionNamespacedName, definitionInfo := range c.definitionInfo {
definition := definitionInfo.lastReconciledValue
if !definition.Matches(a) {
// Policy definition does not match request
continue
} else if definitionInfo.configurationError != nil {
// Configuration error.
addConfigError(definitionInfo.configurationError, definition, nil)
continue
}
dependentBindings := c.definitionsToBindings[definitionNamespacedName]
if len(dependentBindings) == 0 {
// Definition has no known bindings yet.
addConfigError(errors.New("no bindings found"), definition, nil)
continue
}
for namespacedBindingName := range dependentBindings {
// If the key is inside dependentBindings, there is guaranteed to
// be a bindingInfo for it
bindingInfo := c.bindingInfos[namespacedBindingName]
binding := bindingInfo.lastReconciledValue
if !binding.Matches(a) {
continue
}
var param *unstructured.Unstructured
// If definition has no paramsource, always provide nil params to
// evaluator. If binding specifies a params to use they are ignored.
// Done this way so you can configure params before definition is ready.
if paramSource := definition.GetParamSource(); paramSource != nil {
paramsNamespace, paramsName := binding.GetTargetParams()
// Find the params referred by the binding by looking its name up
// in our informer for its CRD
paramInfo, ok := c.paramsCRDControllers[*paramSource]
if !ok {
addConfigError(fmt.Errorf("paramSource kind `%v` not known",
paramSource.String()), definition, binding)
continue
}
if len(paramsNamespace) == 0 {
param, err = paramInfo.controller.Informer().Get(paramsName)
} else {
param, err = paramInfo.controller.Informer().Namespaced(paramsNamespace).Get(paramsName)
}
if err != nil {
// Apply failure policy
addConfigError(err, definition, binding)
if k8serrors.IsNotFound(err) {
// Param doesnt exist yet?
// Maybe just have to wait a bit.
continue
}
// There was a bad internal error
utilruntime.HandleError(err)
continue
}
}
if bindingInfo.evaluator == nil {
// Compile policy definition using binding
bindingInfo.evaluator, err = definition.Compile(c.objectConverter, c.restMapper)
if err != nil {
// compilation error. Apply failure policy
wrappedError := fmt.Errorf("failed to compile CEL expression: %w", err)
addConfigError(wrappedError, definition, binding)
continue
}
c.bindingInfos[namespacedBindingName] = bindingInfo
}
decisions := bindingInfo.evaluator(a, param)
for _, decision := range decisions {
switch decision.Kind {
case Admit:
// Do nothing
case Deny:
allDecisions = append(allDecisions, PolicyDecisionWithMetadata{
Definition: definition,
Binding: binding,
PolicyDecision: decision,
})
default:
// unrecognized decision. ignore
}
}
}
}
if len(allDecisions) > 0 {
return k8serrors.NewConflict(
a.GetResource().GroupResource(), a.GetName(),
&PolicyError{
Decisions: allDecisions,
})
}
return nil
}
func (c *celAdmissionController) HasSynced() bool {
return c.policyBindingController.HasSynced() &&
c.policyDefinitionsController.HasSynced()
}

View File

@ -0,0 +1,203 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
)
func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name string, definition PolicyDefinition) error {
c.mutex.Lock()
defer c.mutex.Unlock()
// Namespace for policydefinition is empty. Leaving usage here for compatibility
// with future NamespacedPolicyDefinition
namespacedName := namespace + "/" + name
info, ok := c.definitionInfo[namespacedName]
if !ok {
info = &definitionInfo{}
c.definitionInfo[namespacedName] = info
}
var paramSource *schema.GroupVersionKind
if definition != nil {
paramSource = definition.GetParamSource()
}
// If param source has changed, remove definition as dependent of old params
// If there are no more dependents of old param, stop and clean up controller
if info.lastReconciledValue != nil && info.lastReconciledValue.GetParamSource() != nil {
oldParamSource := *info.lastReconciledValue.GetParamSource()
// If we are:
// - switching from having a param to not having a param (includes deletion)
// - or from having a param to a different one
// we remove dependency on the controller.
if paramSource == nil || *paramSource != oldParamSource {
if oldParamInfo, ok := c.paramsCRDControllers[oldParamSource]; ok {
oldParamInfo.dependentDefinitions.Delete(namespacedName)
if len(oldParamInfo.dependentDefinitions) == 0 {
oldParamInfo.stop()
delete(c.paramsCRDControllers, oldParamSource)
}
}
}
}
// Reset all previously compiled evaluators in case something relevant in
// definition has changed.
for key := range c.definitionsToBindings[namespacedName] {
bindingInfo := c.bindingInfos[key]
bindingInfo.evaluator = nil
c.bindingInfos[key] = bindingInfo
}
if definition == nil {
delete(c.definitionInfo, namespacedName)
return nil
}
// Update definition info
info.lastReconciledValue = definition
info.configurationError = nil
if paramSource == nil {
// Skip setting up controller for empty param type
return nil
}
// find GVR for params
paramsGVR, err := c.restMapper.RESTMapping(paramSource.GroupKind(), paramSource.Version)
if err != nil {
// Failed to resolve. Return error so we retry again (rate limited)
// Save a record of this definition with an evaluator that unconditionally
//
info.configurationError = fmt.Errorf("failed to find resource for param source: '%v'", paramSource.String())
return info.configurationError
}
// Start watching the param CRD
if _, ok := c.paramsCRDControllers[*paramSource]; !ok {
instanceContext, instanceCancel := context.WithCancel(c.runningContext)
// Watch for new instances of this policy
informer := dynamicinformer.NewFilteredDynamicInformer(
c.dynamicClient,
paramsGVR.Resource,
corev1.NamespaceAll,
30*time.Second,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
nil,
)
controller := generic.NewController(
generic.NewInformer[*unstructured.Unstructured](informer.Informer()),
c.reconcileParams,
generic.ControllerOptions{
Workers: 1,
Name: paramSource.String() + "-controller",
},
)
c.paramsCRDControllers[*paramSource] = &paramInfo{
controller: controller,
stop: instanceCancel,
dependentDefinitions: sets.NewString(namespacedName),
}
go informer.Informer().Run(instanceContext.Done())
go controller.Run(instanceContext)
}
return nil
}
func (c *celAdmissionController) reconcilePolicyBinding(namespace, name string, binding PolicyBinding) error {
c.mutex.Lock()
defer c.mutex.Unlock()
// Namespace for PolicyBinding is empty. In the future a namespaced binding
// may be added
// https://github.com/kubernetes/enhancements/blob/bf5c3c81ea2081d60c1dc7c832faa98479e06209/keps/sig-api-machinery/3488-cel-admission-control/README.md?plain=1#L1042
namespacedName := namespace + "/" + name
info, ok := c.bindingInfos[namespacedName]
if !ok {
info = &bindingInfo{}
c.bindingInfos[namespacedName] = info
}
oldNamespacedDefinitionName := ""
if info.lastReconciledValue != nil {
oldefinitionNamespace, oldefinitionName := info.lastReconciledValue.GetTargetDefinition()
oldNamespacedDefinitionName = oldefinitionNamespace + "/" + oldefinitionName
}
namespacedDefinitionName := ""
if binding != nil {
newDefinitionNamespace, newDefinitionName := binding.GetTargetDefinition()
namespacedDefinitionName = newDefinitionNamespace + "/" + newDefinitionName
}
// Remove record of binding from old definition if the referred policy
// has changed
if oldNamespacedDefinitionName != namespacedDefinitionName {
if dependentBindings, ok := c.definitionsToBindings[oldNamespacedDefinitionName]; ok {
dependentBindings.Delete(namespacedName)
// if there are no more dependent bindings, remove knowledge of the
// definition altogether
if len(dependentBindings) == 0 {
delete(c.definitionsToBindings, oldNamespacedDefinitionName)
}
}
}
if binding == nil {
delete(c.bindingInfos, namespacedName)
return nil
}
// Add record of binding to new definition
if dependentBindings, ok := c.definitionsToBindings[namespacedDefinitionName]; ok {
dependentBindings.Insert(namespacedName)
} else {
c.definitionsToBindings[namespacedDefinitionName] = sets.NewString(namespacedName)
}
// Remove compiled template for old binding
info.evaluator = nil
info.lastReconciledValue = binding
return nil
}
func (c *celAdmissionController) reconcileParams(namespace, name string, params *unstructured.Unstructured) error {
// Do nothing.
// When we add informational type checking we will need to compile in the
// reconcile loops instead of lazily so we can add compiler errors / type
// checker errors to the status of the resources.
return nil
}

View File

@ -0,0 +1,258 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
)
////////////////////////////////////////////////////////////////////////////////
// Fake Policy Definitions
////////////////////////////////////////////////////////////////////////////////
type FakePolicyDefinition struct {
metav1.TypeMeta
metav1.ObjectMeta
// Function called when `Matches` is called
// If nil, a default function that always returns true is used
// Specified as a function pointer so that this type is still comparable
MatchFunc *func(admission.Attributes) bool `json:"-"`
// Func invoked for implementation of `Compile`
// Specified as a function pointer so that this type is still comparable
CompileFunc *func(converter ObjectConverter) (EvaluatorFunc, error) `json:"-"`
// GVK to return when ParamSource() is called
ParamSource *schema.GroupVersionKind `json:"paramSource"`
FailurePolicy FailurePolicy `json:"failurePolicy"`
}
var _ PolicyDefinition = &FakePolicyDefinition{}
func (f *FakePolicyDefinition) SetGroupVersionKind(kind schema.GroupVersionKind) {
f.TypeMeta.APIVersion = kind.GroupVersion().String()
f.TypeMeta.Kind = kind.Kind
}
func (f *FakePolicyDefinition) GroupVersionKind() schema.GroupVersionKind {
parsedGV, err := schema.ParseGroupVersion(f.TypeMeta.APIVersion)
if err != nil || f.TypeMeta.Kind == "" || parsedGV.Empty() {
return schema.GroupVersionKind{
Group: "admission.k8s.io",
Version: "v1alpha1",
Kind: "PolicyDefinition",
}
}
return schema.GroupVersionKind{
Group: parsedGV.Group,
Version: parsedGV.Version,
Kind: f.TypeMeta.Kind,
}
}
func (f *FakePolicyDefinition) GetObjectKind() schema.ObjectKind {
return f
}
func (f *FakePolicyDefinition) DeepCopyObject() runtime.Object {
copied := *f
f.ObjectMeta.DeepCopyInto(&copied.ObjectMeta)
return &copied
}
func (f *FakePolicyDefinition) GetName() string {
return f.ObjectMeta.Name
}
func (f *FakePolicyDefinition) GetNamespace() string {
return f.ObjectMeta.Namespace
}
func (f *FakePolicyDefinition) Matches(a admission.Attributes) bool {
if f.MatchFunc == nil || *f.MatchFunc == nil {
return true
}
return (*f.MatchFunc)(a)
}
func (f *FakePolicyDefinition) Compile(
converter ObjectConverter,
mapper meta.RESTMapper,
) (EvaluatorFunc, error) {
if f.CompileFunc == nil || *f.CompileFunc == nil {
panic("must provide a CompileFunc to policy definition")
}
return (*f.CompileFunc)(converter)
}
func (f *FakePolicyDefinition) GetParamSource() *schema.GroupVersionKind {
return f.ParamSource
}
func (f *FakePolicyDefinition) GetFailurePolicy() FailurePolicy {
return f.FailurePolicy
}
////////////////////////////////////////////////////////////////////////////////
// Fake Policy Binding
////////////////////////////////////////////////////////////////////////////////
type FakePolicyBinding struct {
metav1.TypeMeta
metav1.ObjectMeta
// Specified as a function pointer so that this type is still comparable
MatchFunc *func(admission.Attributes) bool `json:"-"`
Params string `json:"params"`
Policy string `json:"policy"`
}
var _ PolicyBinding = &FakePolicyBinding{}
func (f *FakePolicyBinding) SetGroupVersionKind(kind schema.GroupVersionKind) {
f.TypeMeta.APIVersion = kind.GroupVersion().String()
f.TypeMeta.Kind = kind.Kind
}
func (f *FakePolicyBinding) GroupVersionKind() schema.GroupVersionKind {
parsedGV, err := schema.ParseGroupVersion(f.TypeMeta.APIVersion)
if err != nil || f.TypeMeta.Kind == "" || parsedGV.Empty() {
return schema.GroupVersionKind{
Group: "admission.k8s.io",
Version: "v1alpha1",
Kind: "PolicyBinding",
}
}
return schema.GroupVersionKind{
Group: parsedGV.Group,
Version: parsedGV.Version,
Kind: f.TypeMeta.Kind,
}
}
func (f *FakePolicyBinding) GetObjectKind() schema.ObjectKind {
return f
}
func (f *FakePolicyBinding) DeepCopyObject() runtime.Object {
copied := *f
f.ObjectMeta.DeepCopyInto(&copied.ObjectMeta)
return &copied
}
func (f *FakePolicyBinding) Matches(a admission.Attributes) bool {
if f.MatchFunc == nil || *f.MatchFunc == nil {
return true
}
return (*f.MatchFunc)(a)
}
func (f *FakePolicyBinding) GetTargetDefinition() (namespace, name string) {
return f.Namespace, f.Policy
}
func (f *FakePolicyBinding) GetTargetParams() (namespace, name string) {
return f.Namespace, f.Params
}
/// List Types
type FakePolicyDefinitionList struct {
metav1.TypeMeta
metav1.ListMeta
Items []FakePolicyDefinition
}
func (f *FakePolicyDefinitionList) SetGroupVersionKind(kind schema.GroupVersionKind) {
f.TypeMeta.APIVersion = kind.GroupVersion().String()
f.TypeMeta.Kind = kind.Kind
}
func (f *FakePolicyDefinitionList) GroupVersionKind() schema.GroupVersionKind {
parsedGV, err := schema.ParseGroupVersion(f.TypeMeta.APIVersion)
if err != nil || f.TypeMeta.Kind == "" || parsedGV.Empty() {
return schema.GroupVersionKind{
Group: "admission.k8s.io",
Version: "v1alpha1",
Kind: "PolicyDefinitionList",
}
}
return schema.GroupVersionKind{
Group: parsedGV.Group,
Version: parsedGV.Version,
Kind: f.TypeMeta.Kind,
}
}
func (f *FakePolicyDefinitionList) GetObjectKind() schema.ObjectKind {
return f
}
func (f *FakePolicyDefinitionList) DeepCopyObject() runtime.Object {
copied := *f
f.ListMeta.DeepCopyInto(&copied.ListMeta)
copied.Items = make([]FakePolicyDefinition, len(f.Items))
copy(copied.Items, f.Items)
return &copied
}
type FakePolicyBindingList struct {
metav1.TypeMeta
metav1.ListMeta
Items []FakePolicyBinding
}
func (f *FakePolicyBindingList) SetGroupVersionKind(kind schema.GroupVersionKind) {
f.TypeMeta.APIVersion = kind.GroupVersion().String()
f.TypeMeta.Kind = kind.Kind
}
func (f *FakePolicyBindingList) GroupVersionKind() schema.GroupVersionKind {
parsedGV, err := schema.ParseGroupVersion(f.TypeMeta.APIVersion)
if err != nil || f.TypeMeta.Kind == "" || parsedGV.Empty() {
return schema.GroupVersionKind{
Group: "admission.k8s.io",
Version: "v1alpha1",
Kind: "PolicyBindingList",
}
}
return schema.GroupVersionKind{
Group: parsedGV.Group,
Version: parsedGV.Version,
Kind: f.TypeMeta.Kind,
}
}
func (f *FakePolicyBindingList) GetObjectKind() schema.ObjectKind {
return f
}
func (f *FakePolicyBindingList) DeepCopyObject() runtime.Object {
copied := *f
f.ListMeta.DeepCopyInto(&copied.ListMeta)
copied.Items = make([]FakePolicyBinding, len(f.Items))
copy(copied.Items, f.Items)
return &copied
}

View File

@ -0,0 +1,55 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
import (
"context"
"k8s.io/apiserver/pkg/admission"
)
type CELPolicyEvaluator interface {
Validate(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error
HasSynced() bool
}
// NewPluginInitializer creates a plugin initializer which dependency injects a
// singleton cel admission controller into the plugins which desire it
func NewPluginInitializer(validator CELPolicyEvaluator) *PluginInitializer {
return &PluginInitializer{validator: validator}
}
// WantsCELPolicyEvaluator gives the ability to have the shared
// CEL Admission Controller dependency injected at initialization-time.
type WantsCELPolicyEvaluator interface {
SetCELPolicyEvaluator(CELPolicyEvaluator)
}
// PluginInitializer is used for initialization of the webhook admission plugin.
type PluginInitializer struct {
validator CELPolicyEvaluator
}
var _ admission.PluginInitializer = &PluginInitializer{}
// Initialize checks the initialization interfaces implemented by each plugin
// and provide the appropriate initialization data
func (i *PluginInitializer) Initialize(plugin admission.Interface) {
if wants, ok := plugin.(WantsCELPolicyEvaluator); ok {
wants.SetCELPolicyEvaluator(i.validator)
}
}

View File

@ -0,0 +1,108 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/cel"
"github.com/google/cel-go/common/types/ref"
)
type FailurePolicy string
const (
Fail FailurePolicy = "Fail"
Ignore FailurePolicy = "Ignore"
)
// EvaluatorFunc represents the AND of one or more compiled CEL expression's
// evaluators `params` may be nil if definition does not specify a paramsource
type EvaluatorFunc func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision
// ObjectConverter is Dependency Injected into the PolicyDefinition's `Compile`
// function to assist with converting types and values to/from CEL-typed values.
type ObjectConverter interface {
// DeclForResource looks up the openapi or JSONSchemaProps, structural schema, etc.
// and compiles it into something that can be used to turn objects into CEL
// values
DeclForResource(gvr schema.GroupVersionResource) (*cel.DeclType, error)
// ValueForObject takes a Kubernetes Object and uses the CEL DeclType
// to transform it into a CEL value.
// Object may be a typed native object or an unstructured object
ValueForObject(value runtime.Object, decl *cel.DeclType) (ref.Val, error)
}
// PolicyDefinition is an interface for internal policy binding type.
// Implemented by mock/testing types, and to be implemented by the public API
// types once they have completed API review.
//
// The interface closely mirrors the format and functionality of the
// PolicyDefinition proposed in the KEP.
type PolicyDefinition interface {
runtime.Object
// Matches says whether this policy definition matches the provided admission
// resource request
Matches(a admission.Attributes) bool
Compile(
// Definition is provided with a converter which may be used by the
// return evaluator function to convert objects into CEL-typed objects
objectConverter ObjectConverter,
// Injected RESTMapper to assist with compilation
mapper meta.RESTMapper,
) (EvaluatorFunc, error)
// GetParamSource returns the GVK for the CRD used as the source of
// parameters used in the evaluation of instances of this policy
// May return nil if there is no paramsource for this definition.
GetParamSource() *schema.GroupVersionKind
// GetFailurePolicy returns how an object should be treated during an
// admission when there is a configuration error preventing CEL evaluation
GetFailurePolicy() FailurePolicy
}
// PolicyBinding is an interface for internal policy binding type. Implemented
// by mock/testing types, and to be implemented by the public API types once
// they have completed API review.
//
// The interface closely mirrors the format and functionality of the
// PolicyBinding proposed in the KEP.
type PolicyBinding interface {
runtime.Object
// Matches says whether this policy binding matches the provided admission
// resource request
Matches(a admission.Attributes) bool
// GetTargetDefinition returns the Namespace/Name of Policy Definition used
// by this binding.
GetTargetDefinition() (namespace, name string)
// GetTargetParams returns the Namespace/Name of instance of TargetDefinition's
// ParamSource to be provided to the CEL expressions of the definition during
// evaluation.
// If TargetDefinition has nil ParamSource, this is ignored.
GetTargetParams() (namespace, name string)
}

View File

@ -0,0 +1,264 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"context"
"errors"
"fmt"
"sync"
"time"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
var _ Controller[runtime.Object] = &controller[runtime.Object]{}
type controller[T runtime.Object] struct {
informer Informer[T]
queue workqueue.RateLimitingInterface
// Returns an error if there was a transient error during reconciliation
// and the object should be tried again later.
reconciler func(namespace, name string, newObj T) error
options ControllerOptions
}
type ControllerOptions struct {
Name string
Workers uint
}
func (c controller[T]) Informer() Informer[T] {
return c.informer
}
func NewController[T runtime.Object](
informer Informer[T],
reconciler func(namepace, name string, newObj T) error,
options ControllerOptions,
) Controller[T] {
if options.Workers == 0 {
options.Workers = 2
}
if len(options.Name) == 0 {
options.Name = fmt.Sprintf("%T-controller", *new(T))
}
return &controller[T]{
options: options,
informer: informer,
reconciler: reconciler,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), options.Name),
}
}
// Runs the controller and returns an error explaining why running was stopped.
// Reconciliation ends as soon as the context completes. If there are events
// waiting to be processed at that itme, they will be dropped.
func (c *controller[T]) Run(ctx context.Context) error {
klog.Infof("starting %s", c.options.Name)
defer klog.Infof("stopping %s", c.options.Name)
enqueue := func(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.queue.Add(key)
}
registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
enqueue(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldMeta, err1 := meta.Accessor(oldObj)
newMeta, err2 := meta.Accessor(newObj)
if err1 != nil || err2 != nil {
if err1 != nil {
utilruntime.HandleError(err1)
}
if err2 != nil {
utilruntime.HandleError(err2)
}
return
} else if oldMeta.GetResourceVersion() == newMeta.GetResourceVersion() {
if len(oldMeta.GetResourceVersion()) == 0 {
klog.Warningf("%v throwing out update with empty RV. this is likely to happen if a test did not supply a resource version on an updated object", c.options.Name)
}
return
}
enqueue(newObj)
},
DeleteFunc: func(obj interface{}) {
// Enqueue
enqueue(obj)
},
})
// Error might be raised if informer was started and stopped already
if err != nil {
return err
}
// Make sure event handler is removed from informer in case return early from
// an error
defer func() {
// Remove event handler and Handle Error here. Error should only be raised
// for improper usage of event handler API.
if err := c.informer.RemoveEventHandler(registration); err != nil {
utilruntime.HandleError(err)
}
}()
// Wait for initial cache list to complete before beginning to reconcile
// objects.
if !cache.WaitForNamedCacheSync(c.options.Name, ctx.Done(), c.informer.HasSynced) {
// ctx cancelled during cache sync. return early
err := ctx.Err()
if err == nil {
// if context wasnt cancelled then the sync failed for another reason
err = errors.New("cache sync failed")
}
return err
}
waitGroup := sync.WaitGroup{}
for i := uint(0); i < c.options.Workers; i++ {
waitGroup.Add(1)
go func() {
wait.Until(c.runWorker, time.Second, ctx.Done())
waitGroup.Done()
}()
}
klog.Infof("Started %v workers for %v", c.options.Workers, c.options.Name)
// Wait for context cancel.
<-ctx.Done()
// Forcefully shutdown workqueue. Drop any enqueued items.
c.queue.ShutDown()
// Workqueue shutdown signals for workers to stop. Wait for all workers to
// clean up
waitGroup.Wait()
// Only way for workers to ever stop is for caller to cancel the context
return ctx.Err()
}
func (c *controller[T]) HasSynced() bool {
return c.informer.HasSynced()
}
func (c *controller[T]) runWorker() {
for {
obj, shutdown := c.queue.Get()
if shutdown {
return
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.queue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// How did an incorrectly formatted key get in the workqueue?
// Done is sufficient. (Forget resets rate limiter for the key,
// but the key is invalid so there is no point in doing that)
return fmt.Errorf("expected string in workqueue but got %#v", obj)
}
if err := c.reconcile(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.queue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it is allowed
// to be re-enqueued without a long rate limit
c.queue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
}
}
}
func (c *controller[T]) reconcile(key string) error {
var newObj T
var err error
var namespace string
var name string
var lister NamespacedLister[T]
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err = cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
if len(namespace) > 0 {
lister = c.informer.Namespaced(namespace)
} else {
lister = c.informer
}
newObj, err = lister.Get(name)
if err != nil {
if !kerrors.IsNotFound(err) {
return err
}
// Deleted object. Inform reconciler with empty
}
return c.reconciler(namespace, name, newObj)
}

View File

@ -0,0 +1,516 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic_test
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
)
type testInformer struct {
cache.SharedIndexInformer
lock sync.Mutex
registrations map[interface{}]struct{}
}
func (t *testInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
res, err := t.SharedIndexInformer.AddEventHandler(handler)
if err != nil {
return res, err
}
func() {
t.lock.Lock()
defer t.lock.Unlock()
if t.registrations == nil {
t.registrations = make(map[interface{}]struct{})
}
t.registrations[res] = struct{}{}
}()
return res, err
}
func (t *testInformer) RemoveEventHandler(registration cache.ResourceEventHandlerRegistration) error {
func() {
t.lock.Lock()
defer t.lock.Unlock()
if _, ok := t.registrations[registration]; !ok {
panic("removing unknown event handler?")
}
delete(t.registrations, registration)
}()
return t.SharedIndexInformer.RemoveEventHandler(registration)
}
var (
scheme *runtime.Scheme = runtime.NewScheme()
codecs serializer.CodecFactory = serializer.NewCodecFactory(scheme)
fakeGVR schema.GroupVersionResource = schema.GroupVersionResource{
Group: "fake.example.com",
Version: "v1",
Resource: "fakes",
}
fakeGVK schema.GroupVersionKind = fakeGVR.GroupVersion().WithKind("Fake")
fakeGVKList schema.GroupVersionKind = fakeGVR.GroupVersion().WithKind("FakeList")
)
func init() {
scheme.AddKnownTypeWithName(fakeGVK, &unstructured.Unstructured{})
scheme.AddKnownTypeWithName(fakeGVKList, &unstructured.UnstructuredList{})
}
func setupTest(ctx context.Context, customReconciler func(string, string, runtime.Object) error) (
tracker clienttesting.ObjectTracker,
controller generic.Controller[*unstructured.Unstructured],
informer *testInformer,
waitForReconcile func(runtime.Object) error,
) {
tracker = clienttesting.NewObjectTracker(scheme, codecs.UniversalDecoder())
reconciledObjects := make(chan runtime.Object)
// Set up fake informers that return instances of mock Policy definitoins
// and mock policy bindings
informer = &testInformer{SharedIndexInformer: cache.NewSharedIndexInformer(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return tracker.List(fakeGVR, fakeGVK, "")
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return tracker.Watch(fakeGVR, "")
},
}, &unstructured.Unstructured{}, 30*time.Second, nil)}
reconciler := func(namespace, name string, newObj *unstructured.Unstructured) error {
var err error
copied := newObj.DeepCopyObject()
if customReconciler != nil {
err = customReconciler(namespace, name, newObj)
}
reconciledObjects <- copied
return err
}
waitForReconcile = func(obj runtime.Object) error {
select {
case reconciledObj := <-reconciledObjects:
if reflect.DeepEqual(obj, reconciledObj) {
return nil
}
return fmt.Errorf("expected equal objects: %v", cmp.Diff(obj, reconciledObj))
case <-ctx.Done():
return fmt.Errorf("context done before reconcile: %w", ctx.Err())
}
}
myController := generic.NewController(
generic.NewInformer[*unstructured.Unstructured](informer),
reconciler,
generic.ControllerOptions{},
)
go func() {
<-ctx.Done()
close(reconciledObjects)
for leftover := range reconciledObjects {
panic(fmt.Errorf("leftover object which was not anticipated by test: %v", leftover))
}
}()
return tracker, myController, informer, waitForReconcile
}
func TestReconcile(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
tracker, myController, informer, waitForReconcile := setupTest(testContext, nil)
// Add object to informer
initialObject := &unstructured.Unstructured{}
initialObject.SetUnstructuredContent(map[string]interface{}{
"metadata": map[string]interface{}{
"name": "object1",
"resourceVersion": "1",
},
})
initialObject.SetGroupVersionKind(fakeGVK)
require.NoError(t, tracker.Add(initialObject))
wg := sync.WaitGroup{}
// Start informer
wg.Add(1)
go func() {
defer wg.Done()
informer.Run(testContext.Done())
}()
// Start controller
wg.Add(1)
go func() {
defer wg.Done()
stopReason := myController.Run(testContext)
require.ErrorIs(t, stopReason, context.Canceled)
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// Wait for all enqueued reconciliations
require.NoError(t, waitForReconcile(initialObject))
// Updated object
updatedObject := &unstructured.Unstructured{}
updatedObject.SetUnstructuredContent(map[string]interface{}{
"metadata": map[string]interface{}{
"name": "object1",
"resourceVersion": "2",
},
"newKey": "a key",
})
updatedObject.SetGroupVersionKind(fakeGVK)
require.NoError(t, tracker.Update(fakeGVR, updatedObject, ""))
// Wait for all enqueued reconciliations
require.NoError(t, waitForReconcile(updatedObject))
require.NoError(t, tracker.Delete(fakeGVR, updatedObject.GetNamespace(), updatedObject.GetName()))
require.NoError(t, waitForReconcile(nil))
testCancel()
wg.Wait()
}
func TestShutdown(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
_, myController, informer, _ := setupTest(testContext, nil)
wg := sync.WaitGroup{}
// Start informer
wg.Add(1)
go func() {
defer wg.Done()
informer.Run(testContext.Done())
}()
// Start controller
wg.Add(1)
go func() {
defer wg.Done()
stopReason := myController.Run(testContext)
require.ErrorIs(t, stopReason, context.Canceled)
}()
// Wait for controller and informer to start up
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// Stop the controller and informer
testCancel()
// Wait for controller and informer to stop
wg.Wait()
// Ensure the event handler was cleaned up
require.Empty(t, informer.registrations)
}
// Show an error is thrown informer isn't started when the controller runs
func TestInformerNeverStarts(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 400*time.Millisecond)
defer testCancel()
_, myController, informer, _ := setupTest(testContext, nil)
wg := sync.WaitGroup{}
// Start controller
wg.Add(1)
go func() {
defer wg.Done()
stopReason := myController.Run(testContext)
require.ErrorIs(t, stopReason, context.DeadlineExceeded)
}()
// Wait for deadline to pass without syncing the cache
require.False(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// Wait for controller to stop (or context deadline will pass quickly)
wg.Wait()
// Ensure there are no event handlers
require.Empty(t, informer.registrations)
}
// Shows that if RV does not change, the reconciler does not get called
func TestIgnoredUpdate(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
tracker, myController, informer, waitForReconcile := setupTest(testContext, nil)
// Add object to informer
initialObject := &unstructured.Unstructured{}
initialObject.SetUnstructuredContent(map[string]interface{}{
"metadata": map[string]interface{}{
"name": "object1",
"resourceVersion": "1",
},
})
initialObject.SetGroupVersionKind(fakeGVK)
require.NoError(t, tracker.Add(initialObject))
wg := sync.WaitGroup{}
// Start informer
wg.Add(1)
go func() {
defer wg.Done()
informer.Run(testContext.Done())
}()
// Start controller
wg.Add(1)
go func() {
defer wg.Done()
stopReason := myController.Run(testContext)
require.ErrorIs(t, stopReason, context.Canceled)
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// Wait for all enqueued reconciliations
require.NoError(t, waitForReconcile(initialObject))
// Send update with the same object
require.NoError(t, tracker.Update(fakeGVR, initialObject, ""))
// Don't wait for it to be reconciled
testCancel()
wg.Wait()
// Test infrastructure has logic to panic if there are any reconciled objects
// that weren't "expected"
}
// Shows that an object which fails reconciliation will retry
func TestReconcileRetry(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
calls := atomic.Uint64{}
success := atomic.Bool{}
tracker, myController, _, waitForReconcile := setupTest(testContext, func(s1, s2 string, o runtime.Object) error {
if calls.Add(1) > 2 {
// Suddenly start liking the object
success.Store(true)
return nil
}
return errors.New("i dont like this object")
})
// Start informer
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
myController.Informer().Run(testContext.Done())
}()
// Start controller
wg.Add(1)
go func() {
defer wg.Done()
stopReason := myController.Run(testContext)
require.ErrorIs(t, stopReason, context.Canceled)
}()
// Add object to informer
initialObject := &unstructured.Unstructured{}
initialObject.SetUnstructuredContent(map[string]interface{}{
"metadata": map[string]interface{}{
"name": "object1",
"resourceVersion": "1",
},
})
initialObject.SetGroupVersionKind(fakeGVK)
require.NoError(t, tracker.Add(initialObject))
require.NoError(t, waitForReconcile(initialObject), "initial reconcile")
require.NoError(t, waitForReconcile(initialObject), "previous reconcile failed, should retry quickly")
require.NoError(t, waitForReconcile(initialObject), "previous reconcile failed, should retry quickly")
// Will not try again since calls > 2 for last reconcile
require.True(t, success.Load(), "last call to reconcile should return success")
testCancel()
wg.Wait()
}
func TestInformerList(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
tracker, myController, _, _ := setupTest(testContext, nil)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
myController.Informer().Run(testContext.Done())
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
object1 := &unstructured.Unstructured{}
object1.SetUnstructuredContent(map[string]interface{}{
"metadata": map[string]interface{}{
"name": "object1",
"resourceVersion": "object1",
},
})
object1.SetGroupVersionKind(fakeGVK)
object1v2 := &unstructured.Unstructured{}
object1v2.SetUnstructuredContent(map[string]interface{}{
"metadata": map[string]interface{}{
"name": "object1",
"resourceVersion": "object1v2",
},
})
object1v2.SetGroupVersionKind(fakeGVK)
object2 := &unstructured.Unstructured{}
object2.SetUnstructuredContent(map[string]interface{}{
"metadata": map[string]interface{}{
"name": "object2",
"resourceVersion": "object2",
},
})
object2.SetGroupVersionKind(fakeGVK)
object3 := &unstructured.Unstructured{}
object3.SetUnstructuredContent(map[string]interface{}{
"metadata": map[string]interface{}{
"name": "object3",
"resourceVersion": "object3",
},
})
object3.SetGroupVersionKind(fakeGVK)
namespacedObject1 := &unstructured.Unstructured{}
namespacedObject1.SetUnstructuredContent(map[string]interface{}{
"metadata": map[string]interface{}{
"name": "namespacedObject1",
"namespace": "test",
"resourceVersion": "namespacedObject1",
},
})
namespacedObject1.SetGroupVersionKind(fakeGVK)
namespacedObject2 := &unstructured.Unstructured{}
namespacedObject2.SetUnstructuredContent(map[string]interface{}{
"metadata": map[string]interface{}{
"name": "namespacedObject2",
"namespace": "test",
"resourceVersion": "namespacedObject2",
},
})
namespacedObject2.SetGroupVersionKind(fakeGVK)
require.NoError(t, tracker.Add(object1))
require.NoError(t, tracker.Add(object2))
require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) {
return myController.Informer().LastSyncResourceVersion() == object2.GetResourceVersion(), nil
}))
values, err := myController.Informer().List(labels.Everything())
require.NoError(t, err)
require.ElementsMatch(t, []*unstructured.Unstructured{object1, object2}, values)
require.NoError(t, tracker.Update(fakeGVR, object1v2, object1v2.GetNamespace()))
require.NoError(t, tracker.Delete(fakeGVR, object2.GetNamespace(), object2.GetName()))
require.NoError(t, tracker.Add(object3))
require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) {
return myController.Informer().LastSyncResourceVersion() == object3.GetResourceVersion(), nil
}))
values, err = myController.Informer().List(labels.Everything())
require.NoError(t, err)
require.ElementsMatch(t, []*unstructured.Unstructured{object1v2, object3}, values)
require.NoError(t, tracker.Add(namespacedObject1))
require.NoError(t, tracker.Add(namespacedObject2))
require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) {
return myController.Informer().LastSyncResourceVersion() == namespacedObject2.GetResourceVersion(), nil
}))
values, err = myController.Informer().Namespaced(namespacedObject1.GetNamespace()).List(labels.Everything())
require.NoError(t, err)
require.ElementsMatch(t, []*unstructured.Unstructured{namespacedObject1, namespacedObject2}, values)
value, err := myController.Informer().Get(object3.GetName())
require.NoError(t, err)
require.Equal(t, value, object3)
value, err = myController.Informer().Namespaced(namespacedObject1.GetNamespace()).Get(namespacedObject1.GetName())
require.NoError(t, err)
require.Equal(t, value, namespacedObject1)
_, err = myController.Informer().Get("fakeobjectname")
require.True(t, k8serrors.IsNotFound(err))
_, err = myController.Informer().Namespaced("test").Get("fakeobjectname")
require.True(t, k8serrors.IsNotFound(err))
_, err = myController.Informer().Namespaced("fakenamespace").Get("fakeobjectname")
require.True(t, k8serrors.IsNotFound(err))
}

View File

@ -0,0 +1,29 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package generic contains a typed wrapper over cache SharedIndexInformer
// and Lister (maybe eventually should have a home there?)
//
// This interface is being experimented with as an easier way to write controllers
// with a bit less boilerplate.
//
// Informer/Lister classes are thin wrappers providing a type-safe interface
// over regular interface{}-based Informers/Listers
//
// Controller[T] provides a reusable way to reconcile objects out of an informer
// using the tried and true controller design pattern found all over k8s
// codebase based upon syncFunc/reconcile
package generic

View File

@ -0,0 +1,36 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
var _ Informer[runtime.Object] = informer[runtime.Object]{}
type informer[T runtime.Object] struct {
cache.SharedIndexInformer
lister[T]
}
func NewInformer[T runtime.Object](informe cache.SharedIndexInformer) Informer[T] {
return informer[T]{
SharedIndexInformer: informe,
lister: NewLister[T](informe.GetIndexer()),
}
}

View File

@ -0,0 +1,62 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"context"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
type Controller[T runtime.Object] interface {
// Meant to be run inside a goroutine
// Waits for and reacts to changes in whatever type the controller
// is concerned with.
//
// Returns an error always non-nil explaining why the worker stopped
Run(ctx context.Context) error
// Retrieves the informer used to back this controller
Informer() Informer[T]
// Returns true if the informer cache has synced, and all the objects from
// the initial list have been reconciled at least once.
HasSynced() bool
}
type NamespacedLister[T any] interface {
// List lists all ValidationRuleSets in the indexer for a given namespace.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []T, err error)
// Get retrieves the ValidationRuleSet from the indexer for a given namespace and name.
// Objects returned here must be treated as read-only.
Get(name string) (T, error)
}
type Informer[T any] interface {
cache.SharedIndexInformer
Lister[T]
}
// Lister[T] helps list Ts.
// All objects returned here must be treated as read-only.
type Lister[T any] interface {
NamespacedLister[T]
Namespaced(namespace string) NamespacedLister[T]
}

View File

@ -0,0 +1,100 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"fmt"
"net/http"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
var _ Lister[runtime.Object] = lister[runtime.Object]{}
type namespacedLister[T runtime.Object] struct {
indexer cache.Indexer
namespace string
}
func (w namespacedLister[T]) List(selector labels.Selector) (ret []T, err error) {
err = cache.ListAllByNamespace(w.indexer, w.namespace, selector, func(m interface{}) {
ret = append(ret, m.(T))
})
return ret, err
}
func (w namespacedLister[T]) Get(name string) (T, error) {
var result T
obj, exists, err := w.indexer.GetByKey(w.namespace + "/" + name)
if err != nil {
return result, err
}
if !exists {
return result, &kerrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusNotFound,
Reason: metav1.StatusReasonNotFound,
Message: fmt.Sprintf("%s not found", name),
}}
}
result = obj.(T)
return result, nil
}
type lister[T runtime.Object] struct {
indexer cache.Indexer
}
func (w lister[T]) List(selector labels.Selector) (ret []T, err error) {
err = cache.ListAll(w.indexer, selector, func(m interface{}) {
ret = append(ret, m.(T))
})
return ret, err
}
func (w lister[T]) Get(name string) (T, error) {
var result T
obj, exists, err := w.indexer.GetByKey(name)
if err != nil {
return result, err
}
if !exists {
// kerrors.StatusNotFound requires a GroupResource we cannot provide
return result, &kerrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusNotFound,
Reason: metav1.StatusReasonNotFound,
Message: fmt.Sprintf("%s not found", name),
}}
}
result = obj.(T)
return result, nil
}
func (w lister[T]) Namespaced(namespace string) NamespacedLister[T] {
return namespacedLister[T]{namespace: namespace, indexer: w.indexer}
}
func NewLister[T runtime.Object](indexer cache.Indexer) lister[T] {
return lister[T]{indexer: indexer}
}

View File

@ -0,0 +1,53 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
import (
"encoding/json"
"fmt"
)
type PolicyDecisionKind string
const (
Admit PolicyDecisionKind = "Admit"
Deny PolicyDecisionKind = "Deny"
)
type PolicyDecision struct {
Kind PolicyDecisionKind `json:"kind"`
Message any `json:"message"`
}
type PolicyDecisionWithMetadata struct {
PolicyDecision `json:"decision"`
Definition PolicyDefinition `json:"definition"`
Binding PolicyBinding `json:"binding"`
}
type PolicyError struct {
Decisions []PolicyDecisionWithMetadata
}
func (p *PolicyError) Error() string {
// Just format the error as JSON
jsonText, err := json.Marshal(p.Decisions)
if err != nil {
return fmt.Sprintf("error formatting PolicyError: %s", err.Error())
}
return string(jsonText)
}