refactor admission controller to avoid contention

refresh admission policies up to once per second based upon last known good data
This commit is contained in:
Alexander Zielenski 2022-12-15 16:30:52 -08:00
parent 517df8f305
commit 5f59f44983
4 changed files with 253 additions and 137 deletions

View File

@ -276,7 +276,7 @@ func setupTestCommon(t *testing.T, compiler ValidatorCompiler, shouldStartInform
// Override compiler used by controller for tests
controller = handler.evaluator.(*celAdmissionController)
controller.validatorCompiler = compiler
controller.policyController.ValidatorCompiler = compiler
t.Cleanup(func() {
testContextCancel()
@ -369,8 +369,8 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O
return nil, err
}
c.mutex.RLock()
defer c.mutex.RUnlock()
c.policyController.mutex.RLock()
defer c.policyController.mutex.RUnlock()
switch obj.(type) {
case *unstructured.Unstructured:
@ -380,7 +380,7 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O
Kind: paramSourceGVK.Kind,
}
var paramInformer generic.Informer[*unstructured.Unstructured]
if paramInfo, ok := c.paramsCRDControllers[paramKind]; ok {
if paramInfo, ok := c.policyController.paramsCRDControllers[paramKind]; ok {
paramInformer = paramInfo.controller.Informer()
} else {
// Treat unknown CRD the same as not found
@ -399,7 +399,7 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O
return item, nil
case *v1alpha1.ValidatingAdmissionPolicyBinding:
nn := getNamespaceName(accessor.GetNamespace(), accessor.GetName())
info, ok := c.bindingInfos[nn]
info, ok := c.policyController.bindingInfos[nn]
if !ok {
return nil, nil
}
@ -407,7 +407,7 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O
return info.lastReconciledValue, nil
case *v1alpha1.ValidatingAdmissionPolicy:
nn := getNamespaceName(accessor.GetNamespace(), accessor.GetName())
info, ok := c.definitionInfo[nn]
info, ok := c.policyController.definitionInfo[nn]
if !ok {
return nil, nil
}
@ -422,7 +422,15 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O
// their gvk/name in the controller
func waitForReconcile(ctx context.Context, controller *celAdmissionController, objects ...runtime.Object) error {
return wait.PollWithContext(ctx, 100*time.Millisecond, 1*time.Second, func(ctx context.Context) (done bool, err error) {
defer func() {
if done {
// force admission controller to refresh the information it
// uses for validation now that it is done in the background
controller.refreshPolicies()
}
}()
for _, obj := range objects {
objMeta, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("error getting meta accessor for original %T object (%v): %w", obj, obj, err)
@ -462,6 +470,14 @@ func waitForReconcile(ctx context.Context, controller *celAdmissionController, o
// with the given GVKs and namespace/names
func waitForReconcileDeletion(ctx context.Context, controller *celAdmissionController, objects ...runtime.Object) error {
return wait.PollWithContext(ctx, 200*time.Millisecond, 3*time.Hour, func(ctx context.Context) (done bool, err error) {
defer func() {
if done {
// force admission controller to refresh the information it
// uses for validation now that it is done in the background
controller.refreshPolicies()
}
}()
for _, obj := range objects {
currentValue, err := controller.getCurrentObject(obj)
if err != nil {
@ -694,7 +710,6 @@ func TestDefinitionDoesntMatch(t *testing.T) {
attributeRecord(
nil, nonMatchingParams,
admission.Create), &admission.RuntimeObjectInterfaces{}))
require.Zero(t, numCompiles)
require.Empty(t, passedParams)
// Validate a matching input.
@ -791,9 +806,6 @@ func TestReconfigureBinding(t *testing.T) {
// Expect `Compile` only called once
require.Equal(t, 1, numCompiles, "expect `Compile` to be called only once")
// Show Evaluator was called
//require.Len(t, passedParams, 1, "expect evaluator is called due to proper configuration")
// Update the tracker to point at different params
require.NoError(t, tracker.Update(bindingsGVR, denyBinding2, ""))
@ -808,8 +820,6 @@ func TestReconfigureBinding(t *testing.T) {
)
require.ErrorContains(t, err, `failed to configure binding: replicas-test2.example.com not found`)
require.Equal(t, 1, numCompiles, "expect compile is not called when there is configuration error")
//require.Len(t, passedParams, 1, "expect evaluator was not called when there is configuration error")
// Add the missing params
require.NoError(t, paramTracker.Add(fakeParams2))

View File

@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -33,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
celmetrics "k8s.io/apiserver/pkg/admission/cel"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic"
@ -47,44 +49,24 @@ var _ CELPolicyEvaluator = &celAdmissionController{}
// celAdmissionController is the top-level controller for admission control using CEL
// it is responsible for watching policy definitions, bindings, and config param CRDs
type celAdmissionController struct {
// Context under which the controller runs
runningContext context.Context
// Controller which manages book-keeping for the cluster's dynamic policy
// information.
policyController *policyController
policyDefinitionsController generic.Controller[*v1alpha1.ValidatingAdmissionPolicy]
policyBindingController generic.Controller[*v1alpha1.ValidatingAdmissionPolicyBinding]
// atomic []policyData
// list of every known policy definition, and all informatoin required to
// validate its bindings against an object.
// A snapshot of the current policy configuration is synced with this field
// asynchronously
definitions atomic.Value
}
// dynamicclient used to create informers to watch the param crd types
dynamicClient dynamic.Interface
restMapper meta.RESTMapper
// Provided to the policy's Compile function as an injected dependency to
// assist with compiling its expressions to CEL
validatorCompiler ValidatorCompiler
// Lock which protects:
// - definitionInfo
// - bindingInfos
// - paramCRDControllers
// - definitionsToBindings
// All other fields should be assumed constant
mutex sync.RWMutex
// controller and metadata
paramsCRDControllers map[v1alpha1.ParamKind]*paramInfo
// Index for each definition namespace/name, contains all binding
// namespace/names known to exist for that definition
definitionInfo map[namespacedName]*definitionInfo
// Index for each bindings namespace/name. Contains compiled templates
// for the binding depending on the policy/param combination.
bindingInfos map[namespacedName]*bindingInfo
// Map from namespace/name of a definition to a set of namespace/name
// of bindings which depend on it.
// All keys must have at least one dependent binding
// All binding names MUST exist as a key bindingInfos
definitionsToBindings map[namespacedName]sets.Set[namespacedName]
// Everything someone might need to validate a single ValidatingPolicyDefinition
// against all of its registered bindings.
type policyData struct {
definitionInfo
paramController generic.Controller[*unstructured.Unstructured]
bindings []bindingInfo
}
// namespaceName is used as a key in definitionInfo and bindingInfos
@ -105,7 +87,7 @@ type definitionInfo struct {
type bindingInfo struct {
// Compiled CEL expression turned into an validator
validator atomic.Pointer[Validator]
validator Validator
// Last value seen by this controller to be used in policy enforcement
// May not be nil
@ -130,66 +112,44 @@ func NewAdmissionController(
restMapper meta.RESTMapper,
dynamicClient dynamic.Interface,
) CELPolicyEvaluator {
matcher := matching.NewMatcher(informerFactory.Core().V1().Namespaces().Lister(), client)
validatorCompiler := &CELValidatorCompiler{
Matcher: matcher,
return &celAdmissionController{
definitions: atomic.Value{},
policyController: newPolicyController(
restMapper,
dynamicClient,
&CELValidatorCompiler{
Matcher: matching.NewMatcher(informerFactory.Core().V1().Namespaces().Lister(), client),
},
generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicy](
informerFactory.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer()),
generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicyBinding](
informerFactory.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer()),
),
}
c := &celAdmissionController{
definitionInfo: make(map[namespacedName]*definitionInfo),
bindingInfos: make(map[namespacedName]*bindingInfo),
paramsCRDControllers: make(map[v1alpha1.ParamKind]*paramInfo),
definitionsToBindings: make(map[namespacedName]sets.Set[namespacedName]),
dynamicClient: dynamicClient,
validatorCompiler: validatorCompiler,
restMapper: restMapper,
}
c.policyDefinitionsController = generic.NewController(
generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicy](
informerFactory.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer()),
c.reconcilePolicyDefinition,
generic.ControllerOptions{
Workers: 1,
Name: "cel-policy-definitions",
},
)
c.policyBindingController = generic.NewController(
generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicyBinding](
informerFactory.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer()),
c.reconcilePolicyBinding,
generic.ControllerOptions{
Workers: 1,
Name: "cel-policy-bindings",
},
)
return c
}
func (c *celAdmissionController) Run(stopCh <-chan struct{}) {
// TODO: Doesn't this comparison need a lock?
if c.runningContext != nil {
return
}
ctx, cancel := context.WithCancel(context.Background())
c.runningContext = ctx
defer func() {
c.runningContext = nil
}()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
c.policyDefinitionsController.Run(ctx)
c.policyController.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
c.policyBindingController.Run(ctx)
// Wait indefinitely until policies/bindings are listed & handled before
// allowing policies to be refreshed
if !cache.WaitForNamedCacheSync("cel-admission-controller", ctx.Done(), c.policyController.HasSynced) {
return
}
// Loop every 1 second until context is cancelled, refreshing policies
wait.Until(c.refreshPolicies, 1*time.Second, ctx.Done())
}()
<-stopCh
@ -202,8 +162,9 @@ func (c *celAdmissionController) Validate(
a admission.Attributes,
o admission.ObjectInterfaces,
) (err error) {
c.mutex.RLock()
defer c.mutex.RUnlock()
if !c.HasSynced() {
return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
}
var deniedDecisions []policyDecisionWithMetadata
@ -247,9 +208,11 @@ func (c *celAdmissionController) Validate(
})
}
}
for definitionNamespacedName, definitionInfo := range c.definitionInfo {
policyDatas := c.definitions.Load().([]policyData)
for _, definitionInfo := range policyDatas {
definition := definitionInfo.lastReconciledValue
matches, matchKind, err := c.validatorCompiler.DefinitionMatches(a, o, definition)
matches, matchKind, err := c.policyController.DefinitionMatches(a, o, definition)
if err != nil {
// Configuration error.
addConfigError(err, definition, nil)
@ -264,17 +227,11 @@ func (c *celAdmissionController) Validate(
continue
}
dependentBindings := c.definitionsToBindings[definitionNamespacedName]
if len(dependentBindings) == 0 {
continue
}
for namespacedBindingName := range dependentBindings {
for _, bindingInfo := range definitionInfo.bindings {
// If the key is inside dependentBindings, there is guaranteed to
// be a bindingInfo for it
bindingInfo := c.bindingInfos[namespacedBindingName]
binding := bindingInfo.lastReconciledValue
matches, err := c.validatorCompiler.BindingMatches(a, o, binding)
matches, err := c.policyController.BindingMatches(a, o, binding)
if err != nil {
// Configuration error.
addConfigError(err, definition, binding)
@ -291,11 +248,8 @@ func (c *celAdmissionController) Validate(
paramKind := definition.Spec.ParamKind
paramRef := binding.Spec.ParamRef
if paramKind != nil && paramRef != nil {
// Find the params referred by the binding by looking its name up
// in our informer for its CRD
paramInfo, ok := c.paramsCRDControllers[*paramKind]
if !ok {
paramController := definitionInfo.paramController
if paramController == nil {
addConfigError(fmt.Errorf("paramKind kind `%v` not known",
paramKind.String()), definition, binding)
continue
@ -304,19 +258,19 @@ func (c *celAdmissionController) Validate(
// If the param informer for this admission policy has not yet
// had time to perform an initial listing, don't attempt to use
// it.
//!TODO(alexzielenski): Add a shorter timeout
// than "forever" to this wait.
timeoutCtx, cancel := context.WithTimeout(c.policyController.context, 1*time.Second)
defer cancel()
if !cache.WaitForCacheSync(c.runningContext.Done(), paramInfo.controller.HasSynced) {
if !cache.WaitForCacheSync(timeoutCtx.Done(), paramController.HasSynced) {
addConfigError(fmt.Errorf("paramKind kind `%v` not yet synced to use for admission",
paramKind.String()), definition, binding)
continue
}
if len(paramRef.Namespace) == 0 {
param, err = paramInfo.controller.Informer().Get(paramRef.Name)
param, err = paramController.Informer().Get(paramRef.Name)
} else {
param, err = paramInfo.controller.Informer().Namespaced(paramRef.Namespace).Get(paramRef.Name)
param, err = paramController.Informer().Namespaced(paramRef.Namespace).Get(paramRef.Name)
}
if err != nil {
@ -338,17 +292,7 @@ func (c *celAdmissionController) Validate(
continue
}
}
validator := bindingInfo.validator.Load()
if validator == nil {
// Compile policy definition using binding
newValidator := c.validatorCompiler.Compile(definition)
validator = &newValidator
bindingInfo.validator.Store(validator)
}
decisions, err := (*validator).Validate(a, o, param, matchKind)
decisions, err := bindingInfo.validator.Validate(a, o, param, matchKind)
if err != nil {
// runtime error. Apply failure policy
wrappedError := fmt.Errorf("failed to evaluate CEL expression: %w", err)
@ -400,10 +344,13 @@ func (c *celAdmissionController) Validate(
}
func (c *celAdmissionController) HasSynced() bool {
return c.policyBindingController.HasSynced() &&
c.policyDefinitionsController.HasSynced()
return c.policyController.HasSynced() && c.definitions.Load() != nil
}
func (c *celAdmissionController) ValidateInitialization() error {
return c.validatorCompiler.ValidateInitialization()
return c.policyController.ValidateInitialization()
}
func (c *celAdmissionController) refreshPolicies() {
c.definitions.Store(c.policyController.latestPolicyData())
}

View File

@ -19,23 +19,133 @@ package validatingadmissionpolicy
import (
"context"
"fmt"
"sync"
"time"
"k8s.io/api/admissionregistration/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
celmetrics "k8s.io/apiserver/pkg/admission/cel"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
)
func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name string, definition *v1alpha1.ValidatingAdmissionPolicy) error {
type policyController struct {
once sync.Once
context context.Context
dynamicClient dynamic.Interface
restMapper meta.RESTMapper
policyDefinitionsController generic.Controller[*v1alpha1.ValidatingAdmissionPolicy]
policyBindingController generic.Controller[*v1alpha1.ValidatingAdmissionPolicyBinding]
// Provided to the policy's Compile function as an injected dependency to
// assist with compiling its expressions to CEL
ValidatorCompiler
// Lock which protects:
// - cachedPolicies
// - paramCRDControllers
// - definitionInfo
// - bindingInfos
// - definitionsToBindings
// All other fields should be assumed constant
mutex sync.RWMutex
cachedPolicies []policyData
// controller and metadata
paramsCRDControllers map[v1alpha1.ParamKind]*paramInfo
// Index for each definition namespace/name, contains all binding
// namespace/names known to exist for that definition
definitionInfo map[namespacedName]*definitionInfo
// Index for each bindings namespace/name. Contains compiled templates
// for the binding depending on the policy/param combination.
bindingInfos map[namespacedName]*bindingInfo
// Map from namespace/name of a definition to a set of namespace/name
// of bindings which depend on it.
// All keys must have at least one dependent binding
// All binding names MUST exist as a key bindingInfos
definitionsToBindings map[namespacedName]sets.Set[namespacedName]
}
func newPolicyController(
restMapper meta.RESTMapper,
dynamicClient dynamic.Interface,
validatorCompiler ValidatorCompiler,
policiesInformer generic.Informer[*v1alpha1.ValidatingAdmissionPolicy],
bindingsInformer generic.Informer[*v1alpha1.ValidatingAdmissionPolicyBinding],
) *policyController {
res := &policyController{}
*res = policyController{
ValidatorCompiler: validatorCompiler,
definitionInfo: make(map[namespacedName]*definitionInfo),
bindingInfos: make(map[namespacedName]*bindingInfo),
paramsCRDControllers: make(map[v1alpha1.ParamKind]*paramInfo),
definitionsToBindings: make(map[namespacedName]sets.Set[namespacedName]),
policyDefinitionsController: generic.NewController(
policiesInformer,
res.reconcilePolicyDefinition,
generic.ControllerOptions{
Workers: 1,
Name: "cel-policy-definitions",
},
),
policyBindingController: generic.NewController(
bindingsInformer,
res.reconcilePolicyBinding,
generic.ControllerOptions{
Workers: 1,
Name: "cel-policy-bindings",
},
),
restMapper: restMapper,
dynamicClient: dynamicClient,
}
return res
}
func (c *policyController) Run(ctx context.Context) {
// Only support being run once
c.once.Do(func() {
c.context = ctx
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
c.policyDefinitionsController.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
c.policyBindingController.Run(ctx)
}()
<-ctx.Done()
wg.Wait()
})
}
func (c *policyController) HasSynced() bool {
return c.policyDefinitionsController.HasSynced() && c.policyBindingController.HasSynced()
}
func (c *policyController) reconcilePolicyDefinition(namespace, name string, definition *v1alpha1.ValidatingAdmissionPolicy) error {
c.mutex.Lock()
defer c.mutex.Unlock()
c.cachedPolicies = nil // invalidate cachedPolicies
// Namespace for policydefinition is empty.
nn := getNamespaceName(namespace, name)
info, ok := c.definitionInfo[nn]
@ -75,7 +185,7 @@ func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name strin
// definition has changed.
for key := range c.definitionsToBindings[nn] {
bindingInfo := c.bindingInfos[key]
bindingInfo.validator.Store(nil)
bindingInfo.validator = nil
c.bindingInfos[key] = bindingInfo
}
@ -121,7 +231,7 @@ func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name strin
// Start watching the param CRD
if _, ok := c.paramsCRDControllers[*paramSource]; !ok {
instanceContext, instanceCancel := context.WithCancel(c.runningContext)
instanceContext, instanceCancel := context.WithCancel(c.context)
// Watch for new instances of this policy
informer := dynamicinformer.NewFilteredDynamicInformer(
@ -155,10 +265,12 @@ func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name strin
return nil
}
func (c *celAdmissionController) reconcilePolicyBinding(namespace, name string, binding *v1alpha1.ValidatingAdmissionPolicyBinding) error {
func (c *policyController) reconcilePolicyBinding(namespace, name string, binding *v1alpha1.ValidatingAdmissionPolicyBinding) error {
c.mutex.Lock()
defer c.mutex.Unlock()
c.cachedPolicies = nil // invalidate cachedPolicies
// Namespace for PolicyBinding is empty. In the future a namespaced binding
// may be added
// https://github.com/kubernetes/enhancements/blob/bf5c3c81ea2081d60c1dc7c832faa98479e06209/keps/sig-api-machinery/3488-cel-admission-control/README.md?plain=1#L1042
@ -208,12 +320,12 @@ func (c *celAdmissionController) reconcilePolicyBinding(namespace, name string,
}
// Remove compiled template for old binding
info.validator.Store(nil)
info.validator = nil
info.lastReconciledValue = binding
return nil
}
func (c *celAdmissionController) reconcileParams(namespace, name string, params *unstructured.Unstructured) error {
func (c *policyController) reconcileParams(namespace, name string, params *unstructured.Unstructured) error {
// Do nothing.
// When we add informational type checking we will need to compile in the
// reconcile loops instead of lazily so we can add compiler errors / type
@ -221,6 +333,52 @@ func (c *celAdmissionController) reconcileParams(namespace, name string, params
return nil
}
// Fetches the latest set of policy data or recalculates it if it has changed
// since it was last fetched
func (c *policyController) latestPolicyData() []policyData {
existing := func() []policyData {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.cachedPolicies
}()
if existing != nil {
return existing
}
c.mutex.Lock()
defer c.mutex.Unlock()
var res []policyData
for definitionNN, definitionInfo := range c.definitionInfo {
var bindingInfos []bindingInfo
for bindingNN := range c.definitionsToBindings[definitionNN] {
bindingInfo := c.bindingInfos[bindingNN]
if bindingInfo.validator == nil && definitionInfo.configurationError == nil {
bindingInfo.validator = c.ValidatorCompiler.Compile(definitionInfo.lastReconciledValue)
}
bindingInfos = append(bindingInfos, *bindingInfo)
}
var paramController generic.Controller[*unstructured.Unstructured]
if paramKind := definitionInfo.lastReconciledValue.Spec.ParamKind; paramKind != nil {
if info, ok := c.paramsCRDControllers[*paramKind]; ok {
paramController = info.controller
}
}
res = append(res, policyData{
definitionInfo: *definitionInfo,
paramController: paramController,
bindings: bindingInfos,
})
}
c.cachedPolicies = res
return res
}
func getNamespaceName(namespace, name string) namespacedName {
return namespacedName{
namespace: namespace,

View File

@ -150,13 +150,14 @@ func (c *controller[T]) Run(ctx context.Context) error {
enqueue(obj, false)
},
})
c.notificationsDelivered.Store(registration.HasSynced)
// Error might be raised if informer was started and stopped already
if err != nil {
return err
}
c.notificationsDelivered.Store(registration.HasSynced)
// Make sure event handler is removed from informer in case return early from
// an error
defer func() {