diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/OWNERS b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/OWNERS new file mode 100644 index 00000000000..6a637d28d58 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/OWNERS @@ -0,0 +1,10 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - jpbetz + - cici37 + - alexzielenski +reviewers: + - jpbetz + - cici37 + - alexzielenski diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/admission.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/admission.go new file mode 100644 index 00000000000..50f679c857e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/admission.go @@ -0,0 +1,103 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "context" + "errors" + "fmt" + "io" + "time" + + "k8s.io/apiserver/pkg/admission" + "k8s.io/client-go/tools/cache" +) + +//////////////////////////////////////////////////////////////////////////////// +// Plugin Definition +//////////////////////////////////////////////////////////////////////////////// + +// Definition for CEL admission plugin. This is the entry point into the +// CEL admission control system. +// +// Each plugin is asked to validate every object update. + +const ( + // PluginName indicates the name of admission plug-in + PluginName = "CEL" +) + +// Register registers a plugin +func Register(plugins *admission.Plugins) { + plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) { + return NewPlugin() + }) +} + +//////////////////////////////////////////////////////////////////////////////// +// Plugin Initialization & Dependency Injection +//////////////////////////////////////////////////////////////////////////////// + +type celAdmissionPlugin struct { + evaluator CELPolicyEvaluator +} + +var _ WantsCELPolicyEvaluator = &celAdmissionPlugin{} +var _ admission.ValidationInterface = &celAdmissionPlugin{} + +func NewPlugin() (*celAdmissionPlugin, error) { + result := &celAdmissionPlugin{} + return result, nil +} + +func (c *celAdmissionPlugin) SetCELPolicyEvaluator(evaluator CELPolicyEvaluator) { + c.evaluator = evaluator +} + +// Once clientset and informer factory are provided, creates and starts the +// admission controller +func (c *celAdmissionPlugin) ValidateInitialization() error { + if c.evaluator != nil { + return nil + } + + return errors.New("CELPolicyEvaluator not injected") +} + +//////////////////////////////////////////////////////////////////////////////// +// admission.ValidationInterface +//////////////////////////////////////////////////////////////////////////////// + +func (c *celAdmissionPlugin) Handles(operation admission.Operation) bool { + return true +} + +func (c *celAdmissionPlugin) Validate( + ctx context.Context, + a admission.Attributes, + o admission.ObjectInterfaces, +) (err error) { + + deadlined, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + if !cache.WaitForNamedCacheSync("cel-admission-plugin", deadlined.Done(), c.evaluator.HasSynced) { + return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) + } + + return c.evaluator.Validate(ctx, a, o) +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/admission_test.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/admission_test.go new file mode 100644 index 00000000000..f46661e0153 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/admission_test.go @@ -0,0 +1,1035 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic" + dynamicfake "k8s.io/client-go/dynamic/fake" + clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" +) + +var ( + scheme *runtime.Scheme = runtime.NewScheme() + codecs serializer.CodecFactory = serializer.NewCodecFactory(scheme) + paramsGVK schema.GroupVersionKind = schema.GroupVersionKind{ + Group: "example.com", + Version: "v1", + Kind: "ParamsConfig", + } + fakeRestMapper *meta.DefaultRESTMapper = meta.NewDefaultRESTMapper([]schema.GroupVersion{ + { + Group: "", + Version: "v1", + }, + }) + + definitionGVK schema.GroupVersionKind = (&FakePolicyDefinition{}).GroupVersionKind() + bindingGVK schema.GroupVersionKind = (&FakePolicyBinding{}).GroupVersionKind() + + definitionsGVR schema.GroupVersionResource = definitionGVK.GroupVersion().WithResource("policydefinitions") + bindingsGVR schema.GroupVersionResource = bindingGVK.GroupVersion().WithResource("policybindings") +) + +func init() { + fakeRestMapper.Add(definitionGVK, meta.RESTScopeRoot) + fakeRestMapper.Add(bindingGVK, meta.RESTScopeNamespace) + fakeRestMapper.Add(paramsGVK, meta.RESTScopeNamespace) + + scheme.AddKnownTypeWithName(definitionGVK, &FakePolicyDefinition{}) + scheme.AddKnownTypeWithName(bindingGVK, &FakePolicyBinding{}) + + scheme.AddKnownTypeWithName((&FakePolicyDefinitionList{}).GroupVersionKind(), &FakePolicyDefinitionList{}) + scheme.AddKnownTypeWithName((&FakePolicyBindingList{}).GroupVersionKind(), &FakePolicyBindingList{}) + + scheme.AddKnownTypeWithName(paramsGVK, &unstructured.Unstructured{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{ + Group: paramsGVK.Group, + Version: paramsGVK.Version, + Kind: paramsGVK.Kind + "List", + }, &unstructured.UnstructuredList{}) +} + +// Starts CEL admission controller and sets up a plugin configured with it as well +// as object trackers for manipulating the objects available to the system +// +// ParamTracker only knows the gvk `paramGVK`. If in the future we need to +// support multiple types of params this function needs to be augmented +// +// PolicyTracker expects FakePolicyDefinition and FakePolicyBinding types +func setupTest(t *testing.T) (plugin admission.ValidationInterface, paramTracker, policyTracker clienttesting.ObjectTracker, controller *celAdmissionController) { + testContext, testContextCancel := context.WithCancel(context.Background()) + t.Cleanup(testContextCancel) + + dynamicClient := dynamicfake.NewSimpleDynamicClient(scheme) + tracker := clienttesting.NewObjectTracker(scheme, codecs.UniversalDecoder()) + + // Set up fake informers that return instances of mock Policy definitoins + // and mock policy bindings + fakeDefinitionsInformer := cache.NewSharedIndexInformer(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return tracker.List(definitionsGVR, definitionGVK, "") + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return tracker.Watch(definitionsGVR, "") + }, + }, &FakePolicyDefinition{}, 30*time.Second, nil) + + fakeBindingsInformer := cache.NewSharedIndexInformer(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return tracker.List(bindingsGVR, bindingGVK, "") + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return tracker.Watch(bindingsGVR, "") + }, + }, &FakePolicyBinding{}, 30*time.Second, nil) + + go fakeDefinitionsInformer.Run(testContext.Done()) + go fakeBindingsInformer.Run(testContext.Done()) + + admissionController := NewAdmissionController( + fakeDefinitionsInformer, + fakeBindingsInformer, + nil, // objectConverter is unused by the `FakePolicyDefinition` compile func + fakeRestMapper, + dynamicClient, + ).(*celAdmissionController) + + handler, err := NewPlugin() + require.NoError(t, err) + + pluginInitializer := NewPluginInitializer(admissionController) + pluginInitializer.Initialize(handler) + err = admission.ValidateInitialization(handler) + require.NoError(t, err) + + go admissionController.Run(testContext.Done()) + return handler, dynamicClient.Tracker(), tracker, admissionController +} + +// Gets the last reconciled value in the controller of an object with the same +// gvk and name as the given object +func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.Object, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + + c.mutex.RLock() + defer c.mutex.RUnlock() + + switch obj.(type) { + case *unstructured.Unstructured: + paramSource := obj.GetObjectKind().GroupVersionKind() + var paramInformer generic.Informer[*unstructured.Unstructured] + if paramInfo, ok := c.paramsCRDControllers[paramSource]; ok { + paramInformer = paramInfo.controller.Informer() + } else { + return nil, fmt.Errorf("paramSource kind `%v` not known", paramSource.String()) + } + + // Param type. Just check informer for its GVK + item, err := paramInformer.Get(accessor.GetName()) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + + return item, nil + case PolicyBinding: + namespacedName := accessor.GetNamespace() + "/" + accessor.GetName() + info, ok := c.bindingInfos[namespacedName] + if !ok { + return nil, nil + } + + return info.lastReconciledValue, nil + case PolicyDefinition: + namespacedName := accessor.GetNamespace() + "/" + accessor.GetName() + info, ok := c.definitionInfo[namespacedName] + if !ok { + return nil, nil + } + + return info.lastReconciledValue, nil + default: + panic(fmt.Errorf("unhandled object type: %T", obj)) + } +} + +// Waits for the given objects to have been the latest reconciled values of +// their gvk/name in the controller +func waitForReconcile(ctx context.Context, controller *celAdmissionController, objects ...runtime.Object) error { + return wait.PollWithContext(ctx, 200*time.Millisecond, 3*time.Hour, func(ctx context.Context) (done bool, err error) { + for _, obj := range objects { + currentValue, err := controller.getCurrentObject(obj) + if err != nil { + return false, err + } + + objMeta, err := meta.Accessor(obj) + if err != nil { + return false, err + } + valueMeta, err := meta.Accessor(currentValue) + if err != nil { + return false, err + } + + if len(objMeta.GetResourceVersion()) == 0 { + return false, fmt.Errorf("%s named %s has no resource version. please ensure your test objects have an RV", + obj.GetObjectKind().GroupVersionKind().String(), objMeta.GetName()) + } else if len(valueMeta.GetResourceVersion()) == 0 { + return false, fmt.Errorf("%s named %s has no resource version. please ensure your test objects have an RV", + currentValue.GetObjectKind().GroupVersionKind().String(), valueMeta.GetName()) + } else if objMeta.GetResourceVersion() != valueMeta.GetResourceVersion() { + return false, nil + } + } + + return true, nil + }) +} + +// Waits for the admissoin controller to have no knowledge of the objects +// with the given GVKs and namespace/names +func waitForReconcileDeletion(ctx context.Context, controller *celAdmissionController, objects ...runtime.Object) error { + return wait.PollWithContext(ctx, 200*time.Millisecond, 3*time.Hour, func(ctx context.Context) (done bool, err error) { + for _, obj := range objects { + currentValue, err := controller.getCurrentObject(obj) + if err != nil { + return false, err + } + + if currentValue != nil { + return false, nil + } + } + + return true, nil + }) +} + +func attributeRecord( + old, new runtime.Object, + operation admission.Operation, +) admission.Attributes { + accessor, err := meta.Accessor(new) + if err != nil { + panic(err) + } + + gvk := new.GetObjectKind().GroupVersionKind() + mapping, err := fakeRestMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + panic(err) + } + + return admission.NewAttributesRecord( + new, + old, + gvk, + accessor.GetNamespace(), + accessor.GetName(), + mapping.Resource, + "", + operation, + nil, + false, + nil, + ) +} + +func ptrTo[T any](obj T) *T { + return &obj +} + +//////////////////////////////////////////////////////////////////////////////// +// Functionality Tests +//////////////////////////////////////////////////////////////////////////////// + +func TestBasicPolicyDefinitionFailure(t *testing.T) { + testContext, testContextCancel := context.WithCancel(context.Background()) + defer testContextCancel() + + datalock := sync.Mutex{} + passedParams := []*unstructured.Unstructured{} + numCompiles := 0 + + fakeParams := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": paramsGVK.GroupVersion().String(), + "kind": paramsGVK.Kind, + "metadata": map[string]interface{}{ + "name": "replicas-test.example.com", + "resourceVersion": "1", + }, + "maxReplicas": int64(3), + }, + } + + // Push some fake + denyPolicy := &FakePolicyDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denypolicy.example.com", + ResourceVersion: "1", + }, + CompileFunc: ptrTo(func(converter ObjectConverter) (EvaluatorFunc, error) { + datalock.Lock() + numCompiles += 1 + datalock.Unlock() + + return func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision { + datalock.Lock() + passedParams = append(passedParams, params) + datalock.Unlock() + + // Policy always denies + return []PolicyDecision{ + { + Kind: Deny, + Message: "Denied", + }, + } + }, nil + }), + ParamSource: ¶msGVK, + FailurePolicy: Fail, + } + + denyBinding := &FakePolicyBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denybinding.example.com", + Namespace: "", + ResourceVersion: "1", + }, + Params: "replicas-test.example.com", + Policy: "denypolicy.example.com", + } + + handler, paramTracker, tracker, controller := setupTest(t) + + require.NoError(t, paramTracker.Add(fakeParams)) + require.NoError(t, tracker.Add(denyPolicy)) + require.NoError(t, tracker.Add(denyBinding)) + + // Wait for controller to reconcile given objects + require.NoError(t, + waitForReconcile( + testContext, controller, + fakeParams, denyBinding, denyPolicy)) + + err := handler.Validate( + testContext, + // Object is irrelevant/unchecked for this test. Just test that + // the evaluator is executed, and returns a denial + attributeRecord(nil, denyBinding, admission.Create), + &admission.RuntimeObjectInterfaces{}, + ) + + require.ErrorContains(t, err, `{"kind":"Deny","message":"Denied"}`) + + require.Equal(t, []*unstructured.Unstructured{fakeParams}, passedParams) +} + +// Shows that if a definition does not match the input, it will not be used. +// But with a different input it will be used. +func TestDefinitionDoesntMatch(t *testing.T) { + handler, paramTracker, tracker, controller := setupTest(t) + testContext, testContextCancel := context.WithCancel(context.Background()) + defer testContextCancel() + + datalock := sync.Mutex{} + passedParams := []*unstructured.Unstructured{} + numCompiles := 0 + + denyPolicy := &FakePolicyDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denypolicy.example.com", + ResourceVersion: "1", + }, + MatchFunc: ptrTo(func(a admission.Attributes) bool { + // Match names with even-numbered length + obj := a.GetObject() + + accessor, err := meta.Accessor(obj) + if err != nil { + t.Fatal(err) + return false + } + + return len(accessor.GetName())%2 == 0 + }), + CompileFunc: ptrTo(func(converter ObjectConverter) (EvaluatorFunc, error) { + datalock.Lock() + numCompiles += 1 + datalock.Unlock() + + return func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision { + datalock.Lock() + passedParams = append(passedParams, params) + datalock.Unlock() + + // Policy always denies + return []PolicyDecision{ + { + Kind: Deny, + Message: "Denied", + }, + } + }, nil + }), + ParamSource: ¶msGVK, + FailurePolicy: Fail, + } + + fakeParams := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": paramsGVK.GroupVersion().String(), + "kind": paramsGVK.Kind, + "metadata": map[string]interface{}{ + "name": "replicas-test.example.com", + "resourceVersion": "1", + }, + "maxReplicas": int64(3), + }, + } + + denyBinding := &FakePolicyBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denybinding.example.com", + Namespace: "", + ResourceVersion: "1", + }, + Params: "replicas-test.example.com", + Policy: "denypolicy.example.com", + } + + require.NoError(t, paramTracker.Add(fakeParams)) + require.NoError(t, tracker.Add(denyPolicy)) + require.NoError(t, tracker.Add(denyBinding)) + + // Wait for controller to reconcile given objects + require.NoError(t, + waitForReconcile( + testContext, controller, + fakeParams, denyBinding, denyPolicy)) + + // Validate a non-matching input. + // Should pass validation with no error. + + nonMatchingParams := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": paramsGVK.GroupVersion().String(), + "kind": paramsGVK.Kind, + "metadata": map[string]interface{}{ + "name": "oddlength", + "resourceVersion": "1", + }, + }, + } + require.NoError(t, + handler.Validate(testContext, + attributeRecord( + nil, nonMatchingParams, + admission.Create), &admission.RuntimeObjectInterfaces{})) + require.Zero(t, numCompiles) + require.Empty(t, passedParams) + + // Validate a matching input. + // Should match and be denied. + matchingParams := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": paramsGVK.GroupVersion().String(), + "kind": paramsGVK.Kind, + "metadata": map[string]interface{}{ + "name": "evenlength", + "resourceVersion": "1", + }, + }, + } + require.ErrorContains(t, + handler.Validate(testContext, + attributeRecord( + nil, matchingParams, + admission.Create), &admission.RuntimeObjectInterfaces{}), + `{"kind":"Deny","message":"Denied"}`) + require.Equal(t, numCompiles, 1) + require.Equal(t, passedParams, []*unstructured.Unstructured{fakeParams}) +} + +func TestReconfigureBinding(t *testing.T) { + testContext, testContextCancel := context.WithCancel(context.Background()) + defer testContextCancel() + + datalock := sync.Mutex{} + passedParams := []*unstructured.Unstructured{} + numCompiles := 0 + + fakeParams := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": paramsGVK.GroupVersion().String(), + "kind": paramsGVK.Kind, + "metadata": map[string]interface{}{ + "name": "replicas-test.example.com", + "resourceVersion": "1", + }, + "maxReplicas": int64(3), + }, + } + + fakeParams2 := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": paramsGVK.GroupVersion().String(), + "kind": paramsGVK.Kind, + "metadata": map[string]interface{}{ + "name": "replicas-test2.example.com", + "resourceVersion": "2", + }, + "maxReplicas": int64(35), + }, + } + + denyPolicy := &FakePolicyDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denypolicy.example.com", + ResourceVersion: "1", + }, + CompileFunc: ptrTo(func(converter ObjectConverter) (EvaluatorFunc, error) { + datalock.Lock() + numCompiles += 1 + datalock.Unlock() + + return func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision { + datalock.Lock() + passedParams = append(passedParams, params) + datalock.Unlock() + + // Policy always denies + return []PolicyDecision{ + { + Kind: Deny, + Message: "Denied", + }, + } + }, nil + }), + ParamSource: ¶msGVK, + FailurePolicy: Fail, + } + + denyBinding := &FakePolicyBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denybinding.example.com", + ResourceVersion: "1", + }, + Params: "replicas-test.example.com", + Policy: "denypolicy.example.com", + } + + denyBinding2 := &FakePolicyBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denybinding.example.com", + ResourceVersion: "2", + }, + Params: "replicas-test2.example.com", + Policy: "denypolicy.example.com", + } + + handler, paramTracker, tracker, controller := setupTest(t) + + require.NoError(t, paramTracker.Add(fakeParams)) + require.NoError(t, tracker.Add(denyPolicy)) + require.NoError(t, tracker.Add(denyBinding)) + + // Wait for controller to reconcile given objects + require.NoError(t, + waitForReconcile( + testContext, controller, + fakeParams, denyBinding, denyPolicy)) + + ar := attributeRecord(nil, denyBinding, admission.Create) + + err := handler.Validate( + testContext, + attributeRecord(nil, denyBinding, admission.Create), + &admission.RuntimeObjectInterfaces{}, + ) + + // Expect validation to fail for first time due to binding unconditionally + // failing + require.ErrorContains(t, err, `{"kind":"Deny","message":"Denied"}`, "expect policy validation error") + + // Expect `Compile` only called once + require.Equal(t, 1, numCompiles, "expect `Compile` to be called only once") + + // Show Evaluator was called + require.Len(t, passedParams, 1, "expect evaluator is called due to proper configuration") + + // Update the tracker to point at different params + require.NoError(t, tracker.Update(bindingsGVR, denyBinding2, "")) + + // Wait for update to propagate + // Wait for controller to reconcile given objects + require.NoError(t, waitForReconcile(testContext, controller, denyBinding2)) + + err = handler.Validate( + testContext, + ar, + &admission.RuntimeObjectInterfaces{}, + ) + + require.ErrorContains(t, err, `{"decision":{"kind":"Deny","message":"configuration error: replicas-test2.example.com not found"}`) + require.Equal(t, 1, numCompiles, "expect compile is not called when there is configuration error") + require.Len(t, passedParams, 1, "expect evaluator was not called when there is configuration error") + + // Add the missing params + require.NoError(t, paramTracker.Add(fakeParams2)) + + // Wait for update to propagate + require.NoError(t, waitForReconcile(testContext, controller, fakeParams2)) + + // Expect validation to now fail again. + err = handler.Validate( + testContext, + ar, + &admission.RuntimeObjectInterfaces{}, + ) + + // Expect validation to fail the third time due to validation failure + require.ErrorContains(t, err, `{"kind":"Deny","message":"Denied"}`, "expected a true policy failure, not a configuration error") + require.Equal(t, []*unstructured.Unstructured{fakeParams, fakeParams2}, passedParams, "expected call to `Validate` to cause call to evaluator") + require.Equal(t, 2, numCompiles, "expect changing binding causes a recompile") +} + +// Shows that a policy which is in effect will stop being in effect when removed +func TestRemoveDefinition(t *testing.T) { + testContext, testContextCancel := context.WithCancel(context.Background()) + defer testContextCancel() + + datalock := sync.Mutex{} + passedParams := []*unstructured.Unstructured{} + numCompiles := 0 + + fakeParams := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": paramsGVK.GroupVersion().String(), + "kind": paramsGVK.Kind, + "metadata": map[string]interface{}{ + "name": "replicas-test.example.com", + "resourceVersion": "1", + }, + "maxReplicas": int64(3), + }, + } + + denyPolicy := &FakePolicyDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denypolicy.example.com", + ResourceVersion: "1", + }, + CompileFunc: ptrTo(func(converter ObjectConverter) (EvaluatorFunc, error) { + datalock.Lock() + numCompiles += 1 + datalock.Unlock() + + return func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision { + datalock.Lock() + passedParams = append(passedParams, params) + datalock.Unlock() + + // Policy always denies + return []PolicyDecision{ + { + Kind: Deny, + Message: "Denied", + }, + } + }, nil + }), + ParamSource: ¶msGVK, + FailurePolicy: Fail, + } + denyBinding := &FakePolicyBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denybinding.example.com", + Namespace: "", + ResourceVersion: "1", + }, + Params: "replicas-test.example.com", + Policy: "denypolicy.example.com", + } + + handler, paramTracker, tracker, controller := setupTest(t) + + require.NoError(t, paramTracker.Add(fakeParams)) + require.NoError(t, tracker.Add(denyPolicy)) + require.NoError(t, tracker.Add(denyBinding)) + + // Wait for controller to reconcile given objects + require.NoError(t, + waitForReconcile( + testContext, controller, + fakeParams, denyBinding, denyPolicy)) + + record := attributeRecord(nil, denyBinding, admission.Create) + require.ErrorContains(t, + handler.Validate( + testContext, + record, + &admission.RuntimeObjectInterfaces{}, + ), + `{"kind":"Deny","message":"Denied"}`) + + require.Equal(t, []*unstructured.Unstructured{fakeParams}, passedParams) + require.NoError(t, tracker.Delete(definitionsGVR, denyPolicy.Namespace, denyPolicy.Name)) + require.NoError(t, waitForReconcileDeletion(testContext, controller, denyPolicy)) + + require.NoError(t, handler.Validate( + testContext, + // Object is irrelevant/unchecked for this test. Just test that + // the evaluator is executed, and returns a denial + record, + &admission.RuntimeObjectInterfaces{}, + )) + +} + +// Shows that a binding which is in effect will stop being in effect when removed +func TestRemoveBinding(t *testing.T) { + testContext, testContextCancel := context.WithCancel(context.Background()) + defer testContextCancel() + + datalock := sync.Mutex{} + passedParams := []*unstructured.Unstructured{} + numCompiles := 0 + + fakeParams := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": paramsGVK.GroupVersion().String(), + "kind": paramsGVK.Kind, + "metadata": map[string]interface{}{ + "name": "replicas-test.example.com", + "resourceVersion": "1", + }, + "maxReplicas": int64(3), + }, + } + + // Push some fake + denyPolicy := &FakePolicyDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denypolicy.example.com", + ResourceVersion: "1", + }, + CompileFunc: ptrTo(func(converter ObjectConverter) (EvaluatorFunc, error) { + datalock.Lock() + numCompiles += 1 + datalock.Unlock() + + return func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision { + datalock.Lock() + passedParams = append(passedParams, params) + datalock.Unlock() + + // Policy always denies + return []PolicyDecision{ + { + Kind: Deny, + Message: "Denied", + }, + } + }, nil + }), + ParamSource: ¶msGVK, + FailurePolicy: Fail, + } + + denyBinding := &FakePolicyBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denybinding.example.com", + Namespace: "", + ResourceVersion: "1", + }, + Params: "replicas-test.example.com", + Policy: "denypolicy.example.com", + } + + handler, paramTracker, tracker, controller := setupTest(t) + + require.NoError(t, paramTracker.Add(fakeParams)) + require.NoError(t, tracker.Add(denyPolicy)) + require.NoError(t, tracker.Add(denyBinding)) + + // Wait for controller to reconcile given objects + require.NoError(t, + waitForReconcile( + testContext, controller, + fakeParams, denyBinding, denyPolicy)) + + record := attributeRecord(nil, denyBinding, admission.Create) + + require.ErrorContains(t, + handler.Validate( + testContext, + record, + &admission.RuntimeObjectInterfaces{}, + ), + `{"kind":"Deny","message":"Denied"}`) + + require.Equal(t, []*unstructured.Unstructured{fakeParams}, passedParams) + require.NoError(t, tracker.Delete(bindingsGVR, denyBinding.Namespace, denyBinding.Name)) + require.NoError(t, waitForReconcileDeletion(testContext, controller, denyBinding)) + + require.ErrorContains(t, handler.Validate( + testContext, + // Object is irrelevant/unchecked for this test. Just test that + // the evaluator is executed, and returns a denial + record, + &admission.RuntimeObjectInterfaces{}, + ), `{"decision":{"kind":"Deny","message":"configuration error: no bindings found"}`) +} + +// Shows that an error is surfaced if a paramSource specified in a binding does +// not actually exist +func TestInvalidParamSourceGVK(t *testing.T) { + testContext, testContextCancel := context.WithCancel(context.Background()) + defer testContextCancel() + handler, _, tracker, controller := setupTest(t) + passedParams := make(chan *unstructured.Unstructured) + + denyPolicy := &FakePolicyDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denypolicy.example.com", + Namespace: "", + ResourceVersion: "1", + }, + CompileFunc: ptrTo(func(converter ObjectConverter) (EvaluatorFunc, error) { + return func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision { + // Policy always denies + return []PolicyDecision{ + { + Kind: Deny, + Message: "Denied", + }, + } + }, nil + }), + ParamSource: ptrTo(paramsGVK.GroupVersion().WithKind("BadParamKind")), + FailurePolicy: Fail, + } + + denyBinding := &FakePolicyBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denybinding.example.com", + Namespace: "", + ResourceVersion: "1", + }, + Params: "replicas-test.example.com", + Policy: "denypolicy.example.com", + } + + require.NoError(t, tracker.Add(denyPolicy)) + require.NoError(t, tracker.Add(denyBinding)) + + // Wait for controller to reconcile given objects + require.NoError(t, + waitForReconcile( + testContext, controller, + denyBinding, denyPolicy)) + + err := handler.Validate( + testContext, + attributeRecord(nil, denyBinding, admission.Create), + &admission.RuntimeObjectInterfaces{}, + ) + + // expect the specific error to be that the param was not found, not that CRD + // is not existing + require.ErrorContains(t, err, + `{"decision":{"kind":"Deny","message":"configuration error: failed to find resource for param source: 'example.com/v1, Kind=BadParamKind'"}`) + + close(passedParams) + require.Len(t, passedParams, 0) +} + +// Shows that an error is surfaced if a param specified in a binding does not +// actually exist +func TestInvalidParamSourceInstanceName(t *testing.T) { + testContext, testContextCancel := context.WithCancel(context.Background()) + defer testContextCancel() + handler, _, tracker, controller := setupTest(t) + + datalock := sync.Mutex{} + passedParams := []*unstructured.Unstructured{} + numCompiles := 0 + + denyPolicy := &FakePolicyDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denypolicy.example.com", + ResourceVersion: "1", + }, + CompileFunc: ptrTo(func(converter ObjectConverter) (EvaluatorFunc, error) { + datalock.Lock() + numCompiles += 1 + datalock.Unlock() + + return func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision { + datalock.Lock() + passedParams = append(passedParams, params) + datalock.Unlock() + + // Policy always denies + return []PolicyDecision{ + { + Kind: Deny, + Message: "Denied", + }, + } + }, nil + }), + ParamSource: ¶msGVK, + FailurePolicy: Fail, + } + + denyBinding := &FakePolicyBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denybinding.example.com", + Namespace: "", + ResourceVersion: "1", + }, + Params: "replicas-test.example.com", + Policy: "denypolicy.example.com", + } + + require.NoError(t, tracker.Add(denyPolicy)) + require.NoError(t, tracker.Add(denyBinding)) + + // Wait for controller to reconcile given objects + require.NoError(t, + waitForReconcile( + testContext, controller, + denyBinding, denyPolicy)) + + err := handler.Validate( + testContext, + attributeRecord(nil, denyBinding, admission.Create), + &admission.RuntimeObjectInterfaces{}, + ) + + // expect the specific error to be that the param was not found, not that CRD + // is not existing + require.ErrorContains(t, err, + `{"decision":{"kind":"Deny","message":"configuration error: replicas-test.example.com not found"}`) + require.Len(t, passedParams, 0) +} + +// Shows that a definition with no param source works just fine, and has +// nil params passed to its evaluator. +// +// Also shows that if binding has specified params in this instance then they +// are silently ignored. +func TestEmptyParamSource(t *testing.T) { + testContext, testContextCancel := context.WithCancel(context.Background()) + defer testContextCancel() + + datalock := sync.Mutex{} + passedParams := []*unstructured.Unstructured{} + numCompiles := 0 + + // Push some fake + denyPolicy := &FakePolicyDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denypolicy.example.com", + ResourceVersion: "1", + }, + CompileFunc: ptrTo(func(converter ObjectConverter) (EvaluatorFunc, error) { + datalock.Lock() + numCompiles += 1 + datalock.Unlock() + + return func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision { + datalock.Lock() + passedParams = append(passedParams, params) + datalock.Unlock() + + // Policy always denies + return []PolicyDecision{ + { + Kind: Deny, + Message: "Denied", + }, + } + }, nil + }), + ParamSource: nil, + FailurePolicy: Fail, + } + + denyBinding := &FakePolicyBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "denybinding.example.com", + Namespace: "", + ResourceVersion: "1", + }, + Params: "replicas-test.example.com", + Policy: "denypolicy.example.com", + } + + handler, _, tracker, controller := setupTest(t) + + require.NoError(t, tracker.Add(denyPolicy)) + require.NoError(t, tracker.Add(denyBinding)) + + // Wait for controller to reconcile given objects + require.NoError(t, + waitForReconcile( + testContext, controller, + denyBinding, denyPolicy)) + + err := handler.Validate( + testContext, + // Object is irrelevant/unchecked for this test. Just test that + // the evaluator is executed, and returns a denial + attributeRecord(nil, denyBinding, admission.Create), + &admission.RuntimeObjectInterfaces{}, + ) + + require.ErrorContains(t, err, `{"kind":"Deny","message":"Denied"}`) + require.Equal(t, 1, numCompiles) + require.Equal(t, []*unstructured.Unstructured{nil}, passedParams) +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller.go new file mode 100644 index 00000000000..150940747e1 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller.go @@ -0,0 +1,320 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "context" + "errors" + "fmt" + "sync" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +// celAdmissionController is the top-level controller for admission control using CEL +// it is responsible for watching policy definitions, bindings, and config param CRDs +type celAdmissionController struct { + // Context under which the controller runs + runningContext context.Context + + policyDefinitionsController generic.Controller[PolicyDefinition] + policyBindingController generic.Controller[PolicyBinding] + + // dynamicclient used to create informers to watch the param crd types + dynamicClient dynamic.Interface + restMapper meta.RESTMapper + + // Provided to the policy's Compile function as an injected dependency to + // assist with compiling its expressions to CEL + objectConverter ObjectConverter + + // Lock which protects: + // - definitionInfo + // - bindingInfos + // - paramCRDControllers + // - definitionsToBindings + // All other fields should be assumed constant + mutex sync.RWMutex + + // controller and metadata + paramsCRDControllers map[schema.GroupVersionKind]*paramInfo + + // Index for each definition namespace/name, contains all binding + // namespace/names known to exist for that definition + definitionInfo map[string]*definitionInfo + + // Index for each bindings namespace/name. Contains compiled templates + // for the binding depending on the policy/param combination. + bindingInfos map[string]*bindingInfo + + // Map from namespace/name of a definition to a set of namespace/name + // of bindings which depend on it. + // All keys must have at least one dependent binding + // All binding names MUST exist as a key bindingInfos + definitionsToBindings map[string]sets.String +} + +type definitionInfo struct { + // Error about the state of the definition's configuration and the cluster + // preventing its enforcement or compilation. + // Reset every reconciliation + configurationError error + + // Last value seen by this controller to be used in policy enforcement + // May not be nil + lastReconciledValue PolicyDefinition +} + +type bindingInfo struct { + // Compiled CEL expression turned into an evaluator + evaluator EvaluatorFunc + + // Last value seen by this controller to be used in policy enforcement + // May not be nil + lastReconciledValue PolicyBinding +} + +type paramInfo struct { + // Controller which is watching this param CRD + controller generic.Controller[*unstructured.Unstructured] + + // Function to call to stop the informer and clean up the controller + stop func() + + // Policy Definitions which refer to this param CRD + dependentDefinitions sets.String +} + +func NewAdmissionController( + // Informers + policyDefinitionsInformer cache.SharedIndexInformer, + policyBindingInformer cache.SharedIndexInformer, + + // Injected Dependencies + objectConverter ObjectConverter, + restMapper meta.RESTMapper, + dynamicClient dynamic.Interface, +) CELPolicyEvaluator { + c := &celAdmissionController{ + definitionInfo: make(map[string]*definitionInfo), + bindingInfos: make(map[string]*bindingInfo), + paramsCRDControllers: make(map[schema.GroupVersionKind]*paramInfo), + definitionsToBindings: make(map[string]sets.String), + dynamicClient: dynamicClient, + objectConverter: objectConverter, + restMapper: restMapper, + } + + c.policyDefinitionsController = generic.NewController( + generic.NewInformer[PolicyDefinition](policyDefinitionsInformer), + c.reconcilePolicyDefinition, + generic.ControllerOptions{ + Workers: 1, + Name: "cel-policy-definitions", + }, + ) + c.policyBindingController = generic.NewController( + generic.NewInformer[PolicyBinding](policyBindingInformer), + c.reconcilePolicyBinding, + generic.ControllerOptions{ + Workers: 1, + Name: "cel-policy-bindings", + }, + ) + return c +} + +func (c *celAdmissionController) Run(stopCh <-chan struct{}) { + if c.runningContext != nil { + return + } + + ctx, cancel := context.WithCancel(context.Background()) + + c.runningContext = ctx + defer func() { + c.runningContext = nil + }() + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + c.policyDefinitionsController.Run(ctx) + }() + + wg.Add(1) + go func() { + defer wg.Done() + c.policyBindingController.Run(ctx) + }() + + <-stopCh + cancel() + wg.Wait() +} + +func (c *celAdmissionController) Validate( + ctx context.Context, + a admission.Attributes, + o admission.ObjectInterfaces, +) (err error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + + var allDecisions []PolicyDecisionWithMetadata = nil + + addConfigError := func(err error, definition PolicyDefinition, binding PolicyBinding) { + wrappedError := fmt.Errorf("configuration error: %w", err) + switch p := definition.GetFailurePolicy(); p { + case Ignore: + klog.Info(wrappedError) + return + case Fail: + allDecisions = append(allDecisions, PolicyDecisionWithMetadata{ + PolicyDecision: PolicyDecision{ + Kind: Deny, + Message: wrappedError.Error(), + }, + Definition: definition, + Binding: binding, + }) + default: + utilruntime.HandleError(fmt.Errorf("unrecognized failure policy: '%v'", p)) + } + } + for definitionNamespacedName, definitionInfo := range c.definitionInfo { + definition := definitionInfo.lastReconciledValue + if !definition.Matches(a) { + // Policy definition does not match request + continue + } else if definitionInfo.configurationError != nil { + // Configuration error. + addConfigError(definitionInfo.configurationError, definition, nil) + continue + } + + dependentBindings := c.definitionsToBindings[definitionNamespacedName] + if len(dependentBindings) == 0 { + // Definition has no known bindings yet. + addConfigError(errors.New("no bindings found"), definition, nil) + continue + } + + for namespacedBindingName := range dependentBindings { + // If the key is inside dependentBindings, there is guaranteed to + // be a bindingInfo for it + bindingInfo := c.bindingInfos[namespacedBindingName] + binding := bindingInfo.lastReconciledValue + if !binding.Matches(a) { + continue + } + + var param *unstructured.Unstructured + + // If definition has no paramsource, always provide nil params to + // evaluator. If binding specifies a params to use they are ignored. + // Done this way so you can configure params before definition is ready. + if paramSource := definition.GetParamSource(); paramSource != nil { + paramsNamespace, paramsName := binding.GetTargetParams() + + // Find the params referred by the binding by looking its name up + // in our informer for its CRD + paramInfo, ok := c.paramsCRDControllers[*paramSource] + if !ok { + addConfigError(fmt.Errorf("paramSource kind `%v` not known", + paramSource.String()), definition, binding) + continue + } + + if len(paramsNamespace) == 0 { + param, err = paramInfo.controller.Informer().Get(paramsName) + } else { + param, err = paramInfo.controller.Informer().Namespaced(paramsNamespace).Get(paramsName) + } + + if err != nil { + // Apply failure policy + addConfigError(err, definition, binding) + + if k8serrors.IsNotFound(err) { + // Param doesnt exist yet? + // Maybe just have to wait a bit. + continue + } + + // There was a bad internal error + utilruntime.HandleError(err) + continue + } + } + + if bindingInfo.evaluator == nil { + // Compile policy definition using binding + bindingInfo.evaluator, err = definition.Compile(c.objectConverter, c.restMapper) + if err != nil { + // compilation error. Apply failure policy + wrappedError := fmt.Errorf("failed to compile CEL expression: %w", err) + addConfigError(wrappedError, definition, binding) + continue + } + c.bindingInfos[namespacedBindingName] = bindingInfo + } + + decisions := bindingInfo.evaluator(a, param) + for _, decision := range decisions { + switch decision.Kind { + case Admit: + // Do nothing + case Deny: + allDecisions = append(allDecisions, PolicyDecisionWithMetadata{ + Definition: definition, + Binding: binding, + PolicyDecision: decision, + }) + default: + // unrecognized decision. ignore + } + } + } + } + + if len(allDecisions) > 0 { + return k8serrors.NewConflict( + a.GetResource().GroupResource(), a.GetName(), + &PolicyError{ + Decisions: allDecisions, + }) + } + + return nil +} +func (c *celAdmissionController) HasSynced() bool { + return c.policyBindingController.HasSynced() && + c.policyDefinitionsController.HasSynced() +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller_reconcile.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller_reconcile.go new file mode 100644 index 00000000000..d0aa7d1ae78 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/controller_reconcile.go @@ -0,0 +1,203 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" +) + +func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name string, definition PolicyDefinition) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + // Namespace for policydefinition is empty. Leaving usage here for compatibility + // with future NamespacedPolicyDefinition + namespacedName := namespace + "/" + name + info, ok := c.definitionInfo[namespacedName] + if !ok { + info = &definitionInfo{} + c.definitionInfo[namespacedName] = info + } + + var paramSource *schema.GroupVersionKind + if definition != nil { + paramSource = definition.GetParamSource() + } + + // If param source has changed, remove definition as dependent of old params + // If there are no more dependents of old param, stop and clean up controller + if info.lastReconciledValue != nil && info.lastReconciledValue.GetParamSource() != nil { + oldParamSource := *info.lastReconciledValue.GetParamSource() + + // If we are: + // - switching from having a param to not having a param (includes deletion) + // - or from having a param to a different one + // we remove dependency on the controller. + if paramSource == nil || *paramSource != oldParamSource { + if oldParamInfo, ok := c.paramsCRDControllers[oldParamSource]; ok { + oldParamInfo.dependentDefinitions.Delete(namespacedName) + if len(oldParamInfo.dependentDefinitions) == 0 { + oldParamInfo.stop() + delete(c.paramsCRDControllers, oldParamSource) + } + } + } + } + + // Reset all previously compiled evaluators in case something relevant in + // definition has changed. + for key := range c.definitionsToBindings[namespacedName] { + bindingInfo := c.bindingInfos[key] + bindingInfo.evaluator = nil + c.bindingInfos[key] = bindingInfo + } + + if definition == nil { + delete(c.definitionInfo, namespacedName) + return nil + } + + // Update definition info + info.lastReconciledValue = definition + info.configurationError = nil + + if paramSource == nil { + // Skip setting up controller for empty param type + return nil + } + + // find GVR for params + paramsGVR, err := c.restMapper.RESTMapping(paramSource.GroupKind(), paramSource.Version) + if err != nil { + // Failed to resolve. Return error so we retry again (rate limited) + // Save a record of this definition with an evaluator that unconditionally + // + info.configurationError = fmt.Errorf("failed to find resource for param source: '%v'", paramSource.String()) + return info.configurationError + } + + // Start watching the param CRD + if _, ok := c.paramsCRDControllers[*paramSource]; !ok { + instanceContext, instanceCancel := context.WithCancel(c.runningContext) + + // Watch for new instances of this policy + informer := dynamicinformer.NewFilteredDynamicInformer( + c.dynamicClient, + paramsGVR.Resource, + corev1.NamespaceAll, + 30*time.Second, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + nil, + ) + + controller := generic.NewController( + generic.NewInformer[*unstructured.Unstructured](informer.Informer()), + c.reconcileParams, + generic.ControllerOptions{ + Workers: 1, + Name: paramSource.String() + "-controller", + }, + ) + + c.paramsCRDControllers[*paramSource] = ¶mInfo{ + controller: controller, + stop: instanceCancel, + dependentDefinitions: sets.NewString(namespacedName), + } + + go informer.Informer().Run(instanceContext.Done()) + go controller.Run(instanceContext) + } + + return nil +} + +func (c *celAdmissionController) reconcilePolicyBinding(namespace, name string, binding PolicyBinding) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + // Namespace for PolicyBinding is empty. In the future a namespaced binding + // may be added + // https://github.com/kubernetes/enhancements/blob/bf5c3c81ea2081d60c1dc7c832faa98479e06209/keps/sig-api-machinery/3488-cel-admission-control/README.md?plain=1#L1042 + namespacedName := namespace + "/" + name + info, ok := c.bindingInfos[namespacedName] + if !ok { + info = &bindingInfo{} + c.bindingInfos[namespacedName] = info + } + + oldNamespacedDefinitionName := "" + if info.lastReconciledValue != nil { + oldefinitionNamespace, oldefinitionName := info.lastReconciledValue.GetTargetDefinition() + oldNamespacedDefinitionName = oldefinitionNamespace + "/" + oldefinitionName + } + + namespacedDefinitionName := "" + if binding != nil { + newDefinitionNamespace, newDefinitionName := binding.GetTargetDefinition() + namespacedDefinitionName = newDefinitionNamespace + "/" + newDefinitionName + } + + // Remove record of binding from old definition if the referred policy + // has changed + if oldNamespacedDefinitionName != namespacedDefinitionName { + if dependentBindings, ok := c.definitionsToBindings[oldNamespacedDefinitionName]; ok { + dependentBindings.Delete(namespacedName) + + // if there are no more dependent bindings, remove knowledge of the + // definition altogether + if len(dependentBindings) == 0 { + delete(c.definitionsToBindings, oldNamespacedDefinitionName) + } + } + } + + if binding == nil { + delete(c.bindingInfos, namespacedName) + return nil + } + + // Add record of binding to new definition + if dependentBindings, ok := c.definitionsToBindings[namespacedDefinitionName]; ok { + dependentBindings.Insert(namespacedName) + } else { + c.definitionsToBindings[namespacedDefinitionName] = sets.NewString(namespacedName) + } + + // Remove compiled template for old binding + info.evaluator = nil + info.lastReconciledValue = binding + return nil +} + +func (c *celAdmissionController) reconcileParams(namespace, name string, params *unstructured.Unstructured) error { + // Do nothing. + // When we add informational type checking we will need to compile in the + // reconcile loops instead of lazily so we can add compiler errors / type + // checker errors to the status of the resources. + return nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/fake.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/fake.go new file mode 100644 index 00000000000..9163f0e9e8a --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/fake.go @@ -0,0 +1,258 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/admission" +) + +//////////////////////////////////////////////////////////////////////////////// +// Fake Policy Definitions +//////////////////////////////////////////////////////////////////////////////// + +type FakePolicyDefinition struct { + metav1.TypeMeta + metav1.ObjectMeta + + // Function called when `Matches` is called + // If nil, a default function that always returns true is used + // Specified as a function pointer so that this type is still comparable + MatchFunc *func(admission.Attributes) bool `json:"-"` + + // Func invoked for implementation of `Compile` + // Specified as a function pointer so that this type is still comparable + CompileFunc *func(converter ObjectConverter) (EvaluatorFunc, error) `json:"-"` + + // GVK to return when ParamSource() is called + ParamSource *schema.GroupVersionKind `json:"paramSource"` + + FailurePolicy FailurePolicy `json:"failurePolicy"` +} + +var _ PolicyDefinition = &FakePolicyDefinition{} + +func (f *FakePolicyDefinition) SetGroupVersionKind(kind schema.GroupVersionKind) { + f.TypeMeta.APIVersion = kind.GroupVersion().String() + f.TypeMeta.Kind = kind.Kind +} + +func (f *FakePolicyDefinition) GroupVersionKind() schema.GroupVersionKind { + parsedGV, err := schema.ParseGroupVersion(f.TypeMeta.APIVersion) + if err != nil || f.TypeMeta.Kind == "" || parsedGV.Empty() { + return schema.GroupVersionKind{ + Group: "admission.k8s.io", + Version: "v1alpha1", + Kind: "PolicyDefinition", + } + } + return schema.GroupVersionKind{ + Group: parsedGV.Group, + Version: parsedGV.Version, + Kind: f.TypeMeta.Kind, + } +} + +func (f *FakePolicyDefinition) GetObjectKind() schema.ObjectKind { + return f +} + +func (f *FakePolicyDefinition) DeepCopyObject() runtime.Object { + copied := *f + f.ObjectMeta.DeepCopyInto(&copied.ObjectMeta) + return &copied +} + +func (f *FakePolicyDefinition) GetName() string { + return f.ObjectMeta.Name +} + +func (f *FakePolicyDefinition) GetNamespace() string { + return f.ObjectMeta.Namespace +} + +func (f *FakePolicyDefinition) Matches(a admission.Attributes) bool { + if f.MatchFunc == nil || *f.MatchFunc == nil { + return true + } + return (*f.MatchFunc)(a) +} + +func (f *FakePolicyDefinition) Compile( + converter ObjectConverter, + mapper meta.RESTMapper, +) (EvaluatorFunc, error) { + if f.CompileFunc == nil || *f.CompileFunc == nil { + panic("must provide a CompileFunc to policy definition") + } + return (*f.CompileFunc)(converter) +} + +func (f *FakePolicyDefinition) GetParamSource() *schema.GroupVersionKind { + return f.ParamSource +} + +func (f *FakePolicyDefinition) GetFailurePolicy() FailurePolicy { + return f.FailurePolicy +} + +//////////////////////////////////////////////////////////////////////////////// +// Fake Policy Binding +//////////////////////////////////////////////////////////////////////////////// + +type FakePolicyBinding struct { + metav1.TypeMeta + metav1.ObjectMeta + + // Specified as a function pointer so that this type is still comparable + MatchFunc *func(admission.Attributes) bool `json:"-"` + Params string `json:"params"` + Policy string `json:"policy"` +} + +var _ PolicyBinding = &FakePolicyBinding{} + +func (f *FakePolicyBinding) SetGroupVersionKind(kind schema.GroupVersionKind) { + f.TypeMeta.APIVersion = kind.GroupVersion().String() + f.TypeMeta.Kind = kind.Kind +} + +func (f *FakePolicyBinding) GroupVersionKind() schema.GroupVersionKind { + parsedGV, err := schema.ParseGroupVersion(f.TypeMeta.APIVersion) + if err != nil || f.TypeMeta.Kind == "" || parsedGV.Empty() { + return schema.GroupVersionKind{ + Group: "admission.k8s.io", + Version: "v1alpha1", + Kind: "PolicyBinding", + } + } + return schema.GroupVersionKind{ + Group: parsedGV.Group, + Version: parsedGV.Version, + Kind: f.TypeMeta.Kind, + } +} + +func (f *FakePolicyBinding) GetObjectKind() schema.ObjectKind { + return f +} + +func (f *FakePolicyBinding) DeepCopyObject() runtime.Object { + copied := *f + f.ObjectMeta.DeepCopyInto(&copied.ObjectMeta) + return &copied +} + +func (f *FakePolicyBinding) Matches(a admission.Attributes) bool { + if f.MatchFunc == nil || *f.MatchFunc == nil { + return true + } + return (*f.MatchFunc)(a) +} + +func (f *FakePolicyBinding) GetTargetDefinition() (namespace, name string) { + return f.Namespace, f.Policy +} + +func (f *FakePolicyBinding) GetTargetParams() (namespace, name string) { + return f.Namespace, f.Params +} + +/// List Types + +type FakePolicyDefinitionList struct { + metav1.TypeMeta + metav1.ListMeta + + Items []FakePolicyDefinition +} + +func (f *FakePolicyDefinitionList) SetGroupVersionKind(kind schema.GroupVersionKind) { + f.TypeMeta.APIVersion = kind.GroupVersion().String() + f.TypeMeta.Kind = kind.Kind +} + +func (f *FakePolicyDefinitionList) GroupVersionKind() schema.GroupVersionKind { + parsedGV, err := schema.ParseGroupVersion(f.TypeMeta.APIVersion) + if err != nil || f.TypeMeta.Kind == "" || parsedGV.Empty() { + return schema.GroupVersionKind{ + Group: "admission.k8s.io", + Version: "v1alpha1", + Kind: "PolicyDefinitionList", + } + } + return schema.GroupVersionKind{ + Group: parsedGV.Group, + Version: parsedGV.Version, + Kind: f.TypeMeta.Kind, + } +} + +func (f *FakePolicyDefinitionList) GetObjectKind() schema.ObjectKind { + return f +} + +func (f *FakePolicyDefinitionList) DeepCopyObject() runtime.Object { + copied := *f + f.ListMeta.DeepCopyInto(&copied.ListMeta) + copied.Items = make([]FakePolicyDefinition, len(f.Items)) + copy(copied.Items, f.Items) + return &copied +} + +type FakePolicyBindingList struct { + metav1.TypeMeta + metav1.ListMeta + + Items []FakePolicyBinding +} + +func (f *FakePolicyBindingList) SetGroupVersionKind(kind schema.GroupVersionKind) { + f.TypeMeta.APIVersion = kind.GroupVersion().String() + f.TypeMeta.Kind = kind.Kind +} + +func (f *FakePolicyBindingList) GroupVersionKind() schema.GroupVersionKind { + parsedGV, err := schema.ParseGroupVersion(f.TypeMeta.APIVersion) + if err != nil || f.TypeMeta.Kind == "" || parsedGV.Empty() { + return schema.GroupVersionKind{ + Group: "admission.k8s.io", + Version: "v1alpha1", + Kind: "PolicyBindingList", + } + } + return schema.GroupVersionKind{ + Group: parsedGV.Group, + Version: parsedGV.Version, + Kind: f.TypeMeta.Kind, + } +} + +func (f *FakePolicyBindingList) GetObjectKind() schema.ObjectKind { + return f +} + +func (f *FakePolicyBindingList) DeepCopyObject() runtime.Object { + copied := *f + f.ListMeta.DeepCopyInto(&copied.ListMeta) + copied.Items = make([]FakePolicyBinding, len(f.Items)) + copy(copied.Items, f.Items) + return &copied +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/initializer.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/initializer.go new file mode 100644 index 00000000000..18fa3e119d0 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/initializer.go @@ -0,0 +1,55 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "context" + + "k8s.io/apiserver/pkg/admission" +) + +type CELPolicyEvaluator interface { + Validate(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error + HasSynced() bool +} + +// NewPluginInitializer creates a plugin initializer which dependency injects a +// singleton cel admission controller into the plugins which desire it +func NewPluginInitializer(validator CELPolicyEvaluator) *PluginInitializer { + return &PluginInitializer{validator: validator} +} + +// WantsCELPolicyEvaluator gives the ability to have the shared +// CEL Admission Controller dependency injected at initialization-time. +type WantsCELPolicyEvaluator interface { + SetCELPolicyEvaluator(CELPolicyEvaluator) +} + +// PluginInitializer is used for initialization of the webhook admission plugin. +type PluginInitializer struct { + validator CELPolicyEvaluator +} + +var _ admission.PluginInitializer = &PluginInitializer{} + +// Initialize checks the initialization interfaces implemented by each plugin +// and provide the appropriate initialization data +func (i *PluginInitializer) Initialize(plugin admission.Interface) { + if wants, ok := plugin.(WantsCELPolicyEvaluator); ok { + wants.SetCELPolicyEvaluator(i.validator) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/interface.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/interface.go new file mode 100644 index 00000000000..d23ef46b838 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/interface.go @@ -0,0 +1,108 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/cel" + + "github.com/google/cel-go/common/types/ref" +) + +type FailurePolicy string + +const ( + Fail FailurePolicy = "Fail" + Ignore FailurePolicy = "Ignore" +) + +// EvaluatorFunc represents the AND of one or more compiled CEL expression's +// evaluators `params` may be nil if definition does not specify a paramsource +type EvaluatorFunc func(a admission.Attributes, params *unstructured.Unstructured) []PolicyDecision + +// ObjectConverter is Dependency Injected into the PolicyDefinition's `Compile` +// function to assist with converting types and values to/from CEL-typed values. +type ObjectConverter interface { + // DeclForResource looks up the openapi or JSONSchemaProps, structural schema, etc. + // and compiles it into something that can be used to turn objects into CEL + // values + DeclForResource(gvr schema.GroupVersionResource) (*cel.DeclType, error) + + // ValueForObject takes a Kubernetes Object and uses the CEL DeclType + // to transform it into a CEL value. + // Object may be a typed native object or an unstructured object + ValueForObject(value runtime.Object, decl *cel.DeclType) (ref.Val, error) +} + +// PolicyDefinition is an interface for internal policy binding type. +// Implemented by mock/testing types, and to be implemented by the public API +// types once they have completed API review. +// +// The interface closely mirrors the format and functionality of the +// PolicyDefinition proposed in the KEP. +type PolicyDefinition interface { + runtime.Object + + // Matches says whether this policy definition matches the provided admission + // resource request + Matches(a admission.Attributes) bool + + Compile( + // Definition is provided with a converter which may be used by the + // return evaluator function to convert objects into CEL-typed objects + objectConverter ObjectConverter, + // Injected RESTMapper to assist with compilation + mapper meta.RESTMapper, + ) (EvaluatorFunc, error) + + // GetParamSource returns the GVK for the CRD used as the source of + // parameters used in the evaluation of instances of this policy + // May return nil if there is no paramsource for this definition. + GetParamSource() *schema.GroupVersionKind + + // GetFailurePolicy returns how an object should be treated during an + // admission when there is a configuration error preventing CEL evaluation + GetFailurePolicy() FailurePolicy +} + +// PolicyBinding is an interface for internal policy binding type. Implemented +// by mock/testing types, and to be implemented by the public API types once +// they have completed API review. +// +// The interface closely mirrors the format and functionality of the +// PolicyBinding proposed in the KEP. +type PolicyBinding interface { + runtime.Object + + // Matches says whether this policy binding matches the provided admission + // resource request + Matches(a admission.Attributes) bool + + // GetTargetDefinition returns the Namespace/Name of Policy Definition used + // by this binding. + GetTargetDefinition() (namespace, name string) + + // GetTargetParams returns the Namespace/Name of instance of TargetDefinition's + // ParamSource to be provided to the CEL expressions of the definition during + // evaluation. + // If TargetDefinition has nil ParamSource, this is ignored. + GetTargetParams() (namespace, name string) +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller.go new file mode 100644 index 00000000000..10e42b5009c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller.go @@ -0,0 +1,264 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +var _ Controller[runtime.Object] = &controller[runtime.Object]{} + +type controller[T runtime.Object] struct { + informer Informer[T] + queue workqueue.RateLimitingInterface + + // Returns an error if there was a transient error during reconciliation + // and the object should be tried again later. + reconciler func(namespace, name string, newObj T) error + + options ControllerOptions +} + +type ControllerOptions struct { + Name string + Workers uint +} + +func (c controller[T]) Informer() Informer[T] { + return c.informer +} + +func NewController[T runtime.Object]( + informer Informer[T], + reconciler func(namepace, name string, newObj T) error, + options ControllerOptions, +) Controller[T] { + if options.Workers == 0 { + options.Workers = 2 + } + + if len(options.Name) == 0 { + options.Name = fmt.Sprintf("%T-controller", *new(T)) + } + + return &controller[T]{ + options: options, + informer: informer, + reconciler: reconciler, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), options.Name), + } +} + +// Runs the controller and returns an error explaining why running was stopped. +// Reconciliation ends as soon as the context completes. If there are events +// waiting to be processed at that itme, they will be dropped. +func (c *controller[T]) Run(ctx context.Context) error { + klog.Infof("starting %s", c.options.Name) + defer klog.Infof("stopping %s", c.options.Name) + + enqueue := func(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + c.queue.Add(key) + } + + registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + enqueue(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldMeta, err1 := meta.Accessor(oldObj) + newMeta, err2 := meta.Accessor(newObj) + + if err1 != nil || err2 != nil { + if err1 != nil { + utilruntime.HandleError(err1) + } + + if err2 != nil { + utilruntime.HandleError(err2) + } + return + } else if oldMeta.GetResourceVersion() == newMeta.GetResourceVersion() { + if len(oldMeta.GetResourceVersion()) == 0 { + klog.Warningf("%v throwing out update with empty RV. this is likely to happen if a test did not supply a resource version on an updated object", c.options.Name) + } + return + } + + enqueue(newObj) + }, + DeleteFunc: func(obj interface{}) { + // Enqueue + enqueue(obj) + }, + }) + + // Error might be raised if informer was started and stopped already + if err != nil { + return err + } + + // Make sure event handler is removed from informer in case return early from + // an error + defer func() { + // Remove event handler and Handle Error here. Error should only be raised + // for improper usage of event handler API. + if err := c.informer.RemoveEventHandler(registration); err != nil { + utilruntime.HandleError(err) + } + }() + + // Wait for initial cache list to complete before beginning to reconcile + // objects. + if !cache.WaitForNamedCacheSync(c.options.Name, ctx.Done(), c.informer.HasSynced) { + // ctx cancelled during cache sync. return early + err := ctx.Err() + if err == nil { + // if context wasnt cancelled then the sync failed for another reason + err = errors.New("cache sync failed") + } + return err + } + + waitGroup := sync.WaitGroup{} + + for i := uint(0); i < c.options.Workers; i++ { + waitGroup.Add(1) + go func() { + wait.Until(c.runWorker, time.Second, ctx.Done()) + waitGroup.Done() + }() + } + + klog.Infof("Started %v workers for %v", c.options.Workers, c.options.Name) + + // Wait for context cancel. + <-ctx.Done() + + // Forcefully shutdown workqueue. Drop any enqueued items. + c.queue.ShutDown() + + // Workqueue shutdown signals for workers to stop. Wait for all workers to + // clean up + waitGroup.Wait() + + // Only way for workers to ever stop is for caller to cancel the context + return ctx.Err() +} + +func (c *controller[T]) HasSynced() bool { + return c.informer.HasSynced() +} + +func (c *controller[T]) runWorker() { + for { + obj, shutdown := c.queue.Get() + if shutdown { + return + } + + // We wrap this block in a func so we can defer c.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.queue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // How did an incorrectly formatted key get in the workqueue? + // Done is sufficient. (Forget resets rate limiter for the key, + // but the key is invalid so there is no point in doing that) + return fmt.Errorf("expected string in workqueue but got %#v", obj) + } + + if err := c.reconcile(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.queue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it is allowed + // to be re-enqueued without a long rate limit + c.queue.Forget(obj) + klog.Infof("Successfully synced '%s'", key) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + } + } +} + +func (c *controller[T]) reconcile(key string) error { + var newObj T + var err error + var namespace string + var name string + var lister NamespacedLister[T] + + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err = cache.SplitMetaNamespaceKey(key) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + if len(namespace) > 0 { + lister = c.informer.Namespaced(namespace) + } else { + lister = c.informer + } + + newObj, err = lister.Get(name) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + + // Deleted object. Inform reconciler with empty + } + + return c.reconciler(namespace, name, newObj) +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller_test.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller_test.go new file mode 100644 index 00000000000..ad78dfbabfc --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller_test.go @@ -0,0 +1,516 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic_test + +import ( + "context" + "errors" + "fmt" + "reflect" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + + "k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic" + + clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" +) + +type testInformer struct { + cache.SharedIndexInformer + + lock sync.Mutex + registrations map[interface{}]struct{} +} + +func (t *testInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + res, err := t.SharedIndexInformer.AddEventHandler(handler) + if err != nil { + return res, err + } + + func() { + t.lock.Lock() + defer t.lock.Unlock() + if t.registrations == nil { + t.registrations = make(map[interface{}]struct{}) + } + t.registrations[res] = struct{}{} + }() + + return res, err +} + +func (t *testInformer) RemoveEventHandler(registration cache.ResourceEventHandlerRegistration) error { + func() { + t.lock.Lock() + defer t.lock.Unlock() + + if _, ok := t.registrations[registration]; !ok { + panic("removing unknown event handler?") + } + delete(t.registrations, registration) + }() + + return t.SharedIndexInformer.RemoveEventHandler(registration) +} + +var ( + scheme *runtime.Scheme = runtime.NewScheme() + codecs serializer.CodecFactory = serializer.NewCodecFactory(scheme) + fakeGVR schema.GroupVersionResource = schema.GroupVersionResource{ + Group: "fake.example.com", + Version: "v1", + Resource: "fakes", + } + fakeGVK schema.GroupVersionKind = fakeGVR.GroupVersion().WithKind("Fake") + fakeGVKList schema.GroupVersionKind = fakeGVR.GroupVersion().WithKind("FakeList") +) + +func init() { + scheme.AddKnownTypeWithName(fakeGVK, &unstructured.Unstructured{}) + scheme.AddKnownTypeWithName(fakeGVKList, &unstructured.UnstructuredList{}) +} + +func setupTest(ctx context.Context, customReconciler func(string, string, runtime.Object) error) ( + tracker clienttesting.ObjectTracker, + controller generic.Controller[*unstructured.Unstructured], + informer *testInformer, + waitForReconcile func(runtime.Object) error, +) { + tracker = clienttesting.NewObjectTracker(scheme, codecs.UniversalDecoder()) + reconciledObjects := make(chan runtime.Object) + + // Set up fake informers that return instances of mock Policy definitoins + // and mock policy bindings + informer = &testInformer{SharedIndexInformer: cache.NewSharedIndexInformer(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return tracker.List(fakeGVR, fakeGVK, "") + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return tracker.Watch(fakeGVR, "") + }, + }, &unstructured.Unstructured{}, 30*time.Second, nil)} + + reconciler := func(namespace, name string, newObj *unstructured.Unstructured) error { + var err error + copied := newObj.DeepCopyObject() + if customReconciler != nil { + err = customReconciler(namespace, name, newObj) + } + reconciledObjects <- copied + return err + } + + waitForReconcile = func(obj runtime.Object) error { + select { + case reconciledObj := <-reconciledObjects: + if reflect.DeepEqual(obj, reconciledObj) { + return nil + } + return fmt.Errorf("expected equal objects: %v", cmp.Diff(obj, reconciledObj)) + case <-ctx.Done(): + return fmt.Errorf("context done before reconcile: %w", ctx.Err()) + } + } + + myController := generic.NewController( + generic.NewInformer[*unstructured.Unstructured](informer), + reconciler, + generic.ControllerOptions{}, + ) + + go func() { + <-ctx.Done() + close(reconciledObjects) + + for leftover := range reconciledObjects { + panic(fmt.Errorf("leftover object which was not anticipated by test: %v", leftover)) + } + }() + + return tracker, myController, informer, waitForReconcile +} + +func TestReconcile(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer testCancel() + + tracker, myController, informer, waitForReconcile := setupTest(testContext, nil) + + // Add object to informer + initialObject := &unstructured.Unstructured{} + initialObject.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "1", + }, + }) + initialObject.SetGroupVersionKind(fakeGVK) + + require.NoError(t, tracker.Add(initialObject)) + + wg := sync.WaitGroup{} + + // Start informer + wg.Add(1) + go func() { + defer wg.Done() + informer.Run(testContext.Done()) + }() + + // Start controller + wg.Add(1) + go func() { + defer wg.Done() + stopReason := myController.Run(testContext) + require.ErrorIs(t, stopReason, context.Canceled) + }() + + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + + // Wait for all enqueued reconciliations + require.NoError(t, waitForReconcile(initialObject)) + + // Updated object + updatedObject := &unstructured.Unstructured{} + updatedObject.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "2", + }, + "newKey": "a key", + }) + updatedObject.SetGroupVersionKind(fakeGVK) + require.NoError(t, tracker.Update(fakeGVR, updatedObject, "")) + + // Wait for all enqueued reconciliations + require.NoError(t, waitForReconcile(updatedObject)) + require.NoError(t, tracker.Delete(fakeGVR, updatedObject.GetNamespace(), updatedObject.GetName())) + require.NoError(t, waitForReconcile(nil)) + + testCancel() + wg.Wait() +} + +func TestShutdown(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer testCancel() + + _, myController, informer, _ := setupTest(testContext, nil) + + wg := sync.WaitGroup{} + + // Start informer + wg.Add(1) + go func() { + defer wg.Done() + informer.Run(testContext.Done()) + }() + + // Start controller + wg.Add(1) + go func() { + defer wg.Done() + stopReason := myController.Run(testContext) + require.ErrorIs(t, stopReason, context.Canceled) + }() + + // Wait for controller and informer to start up + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + + // Stop the controller and informer + testCancel() + + // Wait for controller and informer to stop + wg.Wait() + + // Ensure the event handler was cleaned up + require.Empty(t, informer.registrations) +} + +// Show an error is thrown informer isn't started when the controller runs +func TestInformerNeverStarts(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 400*time.Millisecond) + defer testCancel() + + _, myController, informer, _ := setupTest(testContext, nil) + + wg := sync.WaitGroup{} + + // Start controller + wg.Add(1) + go func() { + defer wg.Done() + stopReason := myController.Run(testContext) + require.ErrorIs(t, stopReason, context.DeadlineExceeded) + }() + + // Wait for deadline to pass without syncing the cache + require.False(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + + // Wait for controller to stop (or context deadline will pass quickly) + wg.Wait() + + // Ensure there are no event handlers + require.Empty(t, informer.registrations) +} + +// Shows that if RV does not change, the reconciler does not get called +func TestIgnoredUpdate(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer testCancel() + + tracker, myController, informer, waitForReconcile := setupTest(testContext, nil) + + // Add object to informer + initialObject := &unstructured.Unstructured{} + initialObject.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "1", + }, + }) + initialObject.SetGroupVersionKind(fakeGVK) + + require.NoError(t, tracker.Add(initialObject)) + + wg := sync.WaitGroup{} + + // Start informer + wg.Add(1) + go func() { + defer wg.Done() + informer.Run(testContext.Done()) + }() + + // Start controller + wg.Add(1) + go func() { + defer wg.Done() + stopReason := myController.Run(testContext) + require.ErrorIs(t, stopReason, context.Canceled) + }() + + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + + // Wait for all enqueued reconciliations + require.NoError(t, waitForReconcile(initialObject)) + + // Send update with the same object + require.NoError(t, tracker.Update(fakeGVR, initialObject, "")) + + // Don't wait for it to be reconciled + + testCancel() + wg.Wait() + + // Test infrastructure has logic to panic if there are any reconciled objects + // that weren't "expected" +} + +// Shows that an object which fails reconciliation will retry +func TestReconcileRetry(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer testCancel() + + calls := atomic.Uint64{} + success := atomic.Bool{} + tracker, myController, _, waitForReconcile := setupTest(testContext, func(s1, s2 string, o runtime.Object) error { + + if calls.Add(1) > 2 { + // Suddenly start liking the object + success.Store(true) + return nil + } + return errors.New("i dont like this object") + }) + + // Start informer + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + myController.Informer().Run(testContext.Done()) + }() + + // Start controller + wg.Add(1) + go func() { + defer wg.Done() + stopReason := myController.Run(testContext) + require.ErrorIs(t, stopReason, context.Canceled) + }() + + // Add object to informer + initialObject := &unstructured.Unstructured{} + initialObject.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "1", + }, + }) + initialObject.SetGroupVersionKind(fakeGVK) + require.NoError(t, tracker.Add(initialObject)) + + require.NoError(t, waitForReconcile(initialObject), "initial reconcile") + require.NoError(t, waitForReconcile(initialObject), "previous reconcile failed, should retry quickly") + require.NoError(t, waitForReconcile(initialObject), "previous reconcile failed, should retry quickly") + // Will not try again since calls > 2 for last reconcile + require.True(t, success.Load(), "last call to reconcile should return success") + testCancel() + wg.Wait() +} + +func TestInformerList(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer testCancel() + + tracker, myController, _, _ := setupTest(testContext, nil) + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + myController.Informer().Run(testContext.Done()) + }() + + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + + object1 := &unstructured.Unstructured{} + object1.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "object1", + }, + }) + object1.SetGroupVersionKind(fakeGVK) + + object1v2 := &unstructured.Unstructured{} + object1v2.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "object1v2", + }, + }) + object1v2.SetGroupVersionKind(fakeGVK) + + object2 := &unstructured.Unstructured{} + object2.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object2", + "resourceVersion": "object2", + }, + }) + object2.SetGroupVersionKind(fakeGVK) + + object3 := &unstructured.Unstructured{} + object3.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object3", + "resourceVersion": "object3", + }, + }) + object3.SetGroupVersionKind(fakeGVK) + + namespacedObject1 := &unstructured.Unstructured{} + namespacedObject1.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "namespacedObject1", + "namespace": "test", + "resourceVersion": "namespacedObject1", + }, + }) + namespacedObject1.SetGroupVersionKind(fakeGVK) + + namespacedObject2 := &unstructured.Unstructured{} + namespacedObject2.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "namespacedObject2", + "namespace": "test", + "resourceVersion": "namespacedObject2", + }, + }) + namespacedObject2.SetGroupVersionKind(fakeGVK) + + require.NoError(t, tracker.Add(object1)) + require.NoError(t, tracker.Add(object2)) + + require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) { + return myController.Informer().LastSyncResourceVersion() == object2.GetResourceVersion(), nil + })) + + values, err := myController.Informer().List(labels.Everything()) + require.NoError(t, err) + require.ElementsMatch(t, []*unstructured.Unstructured{object1, object2}, values) + + require.NoError(t, tracker.Update(fakeGVR, object1v2, object1v2.GetNamespace())) + require.NoError(t, tracker.Delete(fakeGVR, object2.GetNamespace(), object2.GetName())) + require.NoError(t, tracker.Add(object3)) + + require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) { + return myController.Informer().LastSyncResourceVersion() == object3.GetResourceVersion(), nil + })) + + values, err = myController.Informer().List(labels.Everything()) + require.NoError(t, err) + require.ElementsMatch(t, []*unstructured.Unstructured{object1v2, object3}, values) + + require.NoError(t, tracker.Add(namespacedObject1)) + require.NoError(t, tracker.Add(namespacedObject2)) + + require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) { + return myController.Informer().LastSyncResourceVersion() == namespacedObject2.GetResourceVersion(), nil + })) + values, err = myController.Informer().Namespaced(namespacedObject1.GetNamespace()).List(labels.Everything()) + require.NoError(t, err) + require.ElementsMatch(t, []*unstructured.Unstructured{namespacedObject1, namespacedObject2}, values) + + value, err := myController.Informer().Get(object3.GetName()) + require.NoError(t, err) + require.Equal(t, value, object3) + + value, err = myController.Informer().Namespaced(namespacedObject1.GetNamespace()).Get(namespacedObject1.GetName()) + require.NoError(t, err) + require.Equal(t, value, namespacedObject1) + + _, err = myController.Informer().Get("fakeobjectname") + require.True(t, k8serrors.IsNotFound(err)) + + _, err = myController.Informer().Namespaced("test").Get("fakeobjectname") + require.True(t, k8serrors.IsNotFound(err)) + + _, err = myController.Informer().Namespaced("fakenamespace").Get("fakeobjectname") + require.True(t, k8serrors.IsNotFound(err)) +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/doc.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/doc.go new file mode 100644 index 00000000000..2acfad989f6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/doc.go @@ -0,0 +1,29 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package generic contains a typed wrapper over cache SharedIndexInformer +// and Lister (maybe eventually should have a home there?) +// +// This interface is being experimented with as an easier way to write controllers +// with a bit less boilerplate. +// +// Informer/Lister classes are thin wrappers providing a type-safe interface +// over regular interface{}-based Informers/Listers +// +// Controller[T] provides a reusable way to reconcile objects out of an informer +// using the tried and true controller design pattern found all over k8s +// codebase based upon syncFunc/reconcile +package generic diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/informer.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/informer.go new file mode 100644 index 00000000000..3025aa19533 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/informer.go @@ -0,0 +1,36 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +var _ Informer[runtime.Object] = informer[runtime.Object]{} + +type informer[T runtime.Object] struct { + cache.SharedIndexInformer + lister[T] +} + +func NewInformer[T runtime.Object](informe cache.SharedIndexInformer) Informer[T] { + return informer[T]{ + SharedIndexInformer: informe, + lister: NewLister[T](informe.GetIndexer()), + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/interface.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/interface.go new file mode 100644 index 00000000000..a17821e777e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/interface.go @@ -0,0 +1,62 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "context" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +type Controller[T runtime.Object] interface { + // Meant to be run inside a goroutine + // Waits for and reacts to changes in whatever type the controller + // is concerned with. + // + // Returns an error always non-nil explaining why the worker stopped + Run(ctx context.Context) error + + // Retrieves the informer used to back this controller + Informer() Informer[T] + + // Returns true if the informer cache has synced, and all the objects from + // the initial list have been reconciled at least once. + HasSynced() bool +} + +type NamespacedLister[T any] interface { + // List lists all ValidationRuleSets in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []T, err error) + // Get retrieves the ValidationRuleSet from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (T, error) +} + +type Informer[T any] interface { + cache.SharedIndexInformer + Lister[T] +} + +// Lister[T] helps list Ts. +// All objects returned here must be treated as read-only. +type Lister[T any] interface { + NamespacedLister[T] + Namespaced(namespace string) NamespacedLister[T] +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/lister.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/lister.go new file mode 100644 index 00000000000..aa6b090324c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/lister.go @@ -0,0 +1,100 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "fmt" + "net/http" + + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +var _ Lister[runtime.Object] = lister[runtime.Object]{} + +type namespacedLister[T runtime.Object] struct { + indexer cache.Indexer + namespace string +} + +func (w namespacedLister[T]) List(selector labels.Selector) (ret []T, err error) { + err = cache.ListAllByNamespace(w.indexer, w.namespace, selector, func(m interface{}) { + ret = append(ret, m.(T)) + }) + return ret, err +} + +func (w namespacedLister[T]) Get(name string) (T, error) { + var result T + + obj, exists, err := w.indexer.GetByKey(w.namespace + "/" + name) + if err != nil { + return result, err + } + if !exists { + return result, &kerrors.StatusError{ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusNotFound, + Reason: metav1.StatusReasonNotFound, + Message: fmt.Sprintf("%s not found", name), + }} + } + result = obj.(T) + return result, nil +} + +type lister[T runtime.Object] struct { + indexer cache.Indexer +} + +func (w lister[T]) List(selector labels.Selector) (ret []T, err error) { + err = cache.ListAll(w.indexer, selector, func(m interface{}) { + ret = append(ret, m.(T)) + }) + return ret, err +} + +func (w lister[T]) Get(name string) (T, error) { + var result T + + obj, exists, err := w.indexer.GetByKey(name) + if err != nil { + return result, err + } + if !exists { + // kerrors.StatusNotFound requires a GroupResource we cannot provide + return result, &kerrors.StatusError{ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusNotFound, + Reason: metav1.StatusReasonNotFound, + Message: fmt.Sprintf("%s not found", name), + }} + } + result = obj.(T) + return result, nil +} + +func (w lister[T]) Namespaced(namespace string) NamespacedLister[T] { + return namespacedLister[T]{namespace: namespace, indexer: w.indexer} +} + +func NewLister[T runtime.Object](indexer cache.Indexer) lister[T] { + return lister[T]{indexer: indexer} +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/policy_decision.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/policy_decision.go new file mode 100644 index 00000000000..800da35498b --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/policy_decision.go @@ -0,0 +1,53 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "encoding/json" + "fmt" +) + +type PolicyDecisionKind string + +const ( + Admit PolicyDecisionKind = "Admit" + Deny PolicyDecisionKind = "Deny" +) + +type PolicyDecision struct { + Kind PolicyDecisionKind `json:"kind"` + Message any `json:"message"` +} + +type PolicyDecisionWithMetadata struct { + PolicyDecision `json:"decision"` + Definition PolicyDefinition `json:"definition"` + Binding PolicyBinding `json:"binding"` +} + +type PolicyError struct { + Decisions []PolicyDecisionWithMetadata +} + +func (p *PolicyError) Error() string { + // Just format the error as JSON + jsonText, err := json.Marshal(p.Decisions) + if err != nil { + return fmt.Sprintf("error formatting PolicyError: %s", err.Error()) + } + return string(jsonText) +}