diff --git a/pkg/registry/flowcontrol/ensurer/flowschema.go b/pkg/registry/flowcontrol/ensurer/flowschema.go new file mode 100644 index 00000000000..9e2b793626f --- /dev/null +++ b/pkg/registry/flowcontrol/ensurer/flowschema.go @@ -0,0 +1,199 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ensurer + +import ( + "context" + "errors" + "fmt" + + flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1" + flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1" +) + +var ( + errObjectNotFlowSchema = errors.New("object is not a FlowSchema type") +) + +// FlowSchemaEnsurer ensures the specified bootstrap configuration objects +type FlowSchemaEnsurer interface { + Ensure([]*flowcontrolv1beta1.FlowSchema) error +} + +// FlowSchemaRemover removes the specified bootstrap configuration objects +type FlowSchemaRemover interface { + Remove([]string) error +} + +// NewSuggestedFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that +// can be used to ensure a set of suggested FlowSchema configuration objects. +// shouldCreate indicates whether a missing 'suggested' FlowSchema object should be recreated. +func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, shouldCreate bool) FlowSchemaEnsurer { + wrapper := &flowSchemaWrapper{ + client: client, + } + return &fsEnsurer{ + strategy: newSuggestedEnsureStrategy(wrapper, shouldCreate), + wrapper: wrapper, + } +} + +// NewMandatoryFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that +// can be used to ensure a set of mandatory FlowSchema configuration objects. +func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { + wrapper := &flowSchemaWrapper{ + client: client, + } + return &fsEnsurer{ + strategy: newMandatoryEnsureStrategy(wrapper), + wrapper: wrapper, + } +} + +// NewFlowSchemaRemover returns a FlowSchemaRemover instance that +// can be used to remove a set of FlowSchema configuration objects. +func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface) FlowSchemaRemover { + return &fsEnsurer{ + wrapper: &flowSchemaWrapper{ + client: client, + }, + } +} + +// GetFlowSchemaRemoveCandidate returns a list of FlowSchema object +// names that are candidates for deletion from the cluster. +// bootstrap: a set of hard coded FlowSchema configuration objects +// kube-apiserver maintains in-memory. +func GetFlowSchemaRemoveCandidate(client flowcontrolclient.FlowSchemaInterface, bootstrap []*flowcontrolv1beta1.FlowSchema) ([]string, error) { + // TODO(101667): Use a lister here to avoid periodic LIST calls + fsList, err := client.List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to list FlowSchema - %w", err) + } + + bootstrapNames := sets.String{} + for i := range bootstrap { + bootstrapNames.Insert(bootstrap[i].GetName()) + } + + currentObjects := make([]metav1.Object, len(fsList.Items)) + for i := range fsList.Items { + currentObjects[i] = &fsList.Items[i] + } + + return getRemoveCandidate(bootstrapNames, currentObjects), nil +} + +type fsEnsurer struct { + strategy ensureStrategy + wrapper configurationWrapper +} + +func (e *fsEnsurer) Ensure(flowSchemas []*flowcontrolv1beta1.FlowSchema) error { + for _, flowSchema := range flowSchemas { + if err := ensureConfiguration(e.wrapper, e.strategy, flowSchema); err != nil { + return err + } + } + + return nil +} + +func (e *fsEnsurer) Remove(flowSchemas []string) error { + for _, flowSchema := range flowSchemas { + if err := removeConfiguration(e.wrapper, flowSchema); err != nil { + return err + } + } + + return nil +} + +// flowSchemaWrapper abstracts all FlowSchema specific logic, with this +// we can manage all boiler plate code in one place. +type flowSchemaWrapper struct { + client flowcontrolclient.FlowSchemaInterface +} + +func (fs *flowSchemaWrapper) TypeName() string { + return "FlowSchema" +} + +func (fs *flowSchemaWrapper) Create(object runtime.Object) (runtime.Object, error) { + fsObject, ok := object.(*flowcontrolv1beta1.FlowSchema) + if !ok { + return nil, errObjectNotFlowSchema + } + + return fs.client.Create(context.TODO(), fsObject, metav1.CreateOptions{FieldManager: fieldManager}) +} + +func (fs *flowSchemaWrapper) Update(object runtime.Object) (runtime.Object, error) { + fsObject, ok := object.(*flowcontrolv1beta1.FlowSchema) + if !ok { + return nil, errObjectNotFlowSchema + } + + return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager}) +} + +func (fs *flowSchemaWrapper) Get(name string) (configurationObject, error) { + return fs.client.Get(context.TODO(), name, metav1.GetOptions{}) +} + +func (fs *flowSchemaWrapper) Delete(name string) error { + return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{}) +} + +func (fs *flowSchemaWrapper) CopySpec(bootstrap, current runtime.Object) error { + bootstrapFS, ok := bootstrap.(*flowcontrolv1beta1.FlowSchema) + if !ok { + return errObjectNotFlowSchema + } + currentFS, ok := current.(*flowcontrolv1beta1.FlowSchema) + if !ok { + return errObjectNotFlowSchema + } + + specCopy := bootstrapFS.Spec.DeepCopy() + currentFS.Spec = *specCopy + return nil +} + +func (fs *flowSchemaWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) { + bootstrapFS, ok := bootstrap.(*flowcontrolv1beta1.FlowSchema) + if !ok { + return false, errObjectNotFlowSchema + } + currentFS, ok := current.(*flowcontrolv1beta1.FlowSchema) + if !ok { + return false, errObjectNotFlowSchema + } + + return flowSchemaSpecChanged(bootstrapFS, currentFS), nil +} + +func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta1.FlowSchema) bool { + copiedExpectedFlowSchema := expected.DeepCopy() + flowcontrolapisv1beta1.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema) + return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec) +} diff --git a/pkg/registry/flowcontrol/ensurer/flowschema_test.go b/pkg/registry/flowcontrol/ensurer/flowschema_test.go new file mode 100644 index 00000000000..f0d5512cd3a --- /dev/null +++ b/pkg/registry/flowcontrol/ensurer/flowschema_test.go @@ -0,0 +1,478 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ensurer + +import ( + "context" + "reflect" + "testing" + + flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" + "k8s.io/client-go/kubernetes/fake" + flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1" + flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" +) + +func TestEnsureFlowSchema(t *testing.T) { + tests := []struct { + name string + strategy func(flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer + current *flowcontrolv1beta1.FlowSchema + bootstrap *flowcontrolv1beta1.FlowSchema + expected *flowcontrolv1beta1.FlowSchema + }{ + // for suggested configurations + { + name: "suggested flow schema does not exist and we should ensure - the object should be created", + strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { + return NewSuggestedFlowSchemaEnsurer(client, true) + }, + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + current: nil, + expected: newFlowSchema("fs1", "pl1", 100).Object(), + }, + { + name: "suggested flow schema does not exist and we should not ensure - the object should not be created", + strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { + return NewSuggestedFlowSchemaEnsurer(client, false) + }, + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + current: nil, + expected: nil, + }, + { + name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated", + strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { + return NewSuggestedFlowSchemaEnsurer(client, true) + }, + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), + expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + }, + { + name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated", + strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { + return NewSuggestedFlowSchemaEnsurer(client, true) + }, + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), + expected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), + }, + + // for mandatory configurations + { + name: "mandatory flow schema does not exist - new object should be created", + strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { + return NewMandatoryFlowSchemaEnsurer(client) + }, + bootstrap: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + current: nil, + expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + }, + { + name: "mandatory flow schema exists, annotation is missing - annotation should be added", + strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { + return NewMandatoryFlowSchemaEnsurer(client) + }, + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + current: newFlowSchema("fs1", "pl1", 100).Object(), + expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + }, + { + name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated", + strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { + return NewMandatoryFlowSchemaEnsurer(client) + }, + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), + expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := fake.NewSimpleClientset().FlowcontrolV1beta1().FlowSchemas() + if test.current != nil { + client.Create(context.TODO(), test.current, metav1.CreateOptions{}) + } + + ensurer := test.strategy(client) + + err := ensurer.Ensure([]*flowcontrolv1beta1.FlowSchema{test.bootstrap}) + if err != nil { + t.Fatalf("Expected no error, but got: %v", err) + } + + fsGot, err := client.Get(context.TODO(), test.bootstrap.Name, metav1.GetOptions{}) + switch { + case test.expected == nil: + if !apierrors.IsNotFound(err) { + t.Fatalf("Expected GET to return an %q error, but got: %v", metav1.StatusReasonNotFound, err) + } + case err != nil: + t.Fatalf("Expected GET to return no error, but got: %v", err) + } + + if !reflect.DeepEqual(test.expected, fsGot) { + t.Errorf("FlowSchema does not match - diff: %s", cmp.Diff(test.expected, fsGot)) + } + }) + } +} + +func TestSuggestedFSEnsureStrategy_ShouldUpdate(t *testing.T) { + tests := []struct { + name string + current *flowcontrolv1beta1.FlowSchema + bootstrap *flowcontrolv1beta1.FlowSchema + newObjectExpected *flowcontrolv1beta1.FlowSchema + }{ + { + name: "auto update is enabled, first generation, spec does not match - spec update expected", + current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").WithGeneration(1).Object(), + bootstrap: newFlowSchema("fs1", "pl1", 200).Object(), + newObjectExpected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").WithGeneration(1).Object(), + }, + { + name: "auto update is enabled, first generation, spec matches - no update expected", + current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").WithGeneration(1).Object(), + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + newObjectExpected: nil, + }, + { + name: "auto update is enabled, second generation, spec does not match - spec update expected", + current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").WithGeneration(2).Object(), + bootstrap: newFlowSchema("fs1", "pl2", 200).Object(), + newObjectExpected: newFlowSchema("fs1", "pl2", 200).WithAutoUpdateAnnotation("true").WithGeneration(2).Object(), + }, + { + name: "auto update is enabled, second generation, spec matches - no update expected", + current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").WithGeneration(2).Object(), + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + newObjectExpected: nil, + }, + { + name: "auto update is disabled, first generation, spec does not match - no update expected", + current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(1).Object(), + bootstrap: newFlowSchema("fs1", "pl1", 200).Object(), + newObjectExpected: nil, + }, + { + name: "auto update is disabled, first generation, spec matches - no update expected", + current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(1).Object(), + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + newObjectExpected: nil, + }, + { + name: "auto update is disabled, second generation, spec does not match - no update expected", + current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(2).Object(), + bootstrap: newFlowSchema("fs1", "pl2", 200).Object(), + newObjectExpected: nil, + }, + { + name: "auto update is disabled, second generation, spec matches - no update expected", + current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(2).Object(), + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + newObjectExpected: nil, + }, + { + name: "annotation is missing, first generation, spec does not match - both annotation and spec update expected", + current: newFlowSchema("fs1", "pl1", 100).WithGeneration(1).Object(), + bootstrap: newFlowSchema("fs1", "pl2", 200).Object(), + newObjectExpected: newFlowSchema("fs1", "pl2", 200).WithAutoUpdateAnnotation("true").WithGeneration(1).Object(), + }, + { + name: "annotation is missing, first generation, spec matches - annotation update is expected", + current: newFlowSchema("fs1", "pl1", 100).WithGeneration(1).Object(), + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + newObjectExpected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").WithGeneration(1).Object(), + }, + { + name: "annotation is missing, second generation, spec does not match - annotation update is expected", + current: newFlowSchema("fs1", "pl1", 100).WithGeneration(2).Object(), + bootstrap: newFlowSchema("fs1", "pl2", 200).Object(), + newObjectExpected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(2).Object(), + }, + { + name: "annotation is missing, second generation, spec matches - annotation update is expected", + current: newFlowSchema("fs1", "pl1", 100).WithGeneration(2).Object(), + bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), + newObjectExpected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(2).Object(), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + strategy := newSuggestedEnsureStrategy(&flowSchemaWrapper{}, false) + newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap) + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } + + if test.newObjectExpected == nil { + if newObjectGot != nil { + t.Errorf("Expected a nil object, but got: %#v", newObjectGot) + } + if updateGot { + t.Errorf("Expected update=%t but got: %t", false, updateGot) + } + return + } + + if !updateGot { + t.Errorf("Expected update=%t but got: %t", true, updateGot) + } + if !reflect.DeepEqual(test.newObjectExpected, newObjectGot) { + t.Errorf("Expected the object to be updated to match - diff: %s", cmp.Diff(test.newObjectExpected, newObjectGot)) + } + }) + } +} + +func TestFlowSchemaSpecChanged(t *testing.T) { + fs1 := &flowcontrolv1beta1.FlowSchema{ + Spec: flowcontrolv1beta1.FlowSchemaSpec{}, + } + fs2 := &flowcontrolv1beta1.FlowSchema{ + Spec: flowcontrolv1beta1.FlowSchemaSpec{ + MatchingPrecedence: 1, + }, + } + fs1Defaulted := &flowcontrolv1beta1.FlowSchema{ + Spec: flowcontrolv1beta1.FlowSchemaSpec{ + MatchingPrecedence: flowcontrolapisv1beta1.FlowSchemaDefaultMatchingPrecedence, + }, + } + testCases := []struct { + name string + expected *flowcontrolv1beta1.FlowSchema + actual *flowcontrolv1beta1.FlowSchema + specChanged bool + }{ + { + name: "identical flow-schemas should work", + expected: bootstrap.MandatoryFlowSchemaCatchAll, + actual: bootstrap.MandatoryFlowSchemaCatchAll, + specChanged: false, + }, + { + name: "defaulted flow-schemas should work", + expected: fs1, + actual: fs1Defaulted, + specChanged: false, + }, + { + name: "non-defaulted flow-schema has wrong spec", + expected: fs1, + actual: fs2, + specChanged: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + w := flowSchemaSpecChanged(testCase.expected, testCase.actual) + assert.Equal(t, testCase.specChanged, w) + }) + } +} + +func TestRemoveFlowSchema(t *testing.T) { + tests := []struct { + name string + current *flowcontrolv1beta1.FlowSchema + bootstrapName string + removeExpected bool + }{ + { + name: "flow schema does not exist", + bootstrapName: "fs1", + current: nil, + }, + { + name: "flow schema exists, auto update is enabled", + bootstrapName: "fs1", + current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), + removeExpected: true, + }, + { + name: "flow schema exists, auto update is disabled", + bootstrapName: "fs1", + current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), + removeExpected: false, + }, + { + name: "flow schema exists, the auto-update annotation is malformed", + bootstrapName: "fs1", + current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("invalid").Object(), + removeExpected: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := fake.NewSimpleClientset().FlowcontrolV1beta1().FlowSchemas() + if test.current != nil { + client.Create(context.TODO(), test.current, metav1.CreateOptions{}) + } + + remover := NewFlowSchemaRemover(client) + err := remover.Remove([]string{test.bootstrapName}) + if err != nil { + t.Fatalf("Expected no error, but got: %v", err) + } + + if test.current == nil { + return + } + _, err = client.Get(context.TODO(), test.bootstrapName, metav1.GetOptions{}) + switch { + case test.removeExpected: + if !apierrors.IsNotFound(err) { + t.Errorf("Expected error: %q, but got: %v", metav1.StatusReasonNotFound, err) + } + default: + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } + } + }) + } +} + +func TestGetFlowSchemaRemoveCandidate(t *testing.T) { + tests := []struct { + name string + current []*flowcontrolv1beta1.FlowSchema + bootstrap []*flowcontrolv1beta1.FlowSchema + expected []string + }{ + { + name: "no object has been removed from the bootstrap configuration", + bootstrap: []*flowcontrolv1beta1.FlowSchema{ + newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(), + newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(), + }, + current: []*flowcontrolv1beta1.FlowSchema{ + newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(), + newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(), + }, + expected: []string{}, + }, + { + name: "bootstrap is empty, all current objects with the annotation should be candidates", + bootstrap: []*flowcontrolv1beta1.FlowSchema{}, + current: []*flowcontrolv1beta1.FlowSchema{ + newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(), + newFlowSchema("fs3", "pl3", 300).Object(), + }, + expected: []string{"fs1", "fs2"}, + }, + { + name: "object(s) have been removed from the bootstrap configuration", + bootstrap: []*flowcontrolv1beta1.FlowSchema{ + newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + }, + current: []*flowcontrolv1beta1.FlowSchema{ + newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(), + newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(), + }, + expected: []string{"fs2", "fs3"}, + }, + { + name: "object(s) without the annotation key are ignored", + bootstrap: []*flowcontrolv1beta1.FlowSchema{ + newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + }, + current: []*flowcontrolv1beta1.FlowSchema{ + newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), + newFlowSchema("fs2", "pl2", 200).Object(), + newFlowSchema("fs3", "pl3", 300).Object(), + }, + expected: []string{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := fake.NewSimpleClientset().FlowcontrolV1beta1().FlowSchemas() + for i := range test.current { + client.Create(context.TODO(), test.current[i], metav1.CreateOptions{}) + } + + removeListGot, err := GetFlowSchemaRemoveCandidate(client, test.bootstrap) + if err != nil { + t.Fatalf("Expected no error, but got: %v", err) + } + + if !cmp.Equal(test.expected, removeListGot) { + t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot)) + } + }) + } +} + +type fsBuilder struct { + object *flowcontrolv1beta1.FlowSchema +} + +func newFlowSchema(name, plName string, matchingPrecedence int32) *fsBuilder { + return &fsBuilder{ + object: &flowcontrolv1beta1.FlowSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: flowcontrolv1beta1.FlowSchemaSpec{ + PriorityLevelConfiguration: flowcontrolv1beta1.PriorityLevelConfigurationReference{ + Name: plName, + }, + MatchingPrecedence: matchingPrecedence, + }, + }, + } +} + +func (b *fsBuilder) Object() *flowcontrolv1beta1.FlowSchema { + return b.object +} + +func (b *fsBuilder) WithGeneration(value int64) *fsBuilder { + b.object.SetGeneration(value) + return b +} + +func (b *fsBuilder) WithAutoUpdateAnnotation(value string) *fsBuilder { + setAnnotation(b.object, value) + return b +} + +func setAnnotation(accessor metav1.Object, value string) { + if accessor.GetAnnotations() == nil { + accessor.SetAnnotations(map[string]string{}) + } + + accessor.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey] = value +} diff --git a/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go new file mode 100644 index 00000000000..9f8235a5c48 --- /dev/null +++ b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go @@ -0,0 +1,199 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ensurer + +import ( + "context" + "errors" + "fmt" + + flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1" + flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1" +) + +var ( + errObjectNotPriorityLevel = errors.New("object is not a PriorityLevelConfiguration type") +) + +// PriorityLevelEnsurer ensures the specified bootstrap configuration objects +type PriorityLevelEnsurer interface { + Ensure([]*flowcontrolv1beta1.PriorityLevelConfiguration) error +} + +// PriorityLevelRemover removes the specified bootstrap configuration objects +type PriorityLevelRemover interface { + Remove([]string) error +} + +// NewSuggestedPriorityLevelEnsurerEnsurer returns a PriorityLevelEnsurer instance that +// can be used to ensure a set of suggested PriorityLevelConfiguration configuration objects. +// shouldCreate indicates whether a missing 'suggested' PriorityLevelConfiguration object should be recreated. +func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, shouldCreate bool) PriorityLevelEnsurer { + wrapper := &priorityLevelConfigurationWrapper{ + client: client, + } + return &plEnsurer{ + strategy: newSuggestedEnsureStrategy(wrapper, shouldCreate), + wrapper: wrapper, + } +} + +// NewMandatoryPriorityLevelEnsurer returns a PriorityLevelEnsurer instance that +// can be used to ensure a set of mandatory PriorityLevelConfiguration configuration objects. +func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { + wrapper := &priorityLevelConfigurationWrapper{ + client: client, + } + return &plEnsurer{ + strategy: newMandatoryEnsureStrategy(wrapper), + wrapper: wrapper, + } +} + +// NewPriorityLevelRemover returns a PriorityLevelRemover instance that +// can be used to remove a set of PriorityLevelConfiguration configuration objects. +func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelRemover { + return &plEnsurer{ + wrapper: &priorityLevelConfigurationWrapper{ + client: client, + }, + } +} + +// GetPriorityLevelRemoveCandidate returns a list of PriorityLevelConfiguration +// names that are candidates for removal from the cluster. +// bootstrap: a set of hard coded PriorityLevelConfiguration configuration +// objects kube-apiserver maintains in-memory. +func GetPriorityLevelRemoveCandidate(client flowcontrolclient.PriorityLevelConfigurationInterface, bootstrap []*flowcontrolv1beta1.PriorityLevelConfiguration) ([]string, error) { + // TODO(101667): Use a lister here to avoid periodic LIST calls + plList, err := client.List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to list PriorityLevelConfiguration - %w", err) + } + + bootstrapNames := sets.String{} + for i := range bootstrap { + bootstrapNames.Insert(bootstrap[i].GetName()) + } + + currentObjects := make([]metav1.Object, len(plList.Items)) + for i := range plList.Items { + currentObjects[i] = &plList.Items[i] + } + + return getRemoveCandidate(bootstrapNames, currentObjects), nil +} + +type plEnsurer struct { + strategy ensureStrategy + wrapper configurationWrapper +} + +func (e *plEnsurer) Ensure(priorityLevels []*flowcontrolv1beta1.PriorityLevelConfiguration) error { + for _, priorityLevel := range priorityLevels { + if err := ensureConfiguration(e.wrapper, e.strategy, priorityLevel); err != nil { + return err + } + } + + return nil +} + +func (e *plEnsurer) Remove(priorityLevels []string) error { + for _, priorityLevel := range priorityLevels { + if err := removeConfiguration(e.wrapper, priorityLevel); err != nil { + return err + } + } + + return nil +} + +// priorityLevelConfigurationWrapper abstracts all PriorityLevelConfiguration specific logic, +// with this we can manage all boiler plate code in one place. +type priorityLevelConfigurationWrapper struct { + client flowcontrolclient.PriorityLevelConfigurationInterface +} + +func (fs *priorityLevelConfigurationWrapper) TypeName() string { + return "PriorityLevelConfiguration" +} + +func (fs *priorityLevelConfigurationWrapper) Create(object runtime.Object) (runtime.Object, error) { + plObject, ok := object.(*flowcontrolv1beta1.PriorityLevelConfiguration) + if !ok { + return nil, errObjectNotPriorityLevel + } + + return fs.client.Create(context.TODO(), plObject, metav1.CreateOptions{FieldManager: fieldManager}) +} + +func (fs *priorityLevelConfigurationWrapper) Update(object runtime.Object) (runtime.Object, error) { + fsObject, ok := object.(*flowcontrolv1beta1.PriorityLevelConfiguration) + if !ok { + return nil, errObjectNotPriorityLevel + } + + return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager}) +} + +func (fs *priorityLevelConfigurationWrapper) Get(name string) (configurationObject, error) { + return fs.client.Get(context.TODO(), name, metav1.GetOptions{}) +} + +func (fs *priorityLevelConfigurationWrapper) Delete(name string) error { + return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{}) +} + +func (fs *priorityLevelConfigurationWrapper) CopySpec(bootstrap, current runtime.Object) error { + bootstrapFS, ok := bootstrap.(*flowcontrolv1beta1.PriorityLevelConfiguration) + if !ok { + return errObjectNotPriorityLevel + } + currentFS, ok := current.(*flowcontrolv1beta1.PriorityLevelConfiguration) + if !ok { + return errObjectNotPriorityLevel + } + + specCopy := bootstrapFS.Spec.DeepCopy() + currentFS.Spec = *specCopy + return nil +} + +func (fs *priorityLevelConfigurationWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) { + bootstrapFS, ok := bootstrap.(*flowcontrolv1beta1.PriorityLevelConfiguration) + if !ok { + return false, errObjectNotPriorityLevel + } + currentFS, ok := current.(*flowcontrolv1beta1.PriorityLevelConfiguration) + if !ok { + return false, errObjectNotPriorityLevel + } + + return priorityLevelSpecChanged(bootstrapFS, currentFS), nil +} + +func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta1.PriorityLevelConfiguration) bool { + copiedExpectedPriorityLevel := expected.DeepCopy() + flowcontrolapisv1beta1.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel) + return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec) +} diff --git a/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go new file mode 100644 index 00000000000..9a20002df8d --- /dev/null +++ b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go @@ -0,0 +1,508 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ensurer + +import ( + "context" + "reflect" + "testing" + + flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" + "k8s.io/client-go/kubernetes/fake" + flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1" + flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" +) + +func TestEnsurePriorityLevel(t *testing.T) { + tests := []struct { + name string + strategy func(flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer + current *flowcontrolv1beta1.PriorityLevelConfiguration + bootstrap *flowcontrolv1beta1.PriorityLevelConfiguration + expected *flowcontrolv1beta1.PriorityLevelConfiguration + }{ + // for suggested configurations + { + name: "suggested priority level configuration does not exist and we should ensure - new object should be created", + strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { + return NewSuggestedPriorityLevelEnsurerEnsurer(client, true) + }, + bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(), + current: nil, + expected: newPLConfiguration("pl1").WithLimited(10).Object(), + }, + { + name: "suggested priority level configuration does not exist and we should not ensure - new object should not be created", + strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { + return NewSuggestedPriorityLevelEnsurerEnsurer(client, false) + }, + bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(), + current: nil, + expected: nil, + }, + { + name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated", + strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { + return NewSuggestedPriorityLevelEnsurerEnsurer(client, true) + }, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), + current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(), + expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), + }, + { + name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated", + strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { + return NewSuggestedPriorityLevelEnsurerEnsurer(client, true) + }, + bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), + current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), + expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), + }, + + // for mandatory configurations + { + name: "mandatory priority level configuration does not exist - new object should be created", + strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { + return NewMandatoryPriorityLevelEnsurer(client) + }, + bootstrap: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(), + current: nil, + expected: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(), + }, + { + name: "mandatory priority level configuration exists, annotation is missing - annotation is added", + strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { + return NewMandatoryPriorityLevelEnsurer(client) + }, + bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), + current: newPLConfiguration("pl1").WithLimited(20).Object(), + expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), + }, + { + name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated", + strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { + return NewMandatoryPriorityLevelEnsurer(client) + }, + bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), + current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), + expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + client := fake.NewSimpleClientset().FlowcontrolV1beta1().PriorityLevelConfigurations() + if test.current != nil { + client.Create(context.TODO(), test.current, metav1.CreateOptions{}) + } + + ensurer := test.strategy(client) + + err := ensurer.Ensure([]*flowcontrolv1beta1.PriorityLevelConfiguration{test.bootstrap}) + if err != nil { + t.Fatalf("Expected no error, but got: %v", err) + } + + plGot, err := client.Get(context.TODO(), test.bootstrap.Name, metav1.GetOptions{}) + switch { + case test.expected == nil: + if !apierrors.IsNotFound(err) { + t.Fatalf("Expected GET to return an %q error, but got: %v", metav1.StatusReasonNotFound, err) + } + case err != nil: + t.Fatalf("Expected GET to return no error, but got: %v", err) + } + + if !reflect.DeepEqual(test.expected, plGot) { + t.Errorf("PriorityLevelConfiguration does not match - diff: %s", cmp.Diff(test.expected, plGot)) + } + }) + } +} + +func TestSuggestedPLEnsureStrategy_ShouldUpdate(t *testing.T) { + tests := []struct { + name string + current *flowcontrolv1beta1.PriorityLevelConfiguration + bootstrap *flowcontrolv1beta1.PriorityLevelConfiguration + newObjectExpected *flowcontrolv1beta1.PriorityLevelConfiguration + }{ + { + name: "auto update is enabled, first generation, spec does not match - spec update expected", + current: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(1).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(10).Object(), + newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(1).WithLimited(10).Object(), + }, + { + name: "auto update is enabled, first generation, spec matches - no update expected", + current: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(1).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithGeneration(1).WithLimited(5).Object(), + newObjectExpected: nil, + }, + { + name: "auto update is enabled, second generation, spec does not match - spec update expected", + current: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(2).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(10).Object(), + newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(2).WithLimited(10).Object(), + }, + { + name: "auto update is enabled, second generation, spec matches - no update expected", + current: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(2).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(5).Object(), + newObjectExpected: nil, + }, + { + name: "auto update is disabled, first generation, spec does not match - no update expected", + current: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(1).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(10).Object(), + newObjectExpected: nil, + }, + { + name: "auto update is disabled, first generation, spec matches - no update expected", + current: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(1).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(5).Object(), + newObjectExpected: nil, + }, + { + name: "auto update is disabled, second generation, spec does not match - no update expected", + current: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(2).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(10).Object(), + newObjectExpected: nil, + }, + { + name: "auto update is disabled, second generation, spec matches - no update expected", + current: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(2).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(5).Object(), + newObjectExpected: nil, + }, + { + name: "annotation is missing, first generation, spec does not match - both annotation and spec update expected", + current: newPLConfiguration("foo").WithGeneration(1).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(10).Object(), + newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(1).WithLimited(10).Object(), + }, + { + name: "annotation is missing, first generation, spec matches - annotation update is expected", + current: newPLConfiguration("foo").WithGeneration(1).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(5).Object(), + newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(1).WithLimited(5).Object(), + }, + { + name: "annotation is missing, second generation, spec does not match - annotation update is expected", + current: newPLConfiguration("foo").WithGeneration(2).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(10).Object(), + newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(2).WithLimited(5).Object(), + }, + { + name: "annotation is missing, second generation, spec matches - annotation update is expected", + current: newPLConfiguration("foo").WithGeneration(2).WithLimited(5).Object(), + bootstrap: newPLConfiguration("foo").WithLimited(5).Object(), + newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(2).WithLimited(5).Object(), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + strategy := newSuggestedEnsureStrategy(&priorityLevelConfigurationWrapper{}, false) + newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap) + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } + + if test.newObjectExpected == nil { + if newObjectGot != nil { + t.Errorf("Expected a nil object, but got: %#v", newObjectGot) + } + if updateGot { + t.Errorf("Expected update=%t but got: %t", false, updateGot) + } + return + } + + if !updateGot { + t.Errorf("Expected update=%t but got: %t", true, updateGot) + } + if !reflect.DeepEqual(test.newObjectExpected, newObjectGot) { + t.Errorf("Expected the object to be updated to match - diff: %s", cmp.Diff(test.newObjectExpected, newObjectGot)) + } + }) + } +} + +func TestPriorityLevelSpecChanged(t *testing.T) { + pl1 := &flowcontrolv1beta1.PriorityLevelConfiguration{ + Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{ + Type: flowcontrolv1beta1.PriorityLevelEnablementLimited, + Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{ + LimitResponse: flowcontrolv1beta1.LimitResponse{ + Type: flowcontrolv1beta1.LimitResponseTypeReject, + }, + }, + }, + } + pl2 := &flowcontrolv1beta1.PriorityLevelConfiguration{ + Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{ + Type: flowcontrolv1beta1.PriorityLevelEnablementLimited, + Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{ + AssuredConcurrencyShares: 1, + }, + }, + } + pl1Defaulted := &flowcontrolv1beta1.PriorityLevelConfiguration{ + Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{ + Type: flowcontrolv1beta1.PriorityLevelEnablementLimited, + Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{ + AssuredConcurrencyShares: flowcontrolapisv1beta1.PriorityLevelConfigurationDefaultAssuredConcurrencyShares, + LimitResponse: flowcontrolv1beta1.LimitResponse{ + Type: flowcontrolv1beta1.LimitResponseTypeReject, + }, + }, + }, + } + testCases := []struct { + name string + expected *flowcontrolv1beta1.PriorityLevelConfiguration + actual *flowcontrolv1beta1.PriorityLevelConfiguration + specChanged bool + }{ + { + name: "identical priority-level should work", + expected: bootstrap.MandatoryPriorityLevelConfigurationCatchAll, + actual: bootstrap.MandatoryPriorityLevelConfigurationCatchAll, + specChanged: false, + }, + { + name: "defaulted priority-level should work", + expected: pl1, + actual: pl1Defaulted, + specChanged: false, + }, + { + name: "non-defaulted priority-level has wrong spec", + expected: pl1, + actual: pl2, + specChanged: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + w := priorityLevelSpecChanged(testCase.expected, testCase.actual) + assert.Equal(t, testCase.specChanged, w) + }) + } +} + +func TestRemovePriorityLevelConfiguration(t *testing.T) { + tests := []struct { + name string + current *flowcontrolv1beta1.PriorityLevelConfiguration + bootstrapName string + removeExpected bool + }{ + { + name: "priority level configuration does not exist", + bootstrapName: "pl1", + current: nil, + }, + { + name: "priority level configuration exists, auto update is enabled", + bootstrapName: "pl1", + current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), + removeExpected: true, + }, + { + name: "priority level configuration exists, auto update is disabled", + bootstrapName: "pl1", + current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").Object(), + removeExpected: false, + }, + { + name: "priority level configuration exists, the auto-update annotation is malformed", + bootstrapName: "pl1", + current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("invalid").Object(), + removeExpected: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := fake.NewSimpleClientset().FlowcontrolV1beta1().PriorityLevelConfigurations() + if test.current != nil { + client.Create(context.TODO(), test.current, metav1.CreateOptions{}) + } + + remover := NewPriorityLevelRemover(client) + err := remover.Remove([]string{test.bootstrapName}) + if err != nil { + t.Fatalf("Expected no error, but got: %v", err) + } + + if test.current == nil { + return + } + _, err = client.Get(context.TODO(), test.bootstrapName, metav1.GetOptions{}) + switch { + case test.removeExpected: + if !apierrors.IsNotFound(err) { + t.Errorf("Expected error: %q, but got: %v", metav1.StatusReasonNotFound, err) + } + default: + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } + } + }) + } +} + +func TestGetPriorityLevelRemoveCandidate(t *testing.T) { + tests := []struct { + name string + current []*flowcontrolv1beta1.PriorityLevelConfiguration + bootstrap []*flowcontrolv1beta1.PriorityLevelConfiguration + expected []string + }{ + { + name: "no object has been removed from the bootstrap configuration", + bootstrap: []*flowcontrolv1beta1.PriorityLevelConfiguration{ + newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), + newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(), + newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(), + }, + current: []*flowcontrolv1beta1.PriorityLevelConfiguration{ + newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), + newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(), + newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(), + }, + expected: []string{}, + }, + { + name: "bootstrap is empty, all current objects with the annotation should be candidates", + bootstrap: []*flowcontrolv1beta1.PriorityLevelConfiguration{}, + current: []*flowcontrolv1beta1.PriorityLevelConfiguration{ + newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), + newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(), + newPLConfiguration("pl3").Object(), + }, + expected: []string{"pl1", "pl2"}, + }, + { + name: "object(s) have been removed from the bootstrap configuration", + bootstrap: []*flowcontrolv1beta1.PriorityLevelConfiguration{ + newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), + }, + current: []*flowcontrolv1beta1.PriorityLevelConfiguration{ + newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), + newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(), + newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(), + }, + expected: []string{"pl2", "pl3"}, + }, + { + name: "object(s) without the annotation key are ignored", + bootstrap: []*flowcontrolv1beta1.PriorityLevelConfiguration{ + newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), + }, + current: []*flowcontrolv1beta1.PriorityLevelConfiguration{ + newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), + newPLConfiguration("pl2").Object(), + newPLConfiguration("pl3").Object(), + }, + expected: []string{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := fake.NewSimpleClientset().FlowcontrolV1beta1().PriorityLevelConfigurations() + for i := range test.current { + client.Create(context.TODO(), test.current[i], metav1.CreateOptions{}) + } + + removeListGot, err := GetPriorityLevelRemoveCandidate(client, test.bootstrap) + if err != nil { + t.Fatalf("Expected no error, but got: %v", err) + } + + if !cmp.Equal(test.expected, removeListGot) { + t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot)) + } + }) + } +} + +type plBuilder struct { + object *flowcontrolv1beta1.PriorityLevelConfiguration +} + +func newPLConfiguration(name string) *plBuilder { + return &plBuilder{ + object: &flowcontrolv1beta1.PriorityLevelConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + }, + } +} + +func (b *plBuilder) Object() *flowcontrolv1beta1.PriorityLevelConfiguration { + return b.object +} + +func (b *plBuilder) WithGeneration(value int64) *plBuilder { + b.object.SetGeneration(value) + return b +} + +func (b *plBuilder) WithAutoUpdateAnnotation(value string) *plBuilder { + setAnnotation(b.object, value) + return b +} + +func (b *plBuilder) WithLimited(assuredConcurrencyShares int32) *plBuilder { + b.object.Spec.Type = flowcontrolv1beta1.PriorityLevelEnablementLimited + b.object.Spec.Limited = &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{ + AssuredConcurrencyShares: assuredConcurrencyShares, + LimitResponse: flowcontrolv1beta1.LimitResponse{ + Type: flowcontrolv1beta1.LimitResponseTypeReject, + }, + } + return b +} + +// must be called after WithLimited +func (b *plBuilder) WithQueuing(queues, handSize, queueLengthLimit int32) *plBuilder { + limited := b.object.Spec.Limited + if limited == nil { + return b + } + + limited.LimitResponse.Type = flowcontrolv1beta1.LimitResponseTypeQueue + limited.LimitResponse.Queuing = &flowcontrolv1beta1.QueuingConfiguration{ + Queues: queues, + HandSize: handSize, + QueueLengthLimit: queueLengthLimit, + } + + return b +} diff --git a/pkg/registry/flowcontrol/ensurer/strategy.go b/pkg/registry/flowcontrol/ensurer/strategy.go new file mode 100644 index 00000000000..85f3028b070 --- /dev/null +++ b/pkg/registry/flowcontrol/ensurer/strategy.go @@ -0,0 +1,335 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ensurer + +import ( + "errors" + "fmt" + "strconv" + + flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + + "github.com/google/go-cmp/cmp" +) + +const ( + fieldManager = "api-priority-and-fairness-config-producer-v1" +) + +// ensureStrategy provides a strategy for ensuring apf bootstrap configurationWrapper. +// We have two types of configurationWrapper objects: +// - mandatory: the mandatory configurationWrapper objects are about ensuring that the P&F +// system itself won't crash; we have to be sure there's 'catch-all' place for +// everything to go. Any changes made by the cluster operators to these +// configurationWrapper objects will be stomped by the apiserver. +// +// - suggested: additional configurationWrapper objects for initial behavior. +// the cluster operators have an option to edit or delete these configurationWrapper objects. +type ensureStrategy interface { + // Name of the strategy, for now we have two: 'mandatory' and 'suggested'. + // This comes handy in logging. + Name() string + + // ShouldCreate returns true if a missing configuration object should be recreated. + ShouldCreate() bool + + // ShouldUpdate accepts the current and the bootstrap configuration and determines + // whether an update is necessary. + // current is the existing in-cluster configuration object. + // bootstrap is the configuration the kube-apiserver maintains in-memory. + // + // ok: true if auto update is required, otherwise false + // object: the new object represents the new configuration to be stored in-cluster. + // err: err is set when the function runs into an error and can not + // determine if auto update is needed. + ShouldUpdate(current, bootstrap configurationObject) (object runtime.Object, ok bool, err error) +} + +// this internal interface provides abstraction for dealing with the `Spec` +// of both 'FlowSchema' and 'PriorityLevelConfiguration' objects. +// Since the ensure logic for both types is common, we use a few internal interfaces +// to abstract out the differences of these two types. +type specCopier interface { + // HasSpecChanged returns true if the spec of both the bootstrap and + // the current configuration object is same, otherwise false. + HasSpecChanged(bootstrap, current runtime.Object) (bool, error) + + // CopySpec makes a deep copy the spec of the bootstrap object + // and copies it to that of the current object. + // CopySpec assumes that the current object is safe to mutate, so it + // rests with the caller to make a deep copy of the current. + CopySpec(bootstrap, current runtime.Object) error +} + +// this internal interface provides abstraction for CRUD operation +// related to both 'FlowSchema' and 'PriorityLevelConfiguration' objects. +// Since the ensure logic for both types is common, we use a few internal interfaces +// to abstract out the differences of these two types. +type configurationClient interface { + Create(object runtime.Object) (runtime.Object, error) + Update(object runtime.Object) (runtime.Object, error) + Get(name string) (configurationObject, error) + Delete(name string) error +} + +type configurationWrapper interface { + // TypeName returns the type of the configuration that this interface deals with. + // We use it to log the type name of the configuration object being ensured. + // It is either 'PriorityLevelConfiguration' or 'FlowSchema' + TypeName() string + + configurationClient + specCopier +} + +// A convenient wrapper interface that is used by the ensure logic. +type configurationObject interface { + metav1.Object + runtime.Object +} + +func newSuggestedEnsureStrategy(copier specCopier, shouldCreate bool) ensureStrategy { + return &strategy{ + copier: copier, + shouldCreate: shouldCreate, + alwaysAutoUpdateSpec: false, + name: "suggested", + } +} + +func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy { + return &strategy{ + copier: copier, + shouldCreate: true, + alwaysAutoUpdateSpec: true, + name: "mandatory", + } +} + +// auto-update strategy for the configuration objects +type strategy struct { + copier specCopier + shouldCreate bool + alwaysAutoUpdateSpec bool + name string +} + +func (s *strategy) Name() string { + return s.name +} + +func (s *strategy) ShouldCreate() bool { + return s.shouldCreate +} + +func (s *strategy) ShouldUpdate(current, bootstrap configurationObject) (runtime.Object, bool, error) { + if current == nil || bootstrap == nil { + return nil, false, nil + } + + autoUpdateSpec := s.alwaysAutoUpdateSpec + if !autoUpdateSpec { + autoUpdateSpec = shouldUpdateSpec(current) + } + updateAnnotation := shouldUpdateAnnotation(current, autoUpdateSpec) + + var specChanged bool + if autoUpdateSpec { + changed, err := s.copier.HasSpecChanged(bootstrap, current) + if err != nil { + return nil, false, fmt.Errorf("failed to compare spec - %w", err) + } + specChanged = changed + } + + if !(updateAnnotation || specChanged) { + // the annotation key is up to date and the spec has not changed, no update is necessary + return nil, false, nil + } + + // if we are here, either we need to update the annotation key or the spec. + copy, ok := current.DeepCopyObject().(configurationObject) + if !ok { + // we should never be here + return nil, false, errors.New("incompatible object type") + } + + if updateAnnotation { + setAutoUpdateAnnotation(copy, autoUpdateSpec) + } + if specChanged { + s.copier.CopySpec(bootstrap, copy) + } + + return copy, true, nil +} + +// shouldUpdateSpec inspects the auto-update annotation key and generation field to determine +// whether the configurationWrapper object should be auto-updated. +func shouldUpdateSpec(accessor metav1.Object) bool { + value, _ := accessor.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey] + if autoUpdate, err := strconv.ParseBool(value); err == nil { + return autoUpdate + } + + // We are here because of either a or b: + // a. the annotation key is missing. + // b. the annotation key is present but the value does not represent a boolean. + // In either case, if the operator hasn't changed the spec, we can safely auto update. + // Please note that we can't protect the changes made by the operator in the following scenario: + // - The operator deletes and recreates the same object with a variant spec (generation resets to 1). + if accessor.GetGeneration() == 1 { + return true + } + return false +} + +// shouldUpdateAnnotation determines whether the current value of the auto-update annotation +// key matches the desired value. +func shouldUpdateAnnotation(accessor metav1.Object, desired bool) bool { + if value, ok := accessor.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey]; ok { + if current, err := strconv.ParseBool(value); err == nil && current == desired { + return false + } + } + + return true +} + +// setAutoUpdateAnnotation sets the auto-update annotation key to the specified value. +func setAutoUpdateAnnotation(accessor metav1.Object, autoUpdate bool) { + if accessor.GetAnnotations() == nil { + accessor.SetAnnotations(map[string]string{}) + } + + accessor.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey] = strconv.FormatBool(autoUpdate) +} + +// ensureConfiguration ensures the boostrap configurationWrapper on the cluster based on the specified strategy. +func ensureConfiguration(wrapper configurationWrapper, strategy ensureStrategy, bootstrap configurationObject) error { + name := bootstrap.GetName() + configurationType := strategy.Name() + + current, err := wrapper.Get(bootstrap.GetName()) + if err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) + } + + if strategy.ShouldCreate() { + if _, err := wrapper.Create(bootstrap); err != nil { + return fmt.Errorf("cannot create %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) + } + + klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", wrapper.TypeName()), "type", configurationType, "name", name) + return nil + } + + klog.V(5).InfoS(fmt.Sprintf("Skipping creation of %s", wrapper.TypeName()), "type", configurationType, "name", name) + return nil + } + + klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", wrapper.TypeName()), "type", configurationType, "name", name) + newObject, update, err := strategy.ShouldUpdate(current, bootstrap) + if err != nil { + return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) + } + if !update { + if klog.V(5).Enabled() { + // TODO: if we use structured logging here the diff gets escaped and very awkward to read in the log + klog.Infof("No update required for the %s type=%s name=%q diff: %s", wrapper.TypeName(), configurationType, name, cmp.Diff(current, bootstrap)) + } + return nil + } + + if _, err := wrapper.Update(newObject); err != nil { + return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) + } + + klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", wrapper.TypeName(), configurationType, name, cmp.Diff(current, newObject)) + return nil +} + +func removeConfiguration(wrapper configurationWrapper, name string) error { + current, err := wrapper.Get(name) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + + return fmt.Errorf("failed to retrieve the %s, will retry later name=%q error=%w", wrapper.TypeName(), name, err) + } + + value := current.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey] + autoUpdate, err := strconv.ParseBool(value) + if err != nil { + klog.ErrorS(err, fmt.Sprintf("Skipping deletion of the %s", wrapper.TypeName()), "name", name) + + // This may need manual intervention, in case the annotation value is malformed, + // so don't return an error, that might trigger futile retry loop. + return nil + } + if !autoUpdate { + klog.V(5).InfoS(fmt.Sprintf("Skipping deletion of the %s", wrapper.TypeName()), "name", name) + return nil + } + + if err := wrapper.Delete(name); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + + return fmt.Errorf("failed to delete the %s, will retry later name=%q error=%w", wrapper.TypeName(), name, err) + } + + klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the %s", wrapper.TypeName()), "name", name) + return nil +} + +// getRemoveCandidate returns a list of configuration objects we should delete +// from the cluster given a set of bootstrap and current configuration. +// bootstrap: a set of hard coded configuration kube-apiserver maintains in-memory. +// current: a set of configuration objects that exist on the cluster +// Any object present in current is a candidate for removal if both a and b are true: +// a. the object in current is missing from the bootstrap configuration +// b. the object has the designated auto-update annotation key +// This function shares the common logic for both FlowSchema and PriorityLevelConfiguration +// type and hence it accepts metav1.Object only. +func getRemoveCandidate(bootstrap sets.String, current []metav1.Object) []string { + if len(current) == 0 { + return nil + } + + candidates := make([]string, 0) + for i := range current { + object := current[i] + if _, ok := object.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey]; !ok { + // the configuration object does not have the annotation key + continue + } + + if _, ok := bootstrap[object.GetName()]; !ok { + candidates = append(candidates, object.GetName()) + } + } + return candidates +} diff --git a/pkg/registry/flowcontrol/rest/storage_flowcontrol.go b/pkg/registry/flowcontrol/rest/storage_flowcontrol.go index f968068b46e..9734cdde473 100644 --- a/pkg/registry/flowcontrol/rest/storage_flowcontrol.go +++ b/pkg/registry/flowcontrol/rest/storage_flowcontrol.go @@ -21,8 +21,6 @@ import ( "fmt" "time" - flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1" - "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -37,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/apis/flowcontrol" flowcontrolapisv1alpha1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1alpha1" flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1" + "k8s.io/kubernetes/pkg/registry/flowcontrol/ensurer" flowschemastore "k8s.io/kubernetes/pkg/registry/flowcontrol/flowschema/storage" prioritylevelconfigurationstore "k8s.io/kubernetes/pkg/registry/flowcontrol/prioritylevelconfiguration/storage" ) @@ -101,70 +100,78 @@ func (p RESTStorageProvider) GroupName() string { // PostStartHook returns the hook func that launches the config provider func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) { - return PostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error { - flowcontrolClientSet := flowcontrolclient.NewForConfigOrDie(hookContext.LoopbackClientConfig) - go func() { - const retryCreatingSuggestedSettingsInterval = time.Second - err := wait.PollImmediateUntil( - retryCreatingSuggestedSettingsInterval, - func() (bool, error) { - should, err := shouldEnsureSuggested(flowcontrolClientSet) - if err != nil { - klog.Errorf("failed getting exempt flow-schema, will retry later: %v", err) - return false, nil - } - if !should { - return true, nil - } - err = ensure( - flowcontrolClientSet, - flowcontrolbootstrap.SuggestedFlowSchemas, - flowcontrolbootstrap.SuggestedPriorityLevelConfigurations) - if err != nil { - klog.Errorf("failed ensuring suggested settings, will retry later: %v", err) - return false, nil - } - return true, nil - }, - hookContext.StopCh) - if err != nil { - klog.ErrorS(err, "Ensuring suggested configuration failed") - - // We should not attempt creation of mandatory objects if ensuring the suggested - // configuration resulted in an error. - // This only happens when the stop channel is closed. - // We rely on the presence of the "exempt" priority level configuration object in the cluster - // to indicate whether we should ensure suggested configuration. - return - } - - const retryCreatingMandatorySettingsInterval = time.Minute - _ = wait.PollImmediateUntil( - retryCreatingMandatorySettingsInterval, - func() (bool, error) { - if err := upgrade( - flowcontrolClientSet, - flowcontrolbootstrap.MandatoryFlowSchemas, - // Note: the "exempt" priority-level is supposed to be the last item in the pre-defined - // list, so that a crash in the midst of the first kube-apiserver startup does not prevent - // the full initial set of objects from being created. - flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, - ); err != nil { - klog.Errorf("failed creating mandatory flowcontrol settings: %v", err) - return false, nil - } - return false, nil // always retry - }, - hookContext.StopCh) - }() - return nil - }, nil - + return PostStartHookName, ensureAPFBootstrapConfiguration, nil } -// shouldEnsureSuggested checks if the exempt priority level exists and returns +func ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookContext) error { + clientset, err := flowcontrolclient.NewForConfig(hookContext.LoopbackClientConfig) + if err != nil { + return fmt.Errorf("failed to initialize clientset for APF - %w", err) + } + + // get a derived context that gets cancelled after 5m or + // when the StopCh gets closed, whichever happens first. + ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute) + defer cancel() + + err = wait.PollImmediateUntilWithContext( + ctx, + time.Second, + func(context.Context) (bool, error) { + if err := ensure(clientset); err != nil { + klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") + return false, nil + } + return true, nil + }) + if err != nil { + return fmt.Errorf("unable to initialize APF bootstrap configuration") + } + + // we have successfully initialized the bootstrap configuration, now we + // spin up a goroutine which reconciles the bootstrap configuration periodically. + go func() { + err := wait.PollImmediateUntil( + time.Minute, + func() (bool, error) { + if err := ensure(clientset); err != nil { + klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") + } + // always auto update both suggested and mandatory configuration + return false, nil + }, hookContext.StopCh) + if err != nil { + klog.ErrorS(err, "APF bootstrap ensurer is exiting") + } + }() + + return nil +} + +func ensure(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error { + if err := ensureSuggestedConfiguration(clientset); err != nil { + // We should not attempt creation of mandatory objects if ensuring the suggested + // configuration resulted in an error. + // This only happens when the stop channel is closed. + // We rely on the presence of the "exempt" priority level configuration object in the cluster + // to indicate whether we should ensure suggested configuration. + return fmt.Errorf("failed ensuring suggested settings - %w", err) + } + + if err := ensureMandatoryConfiguration(clientset); err != nil { + return fmt.Errorf("failed ensuring mandatory settings - %w", err) + } + + if err := removeConfiguration(clientset); err != nil { + return fmt.Errorf("failed to delete removed settings - %w", err) + } + + return nil +} + +// shouldCreateSuggested checks if the exempt priority level exists and returns // whether the suggested flow schemas and priority levels should be ensured. -func shouldEnsureSuggested(flowcontrolClientSet flowcontrolclient.FlowcontrolV1beta1Interface) (bool, error) { +func shouldCreateSuggested(flowcontrolClientSet flowcontrolclient.FlowcontrolV1beta1Interface) (bool, error) { if _, err := flowcontrolClientSet.PriorityLevelConfigurations().Get(context.TODO(), flowcontrol.PriorityLevelConfigurationNameExempt, metav1.GetOptions{}); err != nil { if apierrors.IsNotFound(err) { return true, nil @@ -174,106 +181,89 @@ func shouldEnsureSuggested(flowcontrolClientSet flowcontrolclient.FlowcontrolV1b return false, nil } -const thisFieldManager = "api-priority-and-fairness-config-producer-v1" +func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error { + shouldCreateSuggested, err := shouldCreateSuggested(clientset) + if err != nil { + return fmt.Errorf("failed to determine whether suggested configuration should be created - error: %w", err) + } -func ensure(flowcontrolClientSet flowcontrolclient.FlowcontrolV1beta1Interface, flowSchemas []*flowcontrolv1beta1.FlowSchema, priorityLevels []*flowcontrolv1beta1.PriorityLevelConfiguration) error { - for _, flowSchema := range flowSchemas { - _, err := flowcontrolClientSet.FlowSchemas().Create(context.TODO(), flowSchema, metav1.CreateOptions{FieldManager: thisFieldManager}) - if apierrors.IsAlreadyExists(err) { - klog.V(3).Infof("Suggested FlowSchema %s already exists, skipping creating", flowSchema.Name) - continue - } - if err != nil { - return fmt.Errorf("cannot create suggested FlowSchema %s due to %v", flowSchema.Name, err) - } - klog.V(3).Infof("Created suggested FlowSchema %s", flowSchema.Name) + fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas(), shouldCreateSuggested) + if err := fsEnsurer.Ensure(flowcontrolbootstrap.SuggestedFlowSchemas); err != nil { + return err } - for _, priorityLevelConfiguration := range priorityLevels { - _, err := flowcontrolClientSet.PriorityLevelConfigurations().Create(context.TODO(), priorityLevelConfiguration, metav1.CreateOptions{FieldManager: thisFieldManager}) - if apierrors.IsAlreadyExists(err) { - klog.V(3).Infof("Suggested PriorityLevelConfiguration %s already exists, skipping creating", priorityLevelConfiguration.Name) - continue - } - if err != nil { - return fmt.Errorf("cannot create suggested PriorityLevelConfiguration %s due to %v", priorityLevelConfiguration.Name, err) - } - klog.V(3).Infof("Created suggested PriorityLevelConfiguration %s", priorityLevelConfiguration.Name) - } - return nil + + plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations(), shouldCreateSuggested) + return plEnsurer.Ensure(flowcontrolbootstrap.SuggestedPriorityLevelConfigurations) } -func upgrade(flowcontrolClientSet flowcontrolclient.FlowcontrolV1beta1Interface, flowSchemas []*flowcontrolv1beta1.FlowSchema, priorityLevels []*flowcontrolv1beta1.PriorityLevelConfiguration) error { - for _, expectedFlowSchema := range flowSchemas { - actualFlowSchema, err := flowcontrolClientSet.FlowSchemas().Get(context.TODO(), expectedFlowSchema.Name, metav1.GetOptions{}) - if err == nil { - // TODO(yue9944882): extract existing version from label and compare - // TODO(yue9944882): create w/ version string attached - wrongSpec, err := flowSchemaHasWrongSpec(expectedFlowSchema, actualFlowSchema) - if err != nil { - return fmt.Errorf("failed checking if mandatory FlowSchema %s is up-to-date due to %v, will retry later", expectedFlowSchema.Name, err) - } - if wrongSpec { - if _, err := flowcontrolClientSet.FlowSchemas().Update(context.TODO(), expectedFlowSchema, metav1.UpdateOptions{FieldManager: thisFieldManager}); err != nil { - return fmt.Errorf("failed upgrading mandatory FlowSchema %s due to %v, will retry later", expectedFlowSchema.Name, err) - } - klog.V(3).Infof("Updated mandatory FlowSchema %s because its spec was %#+v but it must be %#+v", expectedFlowSchema.Name, actualFlowSchema.Spec, expectedFlowSchema.Spec) - } - continue - } - if !apierrors.IsNotFound(err) { - return fmt.Errorf("failed getting mandatory FlowSchema %s due to %v, will retry later", expectedFlowSchema.Name, err) - } - _, err = flowcontrolClientSet.FlowSchemas().Create(context.TODO(), expectedFlowSchema, metav1.CreateOptions{FieldManager: thisFieldManager}) - if apierrors.IsAlreadyExists(err) { - klog.V(3).Infof("Mandatory FlowSchema %s already exists, skipping creating", expectedFlowSchema.Name) - continue - } - if err != nil { - return fmt.Errorf("cannot create mandatory FlowSchema %s due to %v", expectedFlowSchema.Name, err) - } - klog.V(3).Infof("Created mandatory FlowSchema %s", expectedFlowSchema.Name) +func ensureMandatoryConfiguration(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error { + fsEnsurer := ensurer.NewMandatoryFlowSchemaEnsurer(clientset.FlowSchemas()) + if err := fsEnsurer.Ensure(flowcontrolbootstrap.MandatoryFlowSchemas); err != nil { + return err } - for _, expectedPriorityLevelConfiguration := range priorityLevels { - actualPriorityLevelConfiguration, err := flowcontrolClientSet.PriorityLevelConfigurations().Get(context.TODO(), expectedPriorityLevelConfiguration.Name, metav1.GetOptions{}) - if err == nil { - // TODO(yue9944882): extract existing version from label and compare - // TODO(yue9944882): create w/ version string attached - wrongSpec, err := priorityLevelHasWrongSpec(expectedPriorityLevelConfiguration, actualPriorityLevelConfiguration) - if err != nil { - return fmt.Errorf("failed checking if mandatory PriorityLevelConfiguration %s is up-to-date due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err) - } - if wrongSpec { - if _, err := flowcontrolClientSet.PriorityLevelConfigurations().Update(context.TODO(), expectedPriorityLevelConfiguration, metav1.UpdateOptions{FieldManager: thisFieldManager}); err != nil { - return fmt.Errorf("failed upgrading mandatory PriorityLevelConfiguration %s due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err) - } - klog.V(3).Infof("Updated mandatory PriorityLevelConfiguration %s because its spec was %#+v but must be %#+v", expectedPriorityLevelConfiguration.Name, actualPriorityLevelConfiguration.Spec, expectedPriorityLevelConfiguration.Spec) - } - continue - } - if !apierrors.IsNotFound(err) { - return fmt.Errorf("failed getting PriorityLevelConfiguration %s due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err) - } - _, err = flowcontrolClientSet.PriorityLevelConfigurations().Create(context.TODO(), expectedPriorityLevelConfiguration, metav1.CreateOptions{FieldManager: thisFieldManager}) - if apierrors.IsAlreadyExists(err) { - klog.V(3).Infof("Mandatory PriorityLevelConfiguration %s already exists, skipping creating", expectedPriorityLevelConfiguration.Name) - continue - } - if err != nil { - return fmt.Errorf("cannot create mandatory PriorityLevelConfiguration %s due to %v", expectedPriorityLevelConfiguration.Name, err) - } - klog.V(3).Infof("Created mandatory PriorityLevelConfiguration %s", expectedPriorityLevelConfiguration.Name) - } - return nil + + plEnsurer := ensurer.NewMandatoryPriorityLevelEnsurer(clientset.PriorityLevelConfigurations()) + return plEnsurer.Ensure(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations) } -func flowSchemaHasWrongSpec(expected, actual *flowcontrolv1beta1.FlowSchema) (bool, error) { - copiedExpectedFlowSchema := expected.DeepCopy() - flowcontrolapisv1beta1.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema) - return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec), nil +func removeConfiguration(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error { + if err := removeFlowSchema(clientset.FlowSchemas()); err != nil { + return err + } + + return removePriorityLevel(clientset.PriorityLevelConfigurations()) } -func priorityLevelHasWrongSpec(expected, actual *flowcontrolv1beta1.PriorityLevelConfiguration) (bool, error) { - copiedExpectedPriorityLevel := expected.DeepCopy() - flowcontrolapisv1beta1.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel) - return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec), nil +func removeFlowSchema(client flowcontrolclient.FlowSchemaInterface) error { + bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...) + candidates, err := ensurer.GetFlowSchemaRemoveCandidate(client, bootstrap) + if err != nil { + return err + } + if len(candidates) == 0 { + return nil + } + + fsRemover := ensurer.NewFlowSchemaRemover(client) + return fsRemover.Remove(candidates) +} + +func removePriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInterface) error { + bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...) + candidates, err := ensurer.GetPriorityLevelRemoveCandidate(client, bootstrap) + if err != nil { + return err + } + if len(candidates) == 0 { + return nil + } + + plRemover := ensurer.NewPriorityLevelRemover(client) + return plRemover.Remove(candidates) +} + +// contextFromChannelAndMaxWaitDuration returns a Context that is bound to the +// specified channel and the wait duration. The derived context will be +// cancelled when the specified channel stopCh is closed or the maximum wait +// duration specified in maxWait elapses, whichever happens first. +// +// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked. +func contextFromChannelAndMaxWaitDuration(stopCh <-chan struct{}, maxWait time.Duration) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + defer cancel() + + select { + case <-stopCh: + case <-time.After(maxWait): + + // the caller can explicitly cancel the context which is an + // indication to us to exit the goroutine immediately. + // Note that we are calling cancel more than once when we are here, + // CancelFunc is idempotent and we expect no ripple effects here. + case <-ctx.Done(): + } + }() + return ctx, cancel } diff --git a/pkg/registry/flowcontrol/rest/storage_flowcontrol_test.go b/pkg/registry/flowcontrol/rest/storage_flowcontrol_test.go index ed840baa2df..3d389c9013b 100644 --- a/pkg/registry/flowcontrol/rest/storage_flowcontrol_test.go +++ b/pkg/registry/flowcontrol/rest/storage_flowcontrol_test.go @@ -19,14 +19,13 @@ package rest import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/client-go/kubernetes/fake" - flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1" ) func TestShouldEnsurePredefinedSettings(t *testing.T) { @@ -53,121 +52,40 @@ func TestShouldEnsurePredefinedSettings(t *testing.T) { if testCase.existingPriorityLevel != nil { c.FlowcontrolV1beta1().PriorityLevelConfigurations().Create(context.TODO(), testCase.existingPriorityLevel, metav1.CreateOptions{}) } - should, err := shouldEnsureSuggested(c.FlowcontrolV1beta1()) + should, err := shouldCreateSuggested(c.FlowcontrolV1beta1()) assert.NoError(t, err) assert.Equal(t, testCase.expected, should) }) } } -func TestFlowSchemaHasWrongSpec(t *testing.T) { - fs1 := &flowcontrolv1beta1.FlowSchema{ - Spec: flowcontrolv1beta1.FlowSchemaSpec{}, +func TestContextFromChannelAndMaxWaitDurationWithChannelClosed(t *testing.T) { + stopCh := make(chan struct{}) + ctx, cancel := contextFromChannelAndMaxWaitDuration(stopCh, time.Hour) + defer cancel() + + select { + case <-ctx.Done(): + t.Fatalf("Expected the derived context to be not cancelled, but got: %v", ctx.Err()) + default: } - fs2 := &flowcontrolv1beta1.FlowSchema{ - Spec: flowcontrolv1beta1.FlowSchemaSpec{ - MatchingPrecedence: 1, - }, - } - fs1Defaulted := &flowcontrolv1beta1.FlowSchema{ - Spec: flowcontrolv1beta1.FlowSchemaSpec{ - MatchingPrecedence: flowcontrolapisv1beta1.FlowSchemaDefaultMatchingPrecedence, - }, - } - testCases := []struct { - name string - expected *flowcontrolv1beta1.FlowSchema - actual *flowcontrolv1beta1.FlowSchema - hasWrongSpec bool - }{ - { - name: "identical flow-schemas should work", - expected: bootstrap.MandatoryFlowSchemaCatchAll, - actual: bootstrap.MandatoryFlowSchemaCatchAll, - hasWrongSpec: false, - }, - { - name: "defaulted flow-schemas should work", - expected: fs1, - actual: fs1Defaulted, - hasWrongSpec: false, - }, - { - name: "non-defaulted flow-schema has wrong spec", - expected: fs1, - actual: fs2, - hasWrongSpec: true, - }, - } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - w, err := flowSchemaHasWrongSpec(testCase.expected, testCase.actual) - require.NoError(t, err) - assert.Equal(t, testCase.hasWrongSpec, w) - }) + + close(stopCh) + + <-ctx.Done() + if ctx.Err() != context.Canceled { + t.Errorf("Expected the context to be canceled with: %v, but got: %v", context.Canceled, ctx.Err()) } } -func TestPriorityLevelHasWrongSpec(t *testing.T) { - pl1 := &flowcontrolv1beta1.PriorityLevelConfiguration{ - Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{ - Type: flowcontrolv1beta1.PriorityLevelEnablementLimited, - Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{ - LimitResponse: flowcontrolv1beta1.LimitResponse{ - Type: flowcontrolv1beta1.LimitResponseTypeReject, - }, - }, - }, - } - pl2 := &flowcontrolv1beta1.PriorityLevelConfiguration{ - Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{ - Type: flowcontrolv1beta1.PriorityLevelEnablementLimited, - Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{ - AssuredConcurrencyShares: 1, - }, - }, - } - pl1Defaulted := &flowcontrolv1beta1.PriorityLevelConfiguration{ - Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{ - Type: flowcontrolv1beta1.PriorityLevelEnablementLimited, - Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{ - AssuredConcurrencyShares: flowcontrolapisv1beta1.PriorityLevelConfigurationDefaultAssuredConcurrencyShares, - LimitResponse: flowcontrolv1beta1.LimitResponse{ - Type: flowcontrolv1beta1.LimitResponseTypeReject, - }, - }, - }, - } - testCases := []struct { - name string - expected *flowcontrolv1beta1.PriorityLevelConfiguration - actual *flowcontrolv1beta1.PriorityLevelConfiguration - hasWrongSpec bool - }{ - { - name: "identical priority-level should work", - expected: bootstrap.MandatoryPriorityLevelConfigurationCatchAll, - actual: bootstrap.MandatoryPriorityLevelConfigurationCatchAll, - hasWrongSpec: false, - }, - { - name: "defaulted priority-level should work", - expected: pl1, - actual: pl1Defaulted, - hasWrongSpec: false, - }, - { - name: "non-defaulted priority-level has wrong spec", - expected: pl1, - actual: pl2, - hasWrongSpec: true, - }, - } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - w, err := priorityLevelHasWrongSpec(testCase.expected, testCase.actual) - require.NoError(t, err) - assert.Equal(t, testCase.hasWrongSpec, w) - }) +func TestContextFromChannelAndMaxWaitDurationWithMaxWaitElapsed(t *testing.T) { + stopCh := make(chan struct{}) + ctx, cancel := contextFromChannelAndMaxWaitDuration(stopCh, 100*time.Millisecond) + defer cancel() + + <-ctx.Done() + + if ctx.Err() != context.Canceled { + t.Errorf("Expected the context to be canceled with: %v, but got: %v", context.Canceled, ctx.Err()) } } diff --git a/staging/src/k8s.io/api/flowcontrol/v1beta1/types.go b/staging/src/k8s.io/api/flowcontrol/v1beta1/types.go index ece834e9292..673cde42dc7 100644 --- a/staging/src/k8s.io/api/flowcontrol/v1beta1/types.go +++ b/staging/src/k8s.io/api/flowcontrol/v1beta1/types.go @@ -57,6 +57,50 @@ const ( ResponseHeaderMatchedFlowSchemaUID = "X-Kubernetes-PF-FlowSchema-UID" ) +const ( + // AutoUpdateAnnotationKey is the name of an annotation that enables + // automatic update of the spec of the bootstrap configuration + // object(s), if set to 'true'. + // + // On a fresh install, all bootstrap configuration objects will have auto + // update enabled with the following annotation key: + // apf.kubernetes.io/autoupdate-spec: 'true' + // + // The kube-apiserver periodically checks the bootstrap configuration + // objects on the cluster and applies updates if necessary. + // + // kube-apiserver enforces an 'always auto-update' policy for the + // mandatory configuration object(s). This implies: + // - the auto-update annotation key is added with a value of 'true' + // if it is missing. + // - the auto-update annotation key is set to 'true' if its current value + // is a boolean false or has an invalid boolean representation + // (if the cluster operator sets it to 'false' it will be stomped) + // - any changes to the spec made by the cluster operator will be + // stomped. + // + // The kube-apiserver will apply updates on the suggested configuration if: + // - the cluster operator has enabled auto-update by setting the annotation + // (apf.kubernetes.io/autoupdate-spec: 'true') or + // - the annotation key is missing but the generation is 1 + // + // If the suggested configuration object is missing the annotation key, + // kube-apiserver will update the annotation appropriately: + // - it is set to 'true' if generation of the object is '1' which usually + // indicates that the spec of the object has not been changed. + // - it is set to 'false' if generation of the object is greater than 1. + // + // The goal is to enable the kube-apiserver to apply update on suggested + // configuration objects installed by previous releases but not overwrite + // changes made by the cluster operators. + // Note that this distinction is imperfectly detected: in the case where an + // operator deletes a suggested configuration object and later creates it + // but with a variant spec and then does no updates of the object + // (generation is 1), the technique outlined above will incorrectly + // determine that the object should be auto-updated. + AutoUpdateAnnotationKey = "apf.kubernetes.io/autoupdate-spec" +) + // +genclient // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap/default.go b/staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap/default.go index 64600beca31..dbb932aa34a 100644 --- a/staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap/default.go +++ b/staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap/default.go @@ -455,8 +455,14 @@ var ( func newPriorityLevelConfiguration(name string, spec flowcontrol.PriorityLevelConfigurationSpec) *flowcontrol.PriorityLevelConfiguration { return &flowcontrol.PriorityLevelConfiguration{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - Spec: spec} + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Annotations: map[string]string{ + flowcontrol.AutoUpdateAnnotationKey: "true", + }, + }, + Spec: spec, + } } func newFlowSchema(name, plName string, matchingPrecedence int32, dmType flowcontrol.FlowDistinguisherMethodType, rules ...flowcontrol.PolicyRulesWithSubjects) *flowcontrol.FlowSchema { @@ -465,7 +471,12 @@ func newFlowSchema(name, plName string, matchingPrecedence int32, dmType flowcon dm = &flowcontrol.FlowDistinguisherMethod{Type: dmType} } return &flowcontrol.FlowSchema{ - ObjectMeta: metav1.ObjectMeta{Name: name}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Annotations: map[string]string{ + flowcontrol.AutoUpdateAnnotationKey: "true", + }, + }, Spec: flowcontrol.FlowSchemaSpec{ PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{ Name: plName,