mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
add cel admission controller
This commit is contained in:
parent
c52fae186a
commit
2286501e22
@ -0,0 +1,320 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// celAdmissionController is the top-level controller for admission control using CEL
|
||||
// it is responsible for watching policy definitions, bindings, and config param CRDs
|
||||
type celAdmissionController struct {
|
||||
// Context under which the controller runs
|
||||
runningContext context.Context
|
||||
|
||||
policyDefinitionsController generic.Controller[PolicyDefinition]
|
||||
policyBindingController generic.Controller[PolicyBinding]
|
||||
|
||||
// dynamicclient used to create informers to watch the param crd types
|
||||
dynamicClient dynamic.Interface
|
||||
restMapper meta.RESTMapper
|
||||
|
||||
// Provided to the policy's Compile function as an injected dependency to
|
||||
// assist with compiling its expressions to CEL
|
||||
objectConverter ObjectConverter
|
||||
|
||||
// Lock which protects:
|
||||
// - definitionInfo
|
||||
// - bindingInfos
|
||||
// - paramCRDControllers
|
||||
// - definitionsToBindings
|
||||
// All other fields should be assumed constant
|
||||
mutex sync.RWMutex
|
||||
|
||||
// controller and metadata
|
||||
paramsCRDControllers map[schema.GroupVersionKind]*paramInfo
|
||||
|
||||
// Index for each definition namespace/name, contains all binding
|
||||
// namespace/names known to exist for that definition
|
||||
definitionInfo map[string]*definitionInfo
|
||||
|
||||
// Index for each bindings namespace/name. Contains compiled templates
|
||||
// for the binding depending on the policy/param combination.
|
||||
bindingInfos map[string]*bindingInfo
|
||||
|
||||
// Map from namespace/name of a definition to a set of namespace/name
|
||||
// of bindings which depend on it.
|
||||
// All keys must have at least one dependent binding
|
||||
// All binding names MUST exist as a key bindingInfos
|
||||
definitionsToBindings map[string]sets.String
|
||||
}
|
||||
|
||||
type definitionInfo struct {
|
||||
// Error about the state of the definition's configuration and the cluster
|
||||
// preventing its enforcement or compilation.
|
||||
// Reset every reconciliation
|
||||
configurationError error
|
||||
|
||||
// Last value seen by this controller to be used in policy enforcement
|
||||
// May not be nil
|
||||
lastReconciledValue PolicyDefinition
|
||||
}
|
||||
|
||||
type bindingInfo struct {
|
||||
// Compiled CEL expression turned into an evaluator
|
||||
evaluator EvaluatorFunc
|
||||
|
||||
// Last value seen by this controller to be used in policy enforcement
|
||||
// May not be nil
|
||||
lastReconciledValue PolicyBinding
|
||||
}
|
||||
|
||||
type paramInfo struct {
|
||||
// Controller which is watching this param CRD
|
||||
controller generic.Controller[*unstructured.Unstructured]
|
||||
|
||||
// Function to call to stop the informer and clean up the controller
|
||||
stop func()
|
||||
|
||||
// Policy Definitions which refer to this param CRD
|
||||
dependentDefinitions sets.String
|
||||
}
|
||||
|
||||
func NewAdmissionController(
|
||||
// Informers
|
||||
policyDefinitionsInformer cache.SharedIndexInformer,
|
||||
policyBindingInformer cache.SharedIndexInformer,
|
||||
|
||||
// Injected Dependencies
|
||||
objectConverter ObjectConverter,
|
||||
restMapper meta.RESTMapper,
|
||||
dynamicClient dynamic.Interface,
|
||||
) CELPolicyEvaluator {
|
||||
c := &celAdmissionController{
|
||||
definitionInfo: make(map[string]*definitionInfo),
|
||||
bindingInfos: make(map[string]*bindingInfo),
|
||||
paramsCRDControllers: make(map[schema.GroupVersionKind]*paramInfo),
|
||||
definitionsToBindings: make(map[string]sets.String),
|
||||
dynamicClient: dynamicClient,
|
||||
objectConverter: objectConverter,
|
||||
restMapper: restMapper,
|
||||
}
|
||||
|
||||
c.policyDefinitionsController = generic.NewController(
|
||||
generic.NewInformer[PolicyDefinition](policyDefinitionsInformer),
|
||||
c.reconcilePolicyDefinition,
|
||||
generic.ControllerOptions{
|
||||
Workers: 1,
|
||||
Name: "cel-policy-definitions",
|
||||
},
|
||||
)
|
||||
c.policyBindingController = generic.NewController(
|
||||
generic.NewInformer[PolicyBinding](policyBindingInformer),
|
||||
c.reconcilePolicyBinding,
|
||||
generic.ControllerOptions{
|
||||
Workers: 1,
|
||||
Name: "cel-policy-bindings",
|
||||
},
|
||||
)
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *celAdmissionController) Run(stopCh <-chan struct{}) {
|
||||
if c.runningContext != nil {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
c.runningContext = ctx
|
||||
defer func() {
|
||||
c.runningContext = nil
|
||||
}()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c.policyDefinitionsController.Run(ctx)
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c.policyBindingController.Run(ctx)
|
||||
}()
|
||||
|
||||
<-stopCh
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (c *celAdmissionController) Validate(
|
||||
ctx context.Context,
|
||||
a admission.Attributes,
|
||||
o admission.ObjectInterfaces,
|
||||
) (err error) {
|
||||
c.mutex.RLock()
|
||||
defer c.mutex.RUnlock()
|
||||
|
||||
var allDecisions []PolicyDecisionWithMetadata = nil
|
||||
|
||||
addConfigError := func(err error, definition PolicyDefinition, binding PolicyBinding) {
|
||||
wrappedError := fmt.Errorf("configuration error: %w", err)
|
||||
switch p := definition.GetFailurePolicy(); p {
|
||||
case Ignore:
|
||||
klog.Info(wrappedError)
|
||||
return
|
||||
case Fail:
|
||||
allDecisions = append(allDecisions, PolicyDecisionWithMetadata{
|
||||
PolicyDecision: PolicyDecision{
|
||||
Kind: Deny,
|
||||
Message: wrappedError.Error(),
|
||||
},
|
||||
Definition: definition,
|
||||
Binding: binding,
|
||||
})
|
||||
default:
|
||||
utilruntime.HandleError(fmt.Errorf("unrecognized failure policy: '%v'", p))
|
||||
}
|
||||
}
|
||||
for definitionNamespacedName, definitionInfo := range c.definitionInfo {
|
||||
definition := definitionInfo.lastReconciledValue
|
||||
if !definition.Matches(a) {
|
||||
// Policy definition does not match request
|
||||
continue
|
||||
} else if definitionInfo.configurationError != nil {
|
||||
// Configuration error.
|
||||
addConfigError(definitionInfo.configurationError, definition, nil)
|
||||
continue
|
||||
}
|
||||
|
||||
dependentBindings := c.definitionsToBindings[definitionNamespacedName]
|
||||
if len(dependentBindings) == 0 {
|
||||
// Definition has no known bindings yet.
|
||||
addConfigError(errors.New("no bindings found"), definition, nil)
|
||||
continue
|
||||
}
|
||||
|
||||
for namespacedBindingName := range dependentBindings {
|
||||
// If the key is inside dependentBindings, there is guaranteed to
|
||||
// be a bindingInfo for it
|
||||
bindingInfo := c.bindingInfos[namespacedBindingName]
|
||||
binding := bindingInfo.lastReconciledValue
|
||||
if !binding.Matches(a) {
|
||||
continue
|
||||
}
|
||||
|
||||
var param *unstructured.Unstructured
|
||||
|
||||
// If definition has no paramsource, always provide nil params to
|
||||
// evaluator. If binding specifies a params to use they are ignored.
|
||||
// Done this way so you can configure params before definition is ready.
|
||||
if paramSource := definition.GetParamSource(); paramSource != nil {
|
||||
paramsNamespace, paramsName := binding.GetTargetParams()
|
||||
|
||||
// Find the params referred by the binding by looking its name up
|
||||
// in our informer for its CRD
|
||||
paramInfo, ok := c.paramsCRDControllers[*paramSource]
|
||||
if !ok {
|
||||
addConfigError(fmt.Errorf("paramSource kind `%v` not known",
|
||||
paramSource.String()), definition, binding)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(paramsNamespace) == 0 {
|
||||
param, err = paramInfo.controller.Informer().Get(paramsName)
|
||||
} else {
|
||||
param, err = paramInfo.controller.Informer().Namespaced(paramsNamespace).Get(paramsName)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// Apply failure policy
|
||||
addConfigError(err, definition, binding)
|
||||
|
||||
if k8serrors.IsNotFound(err) {
|
||||
// Param doesnt exist yet?
|
||||
// Maybe just have to wait a bit.
|
||||
continue
|
||||
}
|
||||
|
||||
// There was a bad internal error
|
||||
utilruntime.HandleError(err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if bindingInfo.evaluator == nil {
|
||||
// Compile policy definition using binding
|
||||
bindingInfo.evaluator, err = definition.Compile(c.objectConverter, c.restMapper)
|
||||
if err != nil {
|
||||
// compilation error. Apply failure policy
|
||||
wrappedError := fmt.Errorf("failed to compile CEL expression: %w", err)
|
||||
addConfigError(wrappedError, definition, binding)
|
||||
continue
|
||||
}
|
||||
c.bindingInfos[namespacedBindingName] = bindingInfo
|
||||
}
|
||||
|
||||
decisions := bindingInfo.evaluator(a, param)
|
||||
for _, decision := range decisions {
|
||||
switch decision.Kind {
|
||||
case Admit:
|
||||
// Do nothing
|
||||
case Deny:
|
||||
allDecisions = append(allDecisions, PolicyDecisionWithMetadata{
|
||||
Definition: definition,
|
||||
Binding: binding,
|
||||
PolicyDecision: decision,
|
||||
})
|
||||
default:
|
||||
// unrecognized decision. ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(allDecisions) > 0 {
|
||||
return k8serrors.NewConflict(
|
||||
a.GetResource().GroupResource(), a.GetName(),
|
||||
&PolicyError{
|
||||
Decisions: allDecisions,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
func (c *celAdmissionController) HasSynced() bool {
|
||||
return c.policyBindingController.HasSynced() &&
|
||||
c.policyDefinitionsController.HasSynced()
|
||||
}
|
@ -0,0 +1,203 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name string, definition PolicyDefinition) error {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
// Namespace for policydefinition is empty. Leaving usage here for compatibility
|
||||
// with future NamespacedPolicyDefinition
|
||||
namespacedName := namespace + "/" + name
|
||||
info, ok := c.definitionInfo[namespacedName]
|
||||
if !ok {
|
||||
info = &definitionInfo{}
|
||||
c.definitionInfo[namespacedName] = info
|
||||
}
|
||||
|
||||
var paramSource *schema.GroupVersionKind
|
||||
if definition != nil {
|
||||
paramSource = definition.GetParamSource()
|
||||
}
|
||||
|
||||
// If param source has changed, remove definition as dependent of old params
|
||||
// If there are no more dependents of old param, stop and clean up controller
|
||||
if info.lastReconciledValue != nil && info.lastReconciledValue.GetParamSource() != nil {
|
||||
oldParamSource := *info.lastReconciledValue.GetParamSource()
|
||||
|
||||
// If we are:
|
||||
// - switching from having a param to not having a param (includes deletion)
|
||||
// - or from having a param to a different one
|
||||
// we remove dependency on the controller.
|
||||
if paramSource == nil || *paramSource != oldParamSource {
|
||||
if oldParamInfo, ok := c.paramsCRDControllers[oldParamSource]; ok {
|
||||
oldParamInfo.dependentDefinitions.Delete(namespacedName)
|
||||
if len(oldParamInfo.dependentDefinitions) == 0 {
|
||||
oldParamInfo.stop()
|
||||
delete(c.paramsCRDControllers, oldParamSource)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reset all previously compiled evaluators in case something relevant in
|
||||
// definition has changed.
|
||||
for key := range c.definitionsToBindings[namespacedName] {
|
||||
bindingInfo := c.bindingInfos[key]
|
||||
bindingInfo.evaluator = nil
|
||||
c.bindingInfos[key] = bindingInfo
|
||||
}
|
||||
|
||||
if definition == nil {
|
||||
delete(c.definitionInfo, namespacedName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update definition info
|
||||
info.lastReconciledValue = definition
|
||||
info.configurationError = nil
|
||||
|
||||
if paramSource == nil {
|
||||
// Skip setting up controller for empty param type
|
||||
return nil
|
||||
}
|
||||
|
||||
// find GVR for params
|
||||
paramsGVR, err := c.restMapper.RESTMapping(paramSource.GroupKind(), paramSource.Version)
|
||||
if err != nil {
|
||||
// Failed to resolve. Return error so we retry again (rate limited)
|
||||
// Save a record of this definition with an evaluator that unconditionally
|
||||
//
|
||||
info.configurationError = fmt.Errorf("failed to find resource for param source: '%v'", paramSource.String())
|
||||
return info.configurationError
|
||||
}
|
||||
|
||||
// Start watching the param CRD
|
||||
if _, ok := c.paramsCRDControllers[*paramSource]; !ok {
|
||||
instanceContext, instanceCancel := context.WithCancel(c.runningContext)
|
||||
|
||||
// Watch for new instances of this policy
|
||||
informer := dynamicinformer.NewFilteredDynamicInformer(
|
||||
c.dynamicClient,
|
||||
paramsGVR.Resource,
|
||||
corev1.NamespaceAll,
|
||||
30*time.Second,
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
nil,
|
||||
)
|
||||
|
||||
controller := generic.NewController(
|
||||
generic.NewInformer[*unstructured.Unstructured](informer.Informer()),
|
||||
c.reconcileParams,
|
||||
generic.ControllerOptions{
|
||||
Workers: 1,
|
||||
Name: paramSource.String() + "-controller",
|
||||
},
|
||||
)
|
||||
|
||||
c.paramsCRDControllers[*paramSource] = ¶mInfo{
|
||||
controller: controller,
|
||||
stop: instanceCancel,
|
||||
dependentDefinitions: sets.NewString(namespacedName),
|
||||
}
|
||||
|
||||
go informer.Informer().Run(instanceContext.Done())
|
||||
go controller.Run(instanceContext)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *celAdmissionController) reconcilePolicyBinding(namespace, name string, binding PolicyBinding) error {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
// Namespace for PolicyBinding is empty. In the future a namespaced binding
|
||||
// may be added
|
||||
// https://github.com/kubernetes/enhancements/blob/bf5c3c81ea2081d60c1dc7c832faa98479e06209/keps/sig-api-machinery/3488-cel-admission-control/README.md?plain=1#L1042
|
||||
namespacedName := namespace + "/" + name
|
||||
info, ok := c.bindingInfos[namespacedName]
|
||||
if !ok {
|
||||
info = &bindingInfo{}
|
||||
c.bindingInfos[namespacedName] = info
|
||||
}
|
||||
|
||||
oldNamespacedDefinitionName := ""
|
||||
if info.lastReconciledValue != nil {
|
||||
oldefinitionNamespace, oldefinitionName := info.lastReconciledValue.GetTargetDefinition()
|
||||
oldNamespacedDefinitionName = oldefinitionNamespace + "/" + oldefinitionName
|
||||
}
|
||||
|
||||
namespacedDefinitionName := ""
|
||||
if binding != nil {
|
||||
newDefinitionNamespace, newDefinitionName := binding.GetTargetDefinition()
|
||||
namespacedDefinitionName = newDefinitionNamespace + "/" + newDefinitionName
|
||||
}
|
||||
|
||||
// Remove record of binding from old definition if the referred policy
|
||||
// has changed
|
||||
if oldNamespacedDefinitionName != namespacedDefinitionName {
|
||||
if dependentBindings, ok := c.definitionsToBindings[oldNamespacedDefinitionName]; ok {
|
||||
dependentBindings.Delete(namespacedName)
|
||||
|
||||
// if there are no more dependent bindings, remove knowledge of the
|
||||
// definition altogether
|
||||
if len(dependentBindings) == 0 {
|
||||
delete(c.definitionsToBindings, oldNamespacedDefinitionName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if binding == nil {
|
||||
delete(c.bindingInfos, namespacedName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add record of binding to new definition
|
||||
if dependentBindings, ok := c.definitionsToBindings[namespacedDefinitionName]; ok {
|
||||
dependentBindings.Insert(namespacedName)
|
||||
} else {
|
||||
c.definitionsToBindings[namespacedDefinitionName] = sets.NewString(namespacedName)
|
||||
}
|
||||
|
||||
// Remove compiled template for old binding
|
||||
info.evaluator = nil
|
||||
info.lastReconciledValue = binding
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *celAdmissionController) reconcileParams(namespace, name string, params *unstructured.Unstructured) error {
|
||||
// Do nothing.
|
||||
// When we add informational type checking we will need to compile in the
|
||||
// reconcile loops instead of lazily so we can add compiler errors / type
|
||||
// checker errors to the status of the resources.
|
||||
return nil
|
||||
}
|
@ -0,0 +1,108 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cel
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/cel"
|
||||
|
||||
"github.com/google/cel-go/common/types/ref"
|
||||
)
|
||||
|
||||
type FailurePolicy string
|
||||
|
||||
const (
|
||||
Fail FailurePolicy = "Fail"
|
||||
Ignore FailurePolicy = "Ignore"
|
||||
)
|
||||
|
||||
// EvaluatorFunc represents the AND of one or more compiled CEL expression's
|
||||
// evaluators `params` may be nil if definition does not specify a paramsource
|
||||
type EvaluatorFunc func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision
|
||||
|
||||
// ObjectConverter is Dependency Injected into the PolicyDefinition's `Compile`
|
||||
// function to assist with converting types and values to/from CEL-typed values.
|
||||
type ObjectConverter interface {
|
||||
// DeclForResource looks up the openapi or JSONSchemaProps, structural schema, etc.
|
||||
// and compiles it into something that can be used to turn objects into CEL
|
||||
// values
|
||||
DeclForResource(gvr schema.GroupVersionResource) (*cel.DeclType, error)
|
||||
|
||||
// ValueForObject takes a Kubernetes Object and uses the CEL DeclType
|
||||
// to transform it into a CEL value.
|
||||
// Object may be a typed native object or an unstructured object
|
||||
ValueForObject(value runtime.Object, decl *cel.DeclType) (ref.Val, error)
|
||||
}
|
||||
|
||||
// PolicyDefinition is an interface for internal policy binding type.
|
||||
// Implemented by mock/testing types, and to be implemented by the public API
|
||||
// types once they have completed API review.
|
||||
//
|
||||
// The interface closely mirrors the format and functionality of the
|
||||
// PolicyDefinition proposed in the KEP.
|
||||
type PolicyDefinition interface {
|
||||
runtime.Object
|
||||
|
||||
// Matches says whether this policy definition matches the provided admission
|
||||
// resource request
|
||||
Matches(a admission.Attributes) bool
|
||||
|
||||
Compile(
|
||||
// Definition is provided with a converter which may be used by the
|
||||
// return evaluator function to convert objects into CEL-typed objects
|
||||
objectConverter ObjectConverter,
|
||||
// Injected RESTMapper to assist with compilation
|
||||
mapper meta.RESTMapper,
|
||||
) (EvaluatorFunc, error)
|
||||
|
||||
// GetParamSource returns the GVK for the CRD used as the source of
|
||||
// parameters used in the evaluation of instances of this policy
|
||||
// May return nil if there is no paramsource for this definition.
|
||||
GetParamSource() *schema.GroupVersionKind
|
||||
|
||||
// GetFailurePolicy returns how an object should be treated during an
|
||||
// admission when there is a configuration error preventing CEL evaluation
|
||||
GetFailurePolicy() FailurePolicy
|
||||
}
|
||||
|
||||
// PolicyBinding is an interface for internal policy binding type. Implemented
|
||||
// by mock/testing types, and to be implemented by the public API types once
|
||||
// they have completed API review.
|
||||
//
|
||||
// The interface closely mirrors the format and functionality of the
|
||||
// PolicyBinding proposed in the KEP.
|
||||
type PolicyBinding interface {
|
||||
runtime.Object
|
||||
|
||||
// Matches says whether this policy binding matches the provided admission
|
||||
// resource request
|
||||
Matches(a admission.Attributes) bool
|
||||
|
||||
// GetTargetDefinition returns the Namespace/Name of Policy Definition used
|
||||
// by this binding.
|
||||
GetTargetDefinition() (namespace, name string)
|
||||
|
||||
// GetTargetParams returns the Namespace/Name of instance of TargetDefinition's
|
||||
// ParamSource to be provided to the CEL expressions of the definition during
|
||||
// evaluation.
|
||||
// If TargetDefinition has nil ParamSource, this is ignored.
|
||||
GetTargetParams() (namespace, name string)
|
||||
}
|
@ -0,0 +1,264 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package generic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
kerrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
var _ Controller[runtime.Object] = &controller[runtime.Object]{}
|
||||
|
||||
type controller[T runtime.Object] struct {
|
||||
informer Informer[T]
|
||||
queue workqueue.RateLimitingInterface
|
||||
|
||||
// Returns an error if there was a transient error during reconciliation
|
||||
// and the object should be tried again later.
|
||||
reconciler func(namespace, name string, newObj T) error
|
||||
|
||||
options ControllerOptions
|
||||
}
|
||||
|
||||
type ControllerOptions struct {
|
||||
Name string
|
||||
Workers uint
|
||||
}
|
||||
|
||||
func (c controller[T]) Informer() Informer[T] {
|
||||
return c.informer
|
||||
}
|
||||
|
||||
func NewController[T runtime.Object](
|
||||
informer Informer[T],
|
||||
reconciler func(namepace, name string, newObj T) error,
|
||||
options ControllerOptions,
|
||||
) Controller[T] {
|
||||
if options.Workers == 0 {
|
||||
options.Workers = 2
|
||||
}
|
||||
|
||||
if len(options.Name) == 0 {
|
||||
options.Name = fmt.Sprintf("%T-controller", *new(T))
|
||||
}
|
||||
|
||||
return &controller[T]{
|
||||
options: options,
|
||||
informer: informer,
|
||||
reconciler: reconciler,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), options.Name),
|
||||
}
|
||||
}
|
||||
|
||||
// Runs the controller and returns an error explaining why running was stopped.
|
||||
// Reconciliation ends as soon as the context completes. If there are events
|
||||
// waiting to be processed at that itme, they will be dropped.
|
||||
func (c *controller[T]) Run(ctx context.Context) error {
|
||||
klog.Infof("starting %s", c.options.Name)
|
||||
defer klog.Infof("stopping %s", c.options.Name)
|
||||
|
||||
enqueue := func(obj interface{}) {
|
||||
var key string
|
||||
var err error
|
||||
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
c.queue.Add(key)
|
||||
}
|
||||
|
||||
registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
enqueue(obj)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
oldMeta, err1 := meta.Accessor(oldObj)
|
||||
newMeta, err2 := meta.Accessor(newObj)
|
||||
|
||||
if err1 != nil || err2 != nil {
|
||||
if err1 != nil {
|
||||
utilruntime.HandleError(err1)
|
||||
}
|
||||
|
||||
if err2 != nil {
|
||||
utilruntime.HandleError(err2)
|
||||
}
|
||||
return
|
||||
} else if oldMeta.GetResourceVersion() == newMeta.GetResourceVersion() {
|
||||
if len(oldMeta.GetResourceVersion()) == 0 {
|
||||
klog.Warningf("%v throwing out update with empty RV. this is likely to happen if a test did not supply a resource version on an updated object", c.options.Name)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
enqueue(newObj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
// Enqueue
|
||||
enqueue(obj)
|
||||
},
|
||||
})
|
||||
|
||||
// Error might be raised if informer was started and stopped already
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Make sure event handler is removed from informer in case return early from
|
||||
// an error
|
||||
defer func() {
|
||||
// Remove event handler and Handle Error here. Error should only be raised
|
||||
// for improper usage of event handler API.
|
||||
if err := c.informer.RemoveEventHandler(registration); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for initial cache list to complete before beginning to reconcile
|
||||
// objects.
|
||||
if !cache.WaitForNamedCacheSync(c.options.Name, ctx.Done(), c.informer.HasSynced) {
|
||||
// ctx cancelled during cache sync. return early
|
||||
err := ctx.Err()
|
||||
if err == nil {
|
||||
// if context wasnt cancelled then the sync failed for another reason
|
||||
err = errors.New("cache sync failed")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
waitGroup := sync.WaitGroup{}
|
||||
|
||||
for i := uint(0); i < c.options.Workers; i++ {
|
||||
waitGroup.Add(1)
|
||||
go func() {
|
||||
wait.Until(c.runWorker, time.Second, ctx.Done())
|
||||
waitGroup.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
klog.Infof("Started %v workers for %v", c.options.Workers, c.options.Name)
|
||||
|
||||
// Wait for context cancel.
|
||||
<-ctx.Done()
|
||||
|
||||
// Forcefully shutdown workqueue. Drop any enqueued items.
|
||||
c.queue.ShutDown()
|
||||
|
||||
// Workqueue shutdown signals for workers to stop. Wait for all workers to
|
||||
// clean up
|
||||
waitGroup.Wait()
|
||||
|
||||
// Only way for workers to ever stop is for caller to cancel the context
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func (c *controller[T]) HasSynced() bool {
|
||||
return c.informer.HasSynced()
|
||||
}
|
||||
|
||||
func (c *controller[T]) runWorker() {
|
||||
for {
|
||||
obj, shutdown := c.queue.Get()
|
||||
if shutdown {
|
||||
return
|
||||
}
|
||||
|
||||
// We wrap this block in a func so we can defer c.workqueue.Done.
|
||||
err := func(obj interface{}) error {
|
||||
// We call Done here so the workqueue knows we have finished
|
||||
// processing this item. We also must remember to call Forget if we
|
||||
// do not want this work item being re-queued. For example, we do
|
||||
// not call Forget if a transient error occurs, instead the item is
|
||||
// put back on the workqueue and attempted again after a back-off
|
||||
// period.
|
||||
defer c.queue.Done(obj)
|
||||
var key string
|
||||
var ok bool
|
||||
// We expect strings to come off the workqueue. These are of the
|
||||
// form namespace/name. We do this as the delayed nature of the
|
||||
// workqueue means the items in the informer cache may actually be
|
||||
// more up to date that when the item was initially put onto the
|
||||
// workqueue.
|
||||
if key, ok = obj.(string); !ok {
|
||||
// How did an incorrectly formatted key get in the workqueue?
|
||||
// Done is sufficient. (Forget resets rate limiter for the key,
|
||||
// but the key is invalid so there is no point in doing that)
|
||||
return fmt.Errorf("expected string in workqueue but got %#v", obj)
|
||||
}
|
||||
|
||||
if err := c.reconcile(key); err != nil {
|
||||
// Put the item back on the workqueue to handle any transient errors.
|
||||
c.queue.AddRateLimited(key)
|
||||
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
|
||||
}
|
||||
// Finally, if no error occurs we Forget this item so it is allowed
|
||||
// to be re-enqueued without a long rate limit
|
||||
c.queue.Forget(obj)
|
||||
klog.Infof("Successfully synced '%s'", key)
|
||||
return nil
|
||||
}(obj)
|
||||
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controller[T]) reconcile(key string) error {
|
||||
var newObj T
|
||||
var err error
|
||||
var namespace string
|
||||
var name string
|
||||
var lister NamespacedLister[T]
|
||||
|
||||
// Convert the namespace/name string into a distinct namespace and name
|
||||
namespace, name, err = cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(namespace) > 0 {
|
||||
lister = c.informer.Namespaced(namespace)
|
||||
} else {
|
||||
lister = c.informer
|
||||
}
|
||||
|
||||
newObj, err = lister.Get(name)
|
||||
if err != nil {
|
||||
if !kerrors.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Deleted object. Inform reconciler with empty
|
||||
}
|
||||
|
||||
return c.reconciler(namespace, name, newObj)
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package generic contains a typed wrapper over cache SharedIndexInformer
|
||||
// and Lister (maybe eventually should have a home there?)
|
||||
//
|
||||
// This interface is being experimented with as an easier way to write controllers
|
||||
// with a bit less boilerplate.
|
||||
//
|
||||
// Informer/Lister classes are thin wrappers providing a type-safe interface
|
||||
// over regular interface{}-based Informers/Listers
|
||||
//
|
||||
// Controller[T] provides a reusable way to reconcile objects out of an informer
|
||||
// using the tried and true controller design pattern found all over k8s
|
||||
// codebase based upon syncFunc/reconcile
|
||||
package generic
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package generic
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
var _ Informer[runtime.Object] = informer[runtime.Object]{}
|
||||
|
||||
type informer[T runtime.Object] struct {
|
||||
cache.SharedIndexInformer
|
||||
lister[T]
|
||||
}
|
||||
|
||||
func NewInformer[T runtime.Object](informe cache.SharedIndexInformer) Informer[T] {
|
||||
return informer[T]{
|
||||
SharedIndexInformer: informe,
|
||||
lister: NewLister[T](informe.GetIndexer()),
|
||||
}
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package generic
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
type Controller[T runtime.Object] interface {
|
||||
// Meant to be run inside a goroutine
|
||||
// Waits for and reacts to changes in whatever type the controller
|
||||
// is concerned with.
|
||||
//
|
||||
// Returns an error always non-nil explaining why the worker stopped
|
||||
Run(ctx context.Context) error
|
||||
|
||||
// Retrieves the informer used to back this controller
|
||||
Informer() Informer[T]
|
||||
|
||||
// Returns true if the informer cache has synced, and all the objects from
|
||||
// the initial list have been reconciled at least once.
|
||||
HasSynced() bool
|
||||
}
|
||||
|
||||
type NamespacedLister[T any] interface {
|
||||
// List lists all ValidationRuleSets in the indexer for a given namespace.
|
||||
// Objects returned here must be treated as read-only.
|
||||
List(selector labels.Selector) (ret []T, err error)
|
||||
// Get retrieves the ValidationRuleSet from the indexer for a given namespace and name.
|
||||
// Objects returned here must be treated as read-only.
|
||||
Get(name string) (T, error)
|
||||
}
|
||||
|
||||
type Informer[T any] interface {
|
||||
cache.SharedIndexInformer
|
||||
Lister[T]
|
||||
}
|
||||
|
||||
// Lister[T] helps list Ts.
|
||||
// All objects returned here must be treated as read-only.
|
||||
type Lister[T any] interface {
|
||||
NamespacedLister[T]
|
||||
Namespaced(namespace string) NamespacedLister[T]
|
||||
}
|
@ -0,0 +1,100 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package generic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
kerrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
var _ Lister[runtime.Object] = lister[runtime.Object]{}
|
||||
|
||||
type namespacedLister[T runtime.Object] struct {
|
||||
indexer cache.Indexer
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (w namespacedLister[T]) List(selector labels.Selector) (ret []T, err error) {
|
||||
err = cache.ListAllByNamespace(w.indexer, w.namespace, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(T))
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (w namespacedLister[T]) Get(name string) (T, error) {
|
||||
var result T
|
||||
|
||||
obj, exists, err := w.indexer.GetByKey(w.namespace + "/" + name)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
if !exists {
|
||||
return result, &kerrors.StatusError{ErrStatus: metav1.Status{
|
||||
Status: metav1.StatusFailure,
|
||||
Code: http.StatusNotFound,
|
||||
Reason: metav1.StatusReasonNotFound,
|
||||
Message: fmt.Sprintf("%s not found", name),
|
||||
}}
|
||||
}
|
||||
result = obj.(T)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type lister[T runtime.Object] struct {
|
||||
indexer cache.Indexer
|
||||
}
|
||||
|
||||
func (w lister[T]) List(selector labels.Selector) (ret []T, err error) {
|
||||
err = cache.ListAll(w.indexer, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(T))
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (w lister[T]) Get(name string) (T, error) {
|
||||
var result T
|
||||
|
||||
obj, exists, err := w.indexer.GetByKey(name)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
if !exists {
|
||||
// kerrors.StatusNotFound requires a GroupResource we cannot provide
|
||||
return result, &kerrors.StatusError{ErrStatus: metav1.Status{
|
||||
Status: metav1.StatusFailure,
|
||||
Code: http.StatusNotFound,
|
||||
Reason: metav1.StatusReasonNotFound,
|
||||
Message: fmt.Sprintf("%s not found", name),
|
||||
}}
|
||||
}
|
||||
result = obj.(T)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (w lister[T]) Namespaced(namespace string) NamespacedLister[T] {
|
||||
return namespacedLister[T]{namespace: namespace, indexer: w.indexer}
|
||||
}
|
||||
|
||||
func NewLister[T runtime.Object](indexer cache.Indexer) lister[T] {
|
||||
return lister[T]{indexer: indexer}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cel
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type PolicyDecisionKind string
|
||||
|
||||
const (
|
||||
Admit PolicyDecisionKind = "Admit"
|
||||
Deny PolicyDecisionKind = "Deny"
|
||||
)
|
||||
|
||||
type PolicyDecision struct {
|
||||
Kind PolicyDecisionKind `json:"kind"`
|
||||
Message any `json:"message"`
|
||||
}
|
||||
|
||||
type PolicyDecisionWithMetadata struct {
|
||||
PolicyDecision `json:"decision"`
|
||||
Definition PolicyDefinition `json:"definition"`
|
||||
Binding PolicyBinding `json:"binding"`
|
||||
}
|
||||
|
||||
type PolicyError struct {
|
||||
Decisions []PolicyDecisionWithMetadata
|
||||
}
|
||||
|
||||
func (p *PolicyError) Error() string {
|
||||
// Just format the error as JSON
|
||||
jsonText, err := json.Marshal(p.Decisions)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("error formatting PolicyError: %s", err.Error())
|
||||
}
|
||||
return string(jsonText)
|
||||
}
|
Loading…
Reference in New Issue
Block a user