diff --git a/pkg/registry/flowcontrol/ensurer/flowschema.go b/pkg/registry/flowcontrol/ensurer/flowschema.go index 6cad99ffd91..4bc6e26f5cf 100644 --- a/pkg/registry/flowcontrol/ensurer/flowschema.go +++ b/pkg/registry/flowcontrol/ensurer/flowschema.go @@ -18,190 +18,128 @@ package ensurer import ( "context" - "errors" "fmt" flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" ) -var ( - errObjectNotFlowSchema = errors.New("object is not a FlowSchema type") -) - -// FlowSchemaEnsurer ensures the specified bootstrap configuration objects -type FlowSchemaEnsurer interface { - Ensure([]*flowcontrolv1beta3.FlowSchema) error -} - -// FlowSchemaRemover is the interface that wraps the -// RemoveAutoUpdateEnabledObjects method. -// -// RemoveAutoUpdateEnabledObjects removes a set of bootstrap FlowSchema -// objects specified via their names. The function removes an object -// only if automatic update of the spec is enabled for it. -type FlowSchemaRemover interface { - RemoveAutoUpdateEnabledObjects([]string) error -} - -// NewSuggestedFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that -// can be used to ensure a set of suggested FlowSchema configuration objects. -func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer { - wrapper := &flowSchemaWrapper{ - client: client, - lister: lister, - } - return &fsEnsurer{ - strategy: newSuggestedEnsureStrategy(wrapper), - wrapper: wrapper, - } -} - -// NewMandatoryFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that -// can be used to ensure a set of mandatory FlowSchema configuration objects. -func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer { - wrapper := &flowSchemaWrapper{ - client: client, - lister: lister, - } - return &fsEnsurer{ - strategy: newMandatoryEnsureStrategy(wrapper), - wrapper: wrapper, - } -} - -// NewFlowSchemaRemover returns a FlowSchemaRemover instance that -// can be used to remove a set of FlowSchema configuration objects. -func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaRemover { - return &fsEnsurer{ - wrapper: &flowSchemaWrapper{ +// WrapBootstrapFlowSchemas creates a generic representation of the given bootstrap objects bound with their operations +// Every object in `boots` is immutable. +func WrapBootstrapFlowSchemas(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister, boots []*flowcontrolv1beta3.FlowSchema) BootstrapObjects { + return &bootstrapFlowSchemas{ + flowSchemaClient: flowSchemaClient{ client: client, - lister: lister, - }, + lister: lister}, + boots: boots, } } -// GetFlowSchemaRemoveCandidates returns a list of FlowSchema object -// names that are candidates for deletion from the cluster. -// bootstrap: a set of hard coded FlowSchema configuration objects -// kube-apiserver maintains in-memory. -func GetFlowSchemaRemoveCandidates(lister flowcontrollisters.FlowSchemaLister, bootstrap []*flowcontrolv1beta3.FlowSchema) ([]string, error) { - fsList, err := lister.List(labels.Everything()) - if err != nil { - return nil, fmt.Errorf("failed to list FlowSchema - %w", err) - } - - bootstrapNames := sets.String{} - for i := range bootstrap { - bootstrapNames.Insert(bootstrap[i].GetName()) - } - - currentObjects := make([]metav1.Object, len(fsList)) - for i := range fsList { - currentObjects[i] = fsList[i] - } - - return getDanglingBootstrapObjectNames(bootstrapNames, currentObjects), nil -} - -type fsEnsurer struct { - strategy ensureStrategy - wrapper configurationWrapper -} - -func (e *fsEnsurer) Ensure(flowSchemas []*flowcontrolv1beta3.FlowSchema) error { - for _, flowSchema := range flowSchemas { - // This code gets called by different goroutines. To avoid race conditions when - // https://github.com/kubernetes/kubernetes/blob/330b5a2b8dbd681811cb8235947557c99dd8e593/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go#L221-L243 - // temporarily modifies the TypeMeta, we have to make a copy here. - if err := ensureConfiguration(e.wrapper, e.strategy, flowSchema.DeepCopy()); err != nil { - return err - } - } - - return nil -} - -func (e *fsEnsurer) RemoveAutoUpdateEnabledObjects(flowSchemas []string) error { - for _, flowSchema := range flowSchemas { - if err := removeAutoUpdateEnabledConfiguration(e.wrapper, flowSchema); err != nil { - return err - } - } - - return nil -} - -// flowSchemaWrapper abstracts all FlowSchema specific logic, with this -// we can manage all boiler plate code in one place. -type flowSchemaWrapper struct { +type flowSchemaClient struct { client flowcontrolclient.FlowSchemaInterface lister flowcontrollisters.FlowSchemaLister } -func (fs *flowSchemaWrapper) TypeName() string { +type bootstrapFlowSchemas struct { + flowSchemaClient + + // Every member is a pointer to immutable content + boots []*flowcontrolv1beta3.FlowSchema +} + +func (*flowSchemaClient) typeName() string { return "FlowSchema" } -func (fs *flowSchemaWrapper) Create(object runtime.Object) (runtime.Object, error) { - fsObject, ok := object.(*flowcontrolv1beta3.FlowSchema) - if !ok { - return nil, errObjectNotFlowSchema - } - - return fs.client.Create(context.TODO(), fsObject, metav1.CreateOptions{FieldManager: fieldManager}) +func (boots *bootstrapFlowSchemas) len() int { + return len(boots.boots) } -func (fs *flowSchemaWrapper) Update(object runtime.Object) (runtime.Object, error) { - fsObject, ok := object.(*flowcontrolv1beta3.FlowSchema) - if !ok { - return nil, errObjectNotFlowSchema +func (boots *bootstrapFlowSchemas) get(i int) bootstrapObject { + return &bootstrapFlowSchema{ + flowSchemaClient: &boots.flowSchemaClient, + bootstrap: boots.boots[i], } - - return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager}) } -func (fs *flowSchemaWrapper) Get(name string) (configurationObject, error) { - return fs.lister.Get(name) +func (boots *bootstrapFlowSchemas) getExistingObjects() ([]deletable, error) { + objs, err := boots.lister.List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("failed to list FlowSchema objects - %w", err) + } + dels := make([]deletable, len(objs)) + for i, obj := range objs { + dels[i] = &deletableFlowSchema{ + FlowSchema: obj, + client: boots.client, + } + } + return dels, nil } -func (fs *flowSchemaWrapper) Delete(name string) error { - return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{}) +type bootstrapFlowSchema struct { + *flowSchemaClient + + // points to immutable contnet + bootstrap *flowcontrolv1beta3.FlowSchema } -func (fs *flowSchemaWrapper) CopySpec(bootstrap, current runtime.Object) error { - bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.FlowSchema) - if !ok { - return errObjectNotFlowSchema - } - currentFS, ok := current.(*flowcontrolv1beta3.FlowSchema) - if !ok { - return errObjectNotFlowSchema - } - - specCopy := bootstrapFS.Spec.DeepCopy() - currentFS.Spec = *specCopy - return nil +func (boot *bootstrapFlowSchema) getName() string { + return boot.bootstrap.Name } -func (fs *flowSchemaWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) { - bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.FlowSchema) - if !ok { - return false, errObjectNotFlowSchema - } - currentFS, ok := current.(*flowcontrolv1beta3.FlowSchema) - if !ok { - return false, errObjectNotFlowSchema - } +func (boot *bootstrapFlowSchema) create(ctx context.Context) error { + // Copy the object here because the Encoder in the client code may modify the object; see + // https://github.com/kubernetes/kubernetes/pull/117107 + // and WithVersionEncoder in apimachinery/pkg/runtime/helper.go. + _, err := boot.client.Create(ctx, boot.bootstrap.DeepCopy(), metav1.CreateOptions{FieldManager: fieldManager}) + return err +} - return flowSchemaSpecChanged(bootstrapFS, currentFS), nil +func (boot *bootstrapFlowSchema) getCurrent() (wantAndHave, error) { + current, err := boot.lister.Get(boot.bootstrap.Name) + if err != nil { + return nil, err + } + return &wantAndHaveFlowSchema{ + client: boot.client, + want: boot.bootstrap, + have: current, + }, nil +} + +type wantAndHaveFlowSchema struct { + client flowcontrolclient.FlowSchemaInterface + want *flowcontrolv1beta3.FlowSchema + have *flowcontrolv1beta3.FlowSchema +} + +func (wah *wantAndHaveFlowSchema) getWant() configurationObject { + return wah.want +} + +func (wah *wantAndHaveFlowSchema) getHave() configurationObject { + return wah.have +} + +func (wah *wantAndHaveFlowSchema) copyHave(specFromWant bool) updatable { + copy := wah.have.DeepCopy() + if specFromWant { + copy.Spec = *wah.want.Spec.DeepCopy() + } + return &updatableFlowSchema{ + FlowSchema: copy, + client: wah.client, + } +} + +func (wah *wantAndHaveFlowSchema) specsDiffer() bool { + return flowSchemaSpecChanged(wah.want, wah.have) } func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool { @@ -209,3 +147,23 @@ func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool flowcontrolapisv1beta3.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema) return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec) } + +type updatableFlowSchema struct { + *flowcontrolv1beta3.FlowSchema + client flowcontrolclient.FlowSchemaInterface +} + +func (u *updatableFlowSchema) update(ctx context.Context) error { + _, err := u.client.Update(ctx, u.FlowSchema, metav1.UpdateOptions{FieldManager: fieldManager}) + return err +} + +type deletableFlowSchema struct { + *flowcontrolv1beta3.FlowSchema + client flowcontrolclient.FlowSchemaInterface +} + +func (dbl *deletableFlowSchema) delete(ctx context.Context /* TODO: resourceVersion string */) error { + // return dbl.client.Delete(context.TODO(), dbl.Name, metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &resourceVersion}}) + return dbl.client.Delete(ctx, dbl.Name, metav1.DeleteOptions{}) +} diff --git a/pkg/registry/flowcontrol/ensurer/flowschema_test.go b/pkg/registry/flowcontrol/ensurer/flowschema_test.go index ee1bc9bb4e9..439dca2908d 100644 --- a/pkg/registry/flowcontrol/ensurer/flowschema_test.go +++ b/pkg/registry/flowcontrol/ensurer/flowschema_test.go @@ -26,20 +26,23 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/client-go/kubernetes/fake" - flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" ) +func init() { + klog.InitFlags(nil) +} + func TestEnsureFlowSchema(t *testing.T) { tests := []struct { name string - strategy func(flowcontrolclient.FlowSchemaInterface, flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer + strategy func() EnsureStrategy current *flowcontrolv1beta3.FlowSchema bootstrap *flowcontrolv1beta3.FlowSchema expected *flowcontrolv1beta3.FlowSchema @@ -47,21 +50,21 @@ func TestEnsureFlowSchema(t *testing.T) { // for suggested configurations { name: "suggested flow schema does not exist - the object should always be re-created", - strategy: NewSuggestedFlowSchemaEnsurer, + strategy: NewSuggestedEnsureStrategy, bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), current: nil, expected: newFlowSchema("fs1", "pl1", 100).Object(), }, { name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated", - strategy: NewSuggestedFlowSchemaEnsurer, + strategy: NewSuggestedEnsureStrategy, bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), }, { name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated", - strategy: NewSuggestedFlowSchemaEnsurer, + strategy: NewSuggestedEnsureStrategy, bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), expected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), @@ -70,21 +73,21 @@ func TestEnsureFlowSchema(t *testing.T) { // for mandatory configurations { name: "mandatory flow schema does not exist - new object should be created", - strategy: NewMandatoryFlowSchemaEnsurer, + strategy: NewMandatoryEnsureStrategy, bootstrap: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), current: nil, expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), }, { name: "mandatory flow schema exists, annotation is missing - annotation should be added", - strategy: NewMandatoryFlowSchemaEnsurer, + strategy: NewMandatoryEnsureStrategy, bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), current: newFlowSchema("fs1", "pl1", 100).Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), }, { name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated", - strategy: NewMandatoryFlowSchemaEnsurer, + strategy: NewMandatoryEnsureStrategy, bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), @@ -100,9 +103,9 @@ func TestEnsureFlowSchema(t *testing.T) { indexer.Add(test.current) } - ensurer := test.strategy(client, flowcontrollisters.NewFlowSchemaLister(indexer)) - - err := ensurer.Ensure([]*flowcontrolv1beta3.FlowSchema{test.bootstrap}) + boots := WrapBootstrapFlowSchemas(client, flowcontrollisters.NewFlowSchemaLister(indexer), []*flowcontrolv1beta3.FlowSchema{test.bootstrap}) + strategy := test.strategy() + err := EnsureConfigurations(context.Background(), boots, strategy) if err != nil { t.Fatalf("Expected no error, but got: %v", err) } @@ -207,12 +210,19 @@ func TestSuggestedFSEnsureStrategy_ShouldUpdate(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - strategy := newSuggestedEnsureStrategy(&flowSchemaWrapper{}) - newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap) + wah := &wantAndHaveFlowSchema{ + want: test.bootstrap, + have: test.current, + } + strategy := NewSuggestedEnsureStrategy() + updatableGot, updateGot, err := strategy.ShouldUpdate(wah) if err != nil { t.Errorf("Expected no error, but got: %v", err) } - + var newObjectGot *flowcontrolv1beta3.FlowSchema + if updatableGot != nil { + newObjectGot = updatableGot.(*updatableFlowSchema).FlowSchema + } if test.newObjectExpected == nil { if newObjectGot != nil { t.Errorf("Expected a nil object, but got: %#v", newObjectGot) @@ -288,24 +298,42 @@ func TestRemoveFlowSchema(t *testing.T) { removeExpected bool }{ { - name: "flow schema does not exist", + name: "no flow schema objects exist", bootstrapName: "fs1", current: nil, }, { - name: "flow schema exists, auto update is enabled", - bootstrapName: "fs1", + name: "flow schema unwanted, auto update is enabled", + bootstrapName: "fs0", current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), removeExpected: true, }, { - name: "flow schema exists, auto update is disabled", + name: "flow schema unwanted, auto update is disabled", + bootstrapName: "fs0", + current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), + removeExpected: false, + }, + { + name: "flow schema unwanted, the auto-update annotation is malformed", + bootstrapName: "fs0", + current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("invalid").Object(), + removeExpected: false, + }, + { + name: "flow schema wanted, auto update is enabled", + bootstrapName: "fs1", + current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), + removeExpected: false, + }, + { + name: "flow schema wanted, auto update is disabled", bootstrapName: "fs1", current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), removeExpected: false, }, { - name: "flow schema exists, the auto-update annotation is malformed", + name: "flow schema wanted, the auto-update annotation is malformed", bootstrapName: "fs1", current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("invalid").Object(), removeExpected: false, @@ -320,9 +348,10 @@ func TestRemoveFlowSchema(t *testing.T) { client.Create(context.TODO(), test.current, metav1.CreateOptions{}) indexer.Add(test.current) } + bootFS := newFlowSchema(test.bootstrapName, "pl", 100).Object() + boots := WrapBootstrapFlowSchemas(client, flowcontrollisters.NewFlowSchemaLister(indexer), []*flowcontrolv1beta3.FlowSchema{bootFS}) + err := RemoveUnwantedObjects(context.Background(), boots) - remover := NewFlowSchemaRemover(client, flowcontrollisters.NewFlowSchemaLister(indexer)) - err := remover.RemoveAutoUpdateEnabledObjects([]string{test.bootstrapName}) if err != nil { t.Fatalf("Expected no error, but got: %v", err) } @@ -330,100 +359,21 @@ func TestRemoveFlowSchema(t *testing.T) { if test.current == nil { return } - _, err = client.Get(context.TODO(), test.bootstrapName, metav1.GetOptions{}) + _, err = client.Get(context.TODO(), test.current.Name, metav1.GetOptions{}) switch { case test.removeExpected: if !apierrors.IsNotFound(err) { - t.Errorf("Expected error: %q, but got: %v", metav1.StatusReasonNotFound, err) + t.Errorf("Expected error from Get after Delete: %q, but got: %v", metav1.StatusReasonNotFound, err) } default: if err != nil { - t.Errorf("Expected no error, but got: %v", err) + t.Errorf("Expected no error from Get after Delete, but got: %v", err) } } }) } } -func TestGetFlowSchemaRemoveCandidate(t *testing.T) { - tests := []struct { - name string - current []*flowcontrolv1beta3.FlowSchema - bootstrap []*flowcontrolv1beta3.FlowSchema - expected []string - }{ - { - name: "no object has been removed from the bootstrap configuration", - bootstrap: []*flowcontrolv1beta3.FlowSchema{ - newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), - newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(), - newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(), - }, - current: []*flowcontrolv1beta3.FlowSchema{ - newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), - newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(), - newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(), - }, - expected: []string{}, - }, - { - name: "bootstrap is empty, all current objects with the annotation should be candidates", - bootstrap: []*flowcontrolv1beta3.FlowSchema{}, - current: []*flowcontrolv1beta3.FlowSchema{ - newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), - newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(), - newFlowSchema("fs3", "pl3", 300).Object(), - }, - expected: []string{"fs1", "fs2"}, - }, - { - name: "object(s) have been removed from the bootstrap configuration", - bootstrap: []*flowcontrolv1beta3.FlowSchema{ - newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), - }, - current: []*flowcontrolv1beta3.FlowSchema{ - newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), - newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(), - newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(), - }, - expected: []string{"fs2", "fs3"}, - }, - { - name: "object(s) without the annotation key are ignored", - bootstrap: []*flowcontrolv1beta3.FlowSchema{ - newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), - }, - current: []*flowcontrolv1beta3.FlowSchema{ - newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), - newFlowSchema("fs2", "pl2", 200).Object(), - newFlowSchema("fs3", "pl3", 300).Object(), - }, - expected: []string{}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) - for i := range test.current { - indexer.Add(test.current[i]) - } - - lister := flowcontrollisters.NewFlowSchemaLister(indexer) - removeListGot, err := GetFlowSchemaRemoveCandidates(lister, test.bootstrap) - if err != nil { - t.Fatalf("Expected no error, but got: %v", err) - } - - if !cmp.Equal(test.expected, removeListGot, cmpopts.SortSlices(func(a string, b string) bool { - return a < b - })) { - t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot)) - } - }) - } -} - type fsBuilder struct { object *flowcontrolv1beta3.FlowSchema } diff --git a/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go index a536d649d4c..09bf96b02b3 100644 --- a/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go +++ b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go @@ -18,191 +18,128 @@ package ensurer import ( "context" - "errors" "fmt" flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" ) -var ( - errObjectNotPriorityLevel = errors.New("object is not a PriorityLevelConfiguration type") -) - -// PriorityLevelEnsurer ensures the specified bootstrap configuration objects -type PriorityLevelEnsurer interface { - Ensure([]*flowcontrolv1beta3.PriorityLevelConfiguration) error -} - -// PriorityLevelRemover is the interface that wraps the -// RemoveAutoUpdateEnabledObjects method. -// -// RemoveAutoUpdateEnabledObjects removes a set of bootstrap -// PriorityLevelConfiguration objects specified via their names. -// The function removes an object only if automatic update -// of the spec is enabled for it. -type PriorityLevelRemover interface { - RemoveAutoUpdateEnabledObjects([]string) error -} - -// NewSuggestedPriorityLevelEnsurerEnsurer returns a PriorityLevelEnsurer instance that -// can be used to ensure a set of suggested PriorityLevelConfiguration configuration objects. -func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer { - wrapper := &priorityLevelConfigurationWrapper{ - client: client, - lister: lister, - } - return &plEnsurer{ - strategy: newSuggestedEnsureStrategy(wrapper), - wrapper: wrapper, - } -} - -// NewMandatoryPriorityLevelEnsurer returns a PriorityLevelEnsurer instance that -// can be used to ensure a set of mandatory PriorityLevelConfiguration configuration objects. -func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer { - wrapper := &priorityLevelConfigurationWrapper{ - client: client, - lister: lister, - } - return &plEnsurer{ - strategy: newMandatoryEnsureStrategy(wrapper), - wrapper: wrapper, - } -} - -// NewPriorityLevelRemover returns a PriorityLevelRemover instance that -// can be used to remove a set of PriorityLevelConfiguration configuration objects. -func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelRemover { - return &plEnsurer{ - wrapper: &priorityLevelConfigurationWrapper{ +// WrapBootstrapPriorityLevelConfigurations creates a generic representation of the given bootstrap objects bound with their operations. +// Every object in `boots` is immutable. +func WrapBootstrapPriorityLevelConfigurations(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister, boots []*flowcontrolv1beta3.PriorityLevelConfiguration) BootstrapObjects { + return &bootstrapPriorityLevelConfigurations{ + priorityLevelConfigurationClient: priorityLevelConfigurationClient{ client: client, - lister: lister, - }, + lister: lister}, + boots: boots, } } -// GetPriorityLevelRemoveCandidates returns a list of PriorityLevelConfiguration -// names that are candidates for removal from the cluster. -// bootstrap: a set of hard coded PriorityLevelConfiguration configuration -// objects kube-apiserver maintains in-memory. -func GetPriorityLevelRemoveCandidates(lister flowcontrollisters.PriorityLevelConfigurationLister, bootstrap []*flowcontrolv1beta3.PriorityLevelConfiguration) ([]string, error) { - plList, err := lister.List(labels.Everything()) - if err != nil { - return nil, fmt.Errorf("failed to list PriorityLevelConfiguration - %w", err) - } - - bootstrapNames := sets.String{} - for i := range bootstrap { - bootstrapNames.Insert(bootstrap[i].GetName()) - } - - currentObjects := make([]metav1.Object, len(plList)) - for i := range plList { - currentObjects[i] = plList[i] - } - - return getDanglingBootstrapObjectNames(bootstrapNames, currentObjects), nil -} - -type plEnsurer struct { - strategy ensureStrategy - wrapper configurationWrapper -} - -func (e *plEnsurer) Ensure(priorityLevels []*flowcontrolv1beta3.PriorityLevelConfiguration) error { - for _, priorityLevel := range priorityLevels { - // This code gets called by different goroutines. To avoid race conditions when - // https://github.com/kubernetes/kubernetes/blob/330b5a2b8dbd681811cb8235947557c99dd8e593/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go#L221-L243 - // temporarily modifies the TypeMeta, we have to make a copy here. - if err := ensureConfiguration(e.wrapper, e.strategy, priorityLevel.DeepCopy()); err != nil { - return err - } - } - - return nil -} - -func (e *plEnsurer) RemoveAutoUpdateEnabledObjects(priorityLevels []string) error { - for _, priorityLevel := range priorityLevels { - if err := removeAutoUpdateEnabledConfiguration(e.wrapper, priorityLevel); err != nil { - return err - } - } - - return nil -} - -// priorityLevelConfigurationWrapper abstracts all PriorityLevelConfiguration specific logic, -// with this we can manage all boiler plate code in one place. -type priorityLevelConfigurationWrapper struct { +type priorityLevelConfigurationClient struct { client flowcontrolclient.PriorityLevelConfigurationInterface lister flowcontrollisters.PriorityLevelConfigurationLister } -func (fs *priorityLevelConfigurationWrapper) TypeName() string { +type bootstrapPriorityLevelConfigurations struct { + priorityLevelConfigurationClient + + // Every member is a pointer to immutable content + boots []*flowcontrolv1beta3.PriorityLevelConfiguration +} + +func (*priorityLevelConfigurationClient) typeName() string { return "PriorityLevelConfiguration" } -func (fs *priorityLevelConfigurationWrapper) Create(object runtime.Object) (runtime.Object, error) { - plObject, ok := object.(*flowcontrolv1beta3.PriorityLevelConfiguration) - if !ok { - return nil, errObjectNotPriorityLevel - } - - return fs.client.Create(context.TODO(), plObject, metav1.CreateOptions{FieldManager: fieldManager}) +func (boots *bootstrapPriorityLevelConfigurations) len() int { + return len(boots.boots) } -func (fs *priorityLevelConfigurationWrapper) Update(object runtime.Object) (runtime.Object, error) { - fsObject, ok := object.(*flowcontrolv1beta3.PriorityLevelConfiguration) - if !ok { - return nil, errObjectNotPriorityLevel +func (boots *bootstrapPriorityLevelConfigurations) get(i int) bootstrapObject { + return &bootstrapPriorityLevelConfiguration{ + priorityLevelConfigurationClient: &boots.priorityLevelConfigurationClient, + bootstrap: boots.boots[i], } - - return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager}) } -func (fs *priorityLevelConfigurationWrapper) Get(name string) (configurationObject, error) { - return fs.lister.Get(name) +func (boots *bootstrapPriorityLevelConfigurations) getExistingObjects() ([]deletable, error) { + objs, err := boots.lister.List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("failed to list PriorityLevelConfiguration objects - %w", err) + } + dels := make([]deletable, len(objs)) + for i, obj := range objs { + dels[i] = &deletablePriorityLevelConfiguration{ + PriorityLevelConfiguration: obj, + client: boots.client, + } + } + return dels, nil } -func (fs *priorityLevelConfigurationWrapper) Delete(name string) error { - return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{}) +type bootstrapPriorityLevelConfiguration struct { + *priorityLevelConfigurationClient + + // points to immutable content + bootstrap *flowcontrolv1beta3.PriorityLevelConfiguration } -func (fs *priorityLevelConfigurationWrapper) CopySpec(bootstrap, current runtime.Object) error { - bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.PriorityLevelConfiguration) - if !ok { - return errObjectNotPriorityLevel - } - currentFS, ok := current.(*flowcontrolv1beta3.PriorityLevelConfiguration) - if !ok { - return errObjectNotPriorityLevel - } - - specCopy := bootstrapFS.Spec.DeepCopy() - currentFS.Spec = *specCopy - return nil +func (boot *bootstrapPriorityLevelConfiguration) getName() string { + return boot.bootstrap.Name } -func (fs *priorityLevelConfigurationWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) { - bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.PriorityLevelConfiguration) - if !ok { - return false, errObjectNotPriorityLevel - } - currentFS, ok := current.(*flowcontrolv1beta3.PriorityLevelConfiguration) - if !ok { - return false, errObjectNotPriorityLevel - } +func (boot *bootstrapPriorityLevelConfiguration) create(ctx context.Context) error { + // Copy the object here because the Encoder in the client code may modify the object; see + // https://github.com/kubernetes/kubernetes/pull/117107 + // and WithVersionEncoder in apimachinery/pkg/runtime/helper.go. + _, err := boot.client.Create(ctx, boot.bootstrap.DeepCopy(), metav1.CreateOptions{FieldManager: fieldManager}) + return err +} - return priorityLevelSpecChanged(bootstrapFS, currentFS), nil +func (boot *bootstrapPriorityLevelConfiguration) getCurrent() (wantAndHave, error) { + current, err := boot.lister.Get(boot.bootstrap.Name) + if err != nil { + return nil, err + } + return &wantAndHavePriorityLevelConfiguration{ + client: boot.client, + want: boot.bootstrap, + have: current, + }, nil +} + +type wantAndHavePriorityLevelConfiguration struct { + client flowcontrolclient.PriorityLevelConfigurationInterface + want *flowcontrolv1beta3.PriorityLevelConfiguration + have *flowcontrolv1beta3.PriorityLevelConfiguration +} + +func (wah *wantAndHavePriorityLevelConfiguration) getWant() configurationObject { + return wah.want +} + +func (wah *wantAndHavePriorityLevelConfiguration) getHave() configurationObject { + return wah.have +} + +func (wah *wantAndHavePriorityLevelConfiguration) copyHave(specFromWant bool) updatable { + copy := wah.have.DeepCopy() + if specFromWant { + copy.Spec = *wah.want.Spec.DeepCopy() + } + return &updatablePriorityLevelConfiguration{ + PriorityLevelConfiguration: copy, + client: wah.client, + } +} + +func (wah *wantAndHavePriorityLevelConfiguration) specsDiffer() bool { + return priorityLevelSpecChanged(wah.want, wah.have) } func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta3.PriorityLevelConfiguration) bool { @@ -210,3 +147,23 @@ func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta3.PriorityLevel flowcontrolapisv1beta3.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel) return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec) } + +type updatablePriorityLevelConfiguration struct { + *flowcontrolv1beta3.PriorityLevelConfiguration + client flowcontrolclient.PriorityLevelConfigurationInterface +} + +func (u *updatablePriorityLevelConfiguration) update(ctx context.Context) error { + _, err := u.client.Update(ctx, u.PriorityLevelConfiguration, metav1.UpdateOptions{FieldManager: fieldManager}) + return err +} + +type deletablePriorityLevelConfiguration struct { + *flowcontrolv1beta3.PriorityLevelConfiguration + client flowcontrolclient.PriorityLevelConfigurationInterface +} + +func (dbl *deletablePriorityLevelConfiguration) delete(ctx context.Context /* resourceVersion string */) error { + // return dbl.client.Delete(context.TODO(), dbl.Name, metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &resourceVersion}}) + return dbl.client.Delete(ctx, dbl.Name, metav1.DeleteOptions{}) +} diff --git a/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go index 8c87dceda98..05443f36b22 100644 --- a/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go +++ b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go @@ -26,20 +26,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/client-go/kubernetes/fake" - flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" "k8s.io/client-go/tools/cache" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" "k8s.io/utils/pointer" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" ) func TestEnsurePriorityLevel(t *testing.T) { tests := []struct { name string - strategy func(flowcontrolclient.PriorityLevelConfigurationInterface, flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer + strategy func() EnsureStrategy current *flowcontrolv1beta3.PriorityLevelConfiguration bootstrap *flowcontrolv1beta3.PriorityLevelConfiguration expected *flowcontrolv1beta3.PriorityLevelConfiguration @@ -47,21 +45,21 @@ func TestEnsurePriorityLevel(t *testing.T) { // for suggested configurations { name: "suggested priority level configuration does not exist - the object should always be re-created", - strategy: NewSuggestedPriorityLevelEnsurerEnsurer, + strategy: NewSuggestedEnsureStrategy, bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(), current: nil, expected: newPLConfiguration("pl1").WithLimited(10).Object(), }, { name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated", - strategy: NewSuggestedPriorityLevelEnsurerEnsurer, + strategy: NewSuggestedEnsureStrategy, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), }, { name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated", - strategy: NewSuggestedPriorityLevelEnsurerEnsurer, + strategy: NewSuggestedEnsureStrategy, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), @@ -70,21 +68,21 @@ func TestEnsurePriorityLevel(t *testing.T) { // for mandatory configurations { name: "mandatory priority level configuration does not exist - new object should be created", - strategy: NewMandatoryPriorityLevelEnsurer, + strategy: NewMandatoryEnsureStrategy, bootstrap: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(), current: nil, expected: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(), }, { name: "mandatory priority level configuration exists, annotation is missing - annotation is added", - strategy: NewMandatoryPriorityLevelEnsurer, + strategy: NewMandatoryEnsureStrategy, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), current: newPLConfiguration("pl1").WithLimited(20).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), }, { name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated", - strategy: NewMandatoryPriorityLevelEnsurer, + strategy: NewMandatoryEnsureStrategy, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), @@ -100,9 +98,10 @@ func TestEnsurePriorityLevel(t *testing.T) { indexer.Add(test.current) } - ensurer := test.strategy(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)) + boots := WrapBootstrapPriorityLevelConfigurations(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer), []*flowcontrolv1beta3.PriorityLevelConfiguration{test.bootstrap}) + strategy := test.strategy() - err := ensurer.Ensure([]*flowcontrolv1beta3.PriorityLevelConfiguration{test.bootstrap}) + err := EnsureConfigurations(context.Background(), boots, strategy) if err != nil { t.Fatalf("Expected no error, but got: %v", err) } @@ -207,12 +206,19 @@ func TestSuggestedPLEnsureStrategy_ShouldUpdate(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - strategy := newSuggestedEnsureStrategy(&priorityLevelConfigurationWrapper{}) - newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap) + strategy := NewSuggestedEnsureStrategy() + wah := &wantAndHavePriorityLevelConfiguration{ + want: test.bootstrap, + have: test.current, + } + updatableGot, updateGot, err := strategy.ShouldUpdate(wah) if err != nil { t.Errorf("Expected no error, but got: %v", err) } - + var newObjectGot *flowcontrolv1beta3.PriorityLevelConfiguration + if updatableGot != nil { + newObjectGot = updatableGot.(*updatablePriorityLevelConfiguration).PriorityLevelConfiguration + } if test.newObjectExpected == nil { if newObjectGot != nil { t.Errorf("Expected a nil object, but got: %#v", newObjectGot) @@ -308,24 +314,42 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) { removeExpected bool }{ { - name: "priority level configuration does not exist", + name: "no priority level configuration objects exist", bootstrapName: "pl1", current: nil, }, { - name: "priority level configuration exists, auto update is enabled", - bootstrapName: "pl1", + name: "priority level configuration not wanted, auto update is enabled", + bootstrapName: "pl0", current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), removeExpected: true, }, { - name: "priority level configuration exists, auto update is disabled", + name: "priority level configuration not wanted, auto update is disabled", + bootstrapName: "pl0", + current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").Object(), + removeExpected: false, + }, + { + name: "priority level configuration not wanted, the auto-update annotation is malformed", + bootstrapName: "pl0", + current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("invalid").Object(), + removeExpected: false, + }, + { + name: "priority level configuration wanted, auto update is enabled", + bootstrapName: "pl1", + current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), + removeExpected: false, + }, + { + name: "priority level configuration wanted, auto update is disabled", bootstrapName: "pl1", current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").Object(), removeExpected: false, }, { - name: "priority level configuration exists, the auto-update annotation is malformed", + name: "priority level configuration wanted, the auto-update annotation is malformed", bootstrapName: "pl1", current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("invalid").Object(), removeExpected: false, @@ -341,8 +365,9 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) { indexer.Add(test.current) } - remover := NewPriorityLevelRemover(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)) - err := remover.RemoveAutoUpdateEnabledObjects([]string{test.bootstrapName}) + boot := newPLConfiguration(test.bootstrapName).Object() + boots := WrapBootstrapPriorityLevelConfigurations(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer), []*flowcontrolv1beta3.PriorityLevelConfiguration{boot}) + err := RemoveUnwantedObjects(context.Background(), boots) if err != nil { t.Fatalf("Expected no error, but got: %v", err) } @@ -350,7 +375,7 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) { if test.current == nil { return } - _, err = client.Get(context.TODO(), test.bootstrapName, metav1.GetOptions{}) + _, err = client.Get(context.TODO(), test.current.Name, metav1.GetOptions{}) switch { case test.removeExpected: if !apierrors.IsNotFound(err) { @@ -365,85 +390,6 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) { } } -func TestGetPriorityLevelRemoveCandidate(t *testing.T) { - tests := []struct { - name string - current []*flowcontrolv1beta3.PriorityLevelConfiguration - bootstrap []*flowcontrolv1beta3.PriorityLevelConfiguration - expected []string - }{ - { - name: "no object has been removed from the bootstrap configuration", - bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{ - newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), - newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(), - newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(), - }, - current: []*flowcontrolv1beta3.PriorityLevelConfiguration{ - newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), - newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(), - newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(), - }, - expected: []string{}, - }, - { - name: "bootstrap is empty, all current objects with the annotation should be candidates", - bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{}, - current: []*flowcontrolv1beta3.PriorityLevelConfiguration{ - newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), - newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(), - newPLConfiguration("pl3").Object(), - }, - expected: []string{"pl1", "pl2"}, - }, - { - name: "object(s) have been removed from the bootstrap configuration", - bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{ - newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), - }, - current: []*flowcontrolv1beta3.PriorityLevelConfiguration{ - newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), - newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(), - newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(), - }, - expected: []string{"pl2", "pl3"}, - }, - { - name: "object(s) without the annotation key are ignored", - bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{ - newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), - }, - current: []*flowcontrolv1beta3.PriorityLevelConfiguration{ - newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), - newPLConfiguration("pl2").Object(), - newPLConfiguration("pl3").Object(), - }, - expected: []string{}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) - for i := range test.current { - indexer.Add(test.current[i]) - } - - lister := flowcontrollisters.NewPriorityLevelConfigurationLister(indexer) - removeListGot, err := GetPriorityLevelRemoveCandidates(lister, test.bootstrap) - if err != nil { - t.Fatalf("Expected no error, but got: %v", err) - } - - if !cmp.Equal(test.expected, removeListGot, cmpopts.SortSlices(func(a string, b string) bool { - return a < b - })) { - t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot)) - } - }) - } -} - type plBuilder struct { object *flowcontrolv1beta3.PriorityLevelConfiguration } diff --git a/pkg/registry/flowcontrol/ensurer/strategy.go b/pkg/registry/flowcontrol/ensurer/strategy.go index a46d097a79f..3cec8875aaa 100644 --- a/pkg/registry/flowcontrol/ensurer/strategy.go +++ b/pkg/registry/flowcontrol/ensurer/strategy.go @@ -17,26 +17,25 @@ limitations under the License. package ensurer import ( - "errors" + "context" "fmt" "strconv" + "github.com/google/go-cmp/cmp" flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" - - "github.com/google/go-cmp/cmp" ) const ( fieldManager = "api-priority-and-fairness-config-producer-v1" ) -// ensureStrategy provides a strategy for ensuring apf bootstrap configurationWrapper. -// We have two types of configurationWrapper objects: +// EnsureStrategy provides a maintenance strategy for APF configuration objects. +// We have two types of strategy, corresponding to the two types of config objetcs: // // - mandatory: the mandatory configurationWrapper objects are about ensuring that the P&F // system itself won't crash; we have to be sure there's 'catch-all' place for @@ -45,58 +44,67 @@ const ( // // - suggested: additional configurationWrapper objects for initial behavior. // the cluster operators have an option to edit or delete these configurationWrapper objects. -type ensureStrategy interface { +type EnsureStrategy interface { // Name of the strategy, for now we have two: 'mandatory' and 'suggested'. // This comes handy in logging. Name() string - // ShouldUpdate accepts the current and the bootstrap configuration and determines + // ShouldUpdate accepts a pair of the current and the bootstrap configuration and determines // whether an update is necessary. // current is the existing in-cluster configuration object. // bootstrap is the configuration the kube-apiserver maintains in-memory. // + // revised: the new object represents the new configuration to be stored in-cluster. // ok: true if auto update is required, otherwise false - // object: the new object represents the new configuration to be stored in-cluster. // err: err is set when the function runs into an error and can not - // determine if auto update is needed. - ShouldUpdate(current, bootstrap configurationObject) (object runtime.Object, ok bool, err error) + // determine if auto update is needed. + ShouldUpdate(wantAndHave) (revised updatable, ok bool, err error) } -// this internal interface provides abstraction for dealing with the `Spec` -// of both 'FlowSchema' and 'PriorityLevelConfiguration' objects. -// Since the ensure logic for both types is common, we use a few internal interfaces -// to abstract out the differences of these two types. -type specCopier interface { - // HasSpecChanged returns true if the spec of both the bootstrap and - // the current configuration object is same, otherwise false. - HasSpecChanged(bootstrap, current runtime.Object) (bool, error) - - // CopySpec makes a deep copy the spec of the bootstrap object - // and copies it to that of the current object. - // CopySpec assumes that the current object is safe to mutate, so it - // rests with the caller to make a deep copy of the current. - CopySpec(bootstrap, current runtime.Object) error +// BootstrapObjects is a generic interface to a list of bootstrap objects bound up with the relevant operations on them. +// The binding makes it unnecessary to have any type casts. +// A bootstrap object is a mandatory or suggested config object, +// with the spec that the code is built to provide. +type BootstrapObjects interface { + typeName() string // the Kind of the objects + len() int // number of objects + get(int) bootstrapObject // extract one object, origin 0 + getExistingObjects() ([]deletable, error) // returns all the APF config objects that exist at the moment } -// this internal interface provides abstraction for CRUD operation -// related to both 'FlowSchema' and 'PriorityLevelConfiguration' objects. -// Since the ensure logic for both types is common, we use a few internal interfaces -// to abstract out the differences of these two types. -type configurationClient interface { - Create(object runtime.Object) (runtime.Object, error) - Update(object runtime.Object) (runtime.Object, error) - Get(name string) (configurationObject, error) - Delete(name string) error +// deletable is an existing config object and it supports the delete operation +type deletable interface { + configurationObject + delete(context.Context) error // delete the object. TODO: make conditional on ResouceVersion } -type configurationWrapper interface { - // TypeName returns the type of the configuration that this interface deals with. - // We use it to log the type name of the configuration object being ensured. - // It is either 'PriorityLevelConfiguration' or 'FlowSchema' - TypeName() string +// bootstrapObject is a single bootstrap object. +// Its spec is what the code provides. +type bootstrapObject interface { + typeName() string // the Kind of the object + getName() string // the object's name + create(context.Context) error // request the server to create the object + getCurrent() (wantAndHave, error) // pair up with the object as it currently exists +} - configurationClient - specCopier +// wantAndHave is a pair of versions of an APF config object. +// The "want" has the spec that the code provides. +// The "have" is what came from the server. +type wantAndHave interface { + getWant() configurationObject + getHave() configurationObject + + specsDiffer() bool + + // copyHave returns a copy of the "have" version, + // optionally with spec replaced by the spec from "want". + copyHave(specFromWant bool) updatable +} + +// updatable is an APF config object that can be written back to the apiserver +type updatable interface { + configurationObject + update(context.Context) error } // A convenient wrapper interface that is used by the ensure logic. @@ -105,17 +113,17 @@ type configurationObject interface { runtime.Object } -func newSuggestedEnsureStrategy(copier specCopier) ensureStrategy { +// NewSuggestedEnsureStrategy returns an EnsureStrategy for suggested config objects +func NewSuggestedEnsureStrategy() EnsureStrategy { return &strategy{ - copier: copier, alwaysAutoUpdateSpec: false, name: "suggested", } } -func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy { +// NewMandatoryEnsureStrategy returns an EnsureStrategy for mandatory config objects +func NewMandatoryEnsureStrategy() EnsureStrategy { return &strategy{ - copier: copier, alwaysAutoUpdateSpec: true, name: "mandatory", } @@ -123,7 +131,6 @@ func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy { // auto-update strategy for the configuration objects type strategy struct { - copier specCopier alwaysAutoUpdateSpec bool name string } @@ -132,8 +139,10 @@ func (s *strategy) Name() string { return s.name } -func (s *strategy) ShouldUpdate(current, bootstrap configurationObject) (runtime.Object, bool, error) { - if current == nil || bootstrap == nil { +func (s *strategy) ShouldUpdate(wah wantAndHave) (updatable, bool, error) { + current := wah.getHave() + + if current == nil { return nil, false, nil } @@ -143,39 +152,23 @@ func (s *strategy) ShouldUpdate(current, bootstrap configurationObject) (runtime } updateAnnotation := shouldUpdateAnnotation(current, autoUpdateSpec) - var specChanged bool - if autoUpdateSpec { - changed, err := s.copier.HasSpecChanged(bootstrap, current) - if err != nil { - return nil, false, fmt.Errorf("failed to compare spec - %w", err) - } - specChanged = changed - } + specChanged := autoUpdateSpec && wah.specsDiffer() if !(updateAnnotation || specChanged) { // the annotation key is up to date and the spec has not changed, no update is necessary return nil, false, nil } - // if we are here, either we need to update the annotation key or the spec. - copy, ok := current.DeepCopyObject().(configurationObject) - if !ok { - // we should never be here - return nil, false, errors.New("incompatible object type") - } - + revised := wah.copyHave(specChanged) if updateAnnotation { - setAutoUpdateAnnotation(copy, autoUpdateSpec) - } - if specChanged { - s.copier.CopySpec(bootstrap, copy) + setAutoUpdateAnnotation(revised, autoUpdateSpec) } - return copy, true, nil + return revised, true, nil } // shouldUpdateSpec inspects the auto-update annotation key and generation field to determine -// whether the configurationWrapper object should be auto-updated. +// whether the config object should be auto-updated. func shouldUpdateSpec(accessor metav1.Object) bool { value, _ := accessor.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] if autoUpdate, err := strconv.ParseBool(value); err == nil { @@ -215,130 +208,130 @@ func setAutoUpdateAnnotation(accessor metav1.Object, autoUpdate bool) { accessor.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] = strconv.FormatBool(autoUpdate) } -// ensureConfiguration ensures the boostrap configurationWrapper on the cluster based on the specified strategy. -func ensureConfiguration(wrapper configurationWrapper, strategy ensureStrategy, bootstrap configurationObject) error { - name := bootstrap.GetName() +// EnsureConfigurations applies the given maintenance strategy to the given objects. +// At the first error, if any, it stops and returns that error. +func EnsureConfigurations(ctx context.Context, boots BootstrapObjects, strategy EnsureStrategy) error { + len := boots.len() + for i := 0; i < len; i++ { + bo := boots.get(i) + err := EnsureConfiguration(ctx, bo, strategy) + if err != nil { + return err + } + } + return nil +} + +// EnsureConfiguration applies the given maintenance strategy to the given object. +func EnsureConfiguration(ctx context.Context, bootstrap bootstrapObject, strategy EnsureStrategy) error { + name := bootstrap.getName() configurationType := strategy.Name() - var current configurationObject + var wah wantAndHave var err error for { - current, err = wrapper.Get(bootstrap.GetName()) + wah, err = bootstrap.getCurrent() if err == nil { break } if !apierrors.IsNotFound(err) { - return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) + return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err) } // we always re-create a missing configuration object - if _, err = wrapper.Create(bootstrap); err == nil { - klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", wrapper.TypeName()), "type", configurationType, "name", name) + if err = bootstrap.create(ctx); err == nil { + klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", bootstrap.typeName()), "type", configurationType, "name", name) return nil } if !apierrors.IsAlreadyExists(err) { - return fmt.Errorf("cannot create %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) + return fmt.Errorf("cannot create %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err) } - klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", wrapper.TypeName()), "type", configurationType, "name", name) + klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", bootstrap.typeName()), "type", configurationType, "name", name) } - klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", wrapper.TypeName()), "type", configurationType, "name", name) - newObject, update, err := strategy.ShouldUpdate(current, bootstrap) + klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", bootstrap.typeName()), "type", configurationType, "name", name) + newObject, update, err := strategy.ShouldUpdate(wah) if err != nil { - return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) + return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err) } if !update { if klogV := klog.V(5); klogV.Enabled() { - klogV.InfoS("No update required", "wrapper", wrapper.TypeName(), "type", configurationType, "name", name, - "diff", cmp.Diff(current, bootstrap)) + klogV.InfoS("No update required", "wrapper", bootstrap.typeName(), "type", configurationType, "name", name, + "diff", cmp.Diff(wah.getHave(), wah.getWant())) } return nil } - if _, err = wrapper.Update(newObject); err == nil { - klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", wrapper.TypeName(), configurationType, name, cmp.Diff(current, newObject)) + if err = newObject.update(ctx); err == nil { + klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", bootstrap.typeName(), configurationType, name, cmp.Diff(wah.getHave(), wah.getWant())) return nil } if apierrors.IsConflict(err) { - klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", wrapper.TypeName()), "type", configurationType, "name", name) + klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", bootstrap.typeName()), "type", configurationType, "name", name) return nil } - return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) + return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err) } -// removeAutoUpdateEnabledConfiguration makes an attempt to remove the given -// configuration object if automatic update of the spec is enabled for this object. -func removeAutoUpdateEnabledConfiguration(wrapper configurationWrapper, name string) error { - current, err := wrapper.Get(name) +// RemoveUnwantedObjects attempts to delete the configuration objects +// that exist, are annotated `apf.kubernetes.io/autoupdate-spec=true`, and do not +// have a name in the given set. A refusal due to concurrent update is logged +// and not considered an error; the object will be reconsidered later. +func RemoveUnwantedObjects(ctx context.Context, boots BootstrapObjects) error { + current, err := boots.getExistingObjects() if err != nil { - if apierrors.IsNotFound(err) { - return nil + return err + } + wantedNames := namesOfBootstrapObjects(boots) + for _, object := range current { + name := object.GetName() + if wantedNames.Has(name) { + continue } - - return fmt.Errorf("failed to retrieve the %s, will retry later name=%q error=%w", wrapper.TypeName(), name, err) - } - - value := current.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] - autoUpdate, err := strconv.ParseBool(value) - if err != nil { - klog.ErrorS(err, fmt.Sprintf("Skipping deletion of the %s", wrapper.TypeName()), "name", name) - - // This may need manual intervention, in case the annotation value is malformed, - // so don't return an error, that might trigger futile retry loop. - return nil - } - if !autoUpdate { - klog.V(5).InfoS(fmt.Sprintf("Skipping deletion of the %s", wrapper.TypeName()), "name", name) - return nil - } - - if err := wrapper.Delete(name); err != nil { - if apierrors.IsNotFound(err) { - klog.V(5).InfoS(fmt.Sprintf("Something concurrently deleted the %s", wrapper.TypeName()), "name", name) - return nil - } - - return fmt.Errorf("failed to delete the %s, will retry later name=%q error=%w", wrapper.TypeName(), name, err) - } - - klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the %s", wrapper.TypeName()), "name", name) - return nil -} - -// getDanglingBootstrapObjectNames returns a list of names of bootstrap -// configuration objects that are potentially candidates for deletion from -// the cluster, given a set of bootstrap and current configuration. -// - bootstrap: a set of hard coded configuration kube-apiserver maintains in-memory. -// - current: a set of configuration objects that exist on the cluster -// -// Any object present in current is added to the list if both a and b are true: -// -// a. the object in current is missing from the bootstrap configuration -// b. the object has the designated auto-update annotation key -// -// This function shares the common logic for both FlowSchema and -// PriorityLevelConfiguration type and hence it accepts metav1.Object only. -func getDanglingBootstrapObjectNames(bootstrap sets.String, current []metav1.Object) []string { - if len(current) == 0 { - return nil - } - - candidates := make([]string, 0) - for i := range current { - object := current[i] - if _, ok := object.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey]; !ok { + var value string + var ok, autoUpdate bool + var err error + if value, ok = object.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey]; !ok { // the configuration object does not have the annotation key, // it's probably a user defined configuration object, // so we can skip it. + klog.V(5).InfoS("Skipping deletion of APF object with no "+flowcontrolv1beta3.AutoUpdateAnnotationKey+" annotation", "name", name) continue } - - if _, ok := bootstrap[object.GetName()]; !ok { - candidates = append(candidates, object.GetName()) + autoUpdate, err = strconv.ParseBool(value) + if err != nil { + // Log this because it is not an expected situation. + klog.V(4).InfoS("Skipping deletion of APF object with malformed "+flowcontrolv1beta3.AutoUpdateAnnotationKey+" annotation", "name", name, "annotationValue", value, "parseError", err) + continue + } + if !autoUpdate { + klog.V(5).InfoS("Skipping deletion of APF object with "+flowcontrolv1beta3.AutoUpdateAnnotationKey+"=false annotation", "name", name) + continue + } + // TODO: expectedResourceVersion := object.GetResourceVersion() + err = object.delete(ctx /* TODO: expectedResourceVersion */) + if err == nil { + klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the unwanted %s", boots.typeName()), "name", name) + continue + } + if apierrors.IsNotFound(err) { + klog.V(5).InfoS("Unwanted APF object was concurrently deleted", "name", name) + } else { + return fmt.Errorf("failed to delete unwatned APF object %q - %w", name, err) } } - return candidates + return nil +} + +func namesOfBootstrapObjects(bos BootstrapObjects) sets.String { + names := sets.NewString() + len := bos.len() + for i := 0; i < len; i++ { + bo := bos.get(i) + names.Insert(bo.getName()) + } + return names } diff --git a/pkg/registry/flowcontrol/rest/storage_flowcontrol.go b/pkg/registry/flowcontrol/rest/storage_flowcontrol.go index 5fed585a571..f3ae49a0850 100644 --- a/pkg/registry/flowcontrol/rest/storage_flowcontrol.go +++ b/pkg/registry/flowcontrol/rest/storage_flowcontrol.go @@ -141,36 +141,43 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo return fmt.Errorf("failed to initialize clientset for APF - %w", err) } - // get a derived context that gets cancelled after 5m or - // when the StopCh gets closed, whichever happens first. - ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute) - defer cancel() + err = func() error { + // get a derived context that gets cancelled after 5m or + // when the StopCh gets closed, whichever happens first. + ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute) + defer cancel() - if !cache.WaitForCacheSync(ctx.Done(), bce.informersSynced...) { - return fmt.Errorf("APF bootstrap ensurer timed out waiting for cache sync") - } + if !cache.WaitForCacheSync(ctx.Done(), bce.informersSynced...) { + return fmt.Errorf("APF bootstrap ensurer timed out waiting for cache sync") + } - err = wait.PollImmediateUntilWithContext( - ctx, - time.Second, - func(context.Context) (bool, error) { - if err := ensure(clientset, bce.fsLister, bce.plcLister); err != nil { - klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") - return false, nil - } - return true, nil - }) + err = wait.PollImmediateUntilWithContext( + ctx, + time.Second, + func(context.Context) (bool, error) { + if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil { + klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") + return false, nil + } + return true, nil + }) + if err != nil { + return fmt.Errorf("unable to initialize APF bootstrap configuration: %w", err) + } + return nil + }() if err != nil { - return fmt.Errorf("unable to initialize APF bootstrap configuration") + return err } // we have successfully initialized the bootstrap configuration, now we // spin up a goroutine which reconciles the bootstrap configuration periodically. go func() { + ctx := wait.ContextForChannel(hookContext.StopCh) wait.PollImmediateUntil( time.Minute, func() (bool, error) { - if err := ensure(clientset, bce.fsLister, bce.plcLister); err != nil { + if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil { klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") } // always auto update both suggested and mandatory configuration @@ -182,79 +189,64 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo return nil } -func ensure(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { - if err := ensureSuggestedConfiguration(clientset, fsLister, plcLister); err != nil { +func ensure(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { + + if err := ensureSuggestedConfiguration(ctx, clientset, fsLister, plcLister); err != nil { // We should not attempt creation of mandatory objects if ensuring the suggested // configuration resulted in an error. // This only happens when the stop channel is closed. return fmt.Errorf("failed ensuring suggested settings - %w", err) } - if err := ensureMandatoryConfiguration(clientset, fsLister, plcLister); err != nil { + if err := ensureMandatoryConfiguration(ctx, clientset, fsLister, plcLister); err != nil { return fmt.Errorf("failed ensuring mandatory settings - %w", err) } - if err := removeDanglingBootstrapConfiguration(clientset, fsLister, plcLister); err != nil { + if err := removeDanglingBootstrapConfiguration(ctx, clientset, fsLister, plcLister); err != nil { return fmt.Errorf("failed to delete removed settings - %w", err) } return nil } -func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { - plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations(), plcLister) - if err := plEnsurer.Ensure(flowcontrolbootstrap.SuggestedPriorityLevelConfigurations); err != nil { +func ensureSuggestedConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { + plcSuggesteds := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations) + if err := ensurer.EnsureConfigurations(ctx, plcSuggesteds, ensurer.NewSuggestedEnsureStrategy()); err != nil { return err } - fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas(), fsLister) - return fsEnsurer.Ensure(flowcontrolbootstrap.SuggestedFlowSchemas) + fsSuggesteds := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, flowcontrolbootstrap.SuggestedFlowSchemas) + return ensurer.EnsureConfigurations(ctx, fsSuggesteds, ensurer.NewSuggestedEnsureStrategy()) } -func ensureMandatoryConfiguration(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { - fsEnsurer := ensurer.NewMandatoryFlowSchemaEnsurer(clientset.FlowSchemas(), fsLister) - if err := fsEnsurer.Ensure(flowcontrolbootstrap.MandatoryFlowSchemas); err != nil { +func ensureMandatoryConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { + plcMandatories := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, flowcontrolbootstrap.MandatoryPriorityLevelConfigurations) + if err := ensurer.EnsureConfigurations(ctx, plcMandatories, ensurer.NewMandatoryEnsureStrategy()); err != nil { return err } - plEnsurer := ensurer.NewMandatoryPriorityLevelEnsurer(clientset.PriorityLevelConfigurations(), plcLister) - return plEnsurer.Ensure(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations) + fsMandatories := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, flowcontrolbootstrap.MandatoryFlowSchemas) + return ensurer.EnsureConfigurations(ctx, fsMandatories, ensurer.NewMandatoryEnsureStrategy()) } -func removeDanglingBootstrapConfiguration(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { - if err := removeDanglingBootstrapFlowSchema(clientset.FlowSchemas(), fsLister); err != nil { +func removeDanglingBootstrapConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { + if err := removeDanglingBootstrapFlowSchema(ctx, clientset, fsLister); err != nil { return err } - return removeDanglingBootstrapPriorityLevel(clientset.PriorityLevelConfigurations(), plcLister) + return removeDanglingBootstrapPriorityLevel(ctx, clientset, plcLister) } -func removeDanglingBootstrapFlowSchema(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) error { +func removeDanglingBootstrapFlowSchema(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister) error { bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...) - candidates, err := ensurer.GetFlowSchemaRemoveCandidates(lister, bootstrap) - if err != nil { - return err - } - if len(candidates) == 0 { - return nil - } - - fsRemover := ensurer.NewFlowSchemaRemover(client, lister) - return fsRemover.RemoveAutoUpdateEnabledObjects(candidates) + fsBoots := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, bootstrap) + return ensurer.RemoveUnwantedObjects(ctx, fsBoots) } -func removeDanglingBootstrapPriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) error { +func removeDanglingBootstrapPriorityLevel(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...) - candidates, err := ensurer.GetPriorityLevelRemoveCandidates(lister, bootstrap) - if err != nil { - return err - } - if len(candidates) == 0 { - return nil - } - - plRemover := ensurer.NewPriorityLevelRemover(client, lister) - return plRemover.RemoveAutoUpdateEnabledObjects(candidates) + plcBoots := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, bootstrap) + return ensurer.RemoveUnwantedObjects(ctx, plcBoots) } // contextFromChannelAndMaxWaitDuration returns a Context that is bound to the